Outline
1. Introduction to CountDownLatch: waiting for multiple threads to complete
2. The await() method source code
3. The countDown() method source code
4. Summary
5. Introduction to Semaphore for controlling the number of concurrent threads
6. Semaphore's token acquisition process
7. Semaphore's token release process
8. Introduction to CyclicBarrier
9. The await() method source code of CyclicBarrier
10. Use CountDownLatch to wait for the completion of registration
11. Use CyclicBarrier to divide work among multiple threads
12. Use CyclicBarrier to aggregate results from multiple service interfaces
13. Use Semaphore to wait for the specified number of threads to complete the task
volatile, synchronized, CAS, AQS, read and write locks, lock optimization and lock failures, concurrent collections, thread pools, synchronization components
1. CountDownLatch: waiting for multiple threads to complete
(1) Introduction to CountDownLatch
(2) Application of CountDownLatch
(3) Example of CountDownLatch
(1) Introduction to CountDownLatch
CountDownLatch allows one or more threads to wait for other threads to complete operations. CountDownLatch provides two core methods: the await() method and the countDown() method. The await() method causes the calling thread to block and enter a waiting state, and the countDown() method decrements the counter.
CountDownLatch needs to pass a positive integer as the initial value of the counter during construction. Every time the thread calls the countDown() method, it will decrement the counter by one. When the counter is 0, all threads that were blocked when executing the await() method will be awakened.
(2) Application of CountDownLatch
Application 1:
Use multithreading to parse the data of multiple sheets in an Excel file, with each thread parsing one sheet, and then prompt that processing is complete after all sheets have been parsed. CountDownLatch can be used here, and of course Thread's join() method can also be used.
Note: The join() method is implemented based on wait() and notify(). If the main thread starts a thread A and then calls thread A's join() method, the main thread will block and wait for thread A to finish executing before continuing.
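A minimal sketch of the join() alternative mentioned in the note above (the class name and messages are illustrative only):
public class JoinDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread sheetParser = new Thread(() -> System.out.println("Parsing one sheet..."));
        sheetParser.start();
        sheetParser.join();//The main thread blocks here until sheetParser terminates
        System.out.println("Sheet parsed, the main thread continues...");
    }
}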
Application 2:
The register-client of a microservice registration center sends heartbeats only after the registration thread has completed successfully. CountDownLatch can be used here, and of course the join() method can also be used.
Application 3:
A concurrency-like effect can be achieved with CountDownLatch. Set the CountDownLatch counter to 1 and have, for example, 1000 threads call the await() method in a for loop. Once all 1000 threads have been created and are blocked, the main thread calls countDown() to drop the counter to zero, and the 1000 threads are then awakened almost simultaneously. A minimal sketch of this pattern follows.
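A minimal sketch of this "starting gun" pattern, using 10 threads instead of 1000 purely for brevity (the class name is illustrative only):
import java.util.concurrent.CountDownLatch;

public class StartingGunDemo {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    startGate.await();//Every worker blocks here until the gate is opened
                    System.out.println("Worker " + id + " starts almost simultaneously");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        Thread.sleep(500);//Give the workers time to reach await()
        startGate.countDown();//The counter drops to 0 and all blocked workers are released together
    }
}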
(3) Example of CountDownLatch
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws Exception {
        final CountDownLatch latch = new CountDownLatch(2);
        new Thread() {
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println("Thread 1 starts executing, sleeps for 2 seconds...");
                    Thread.sleep(1000);
                    System.out.println("Thread 1 is ready to perform the countDown operation...");
                    latch.countDown();
                    System.out.println("Thread 1 completes the countDown operation...");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        new Thread() {
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println("Thread 2 starts executing, sleeps for 2 seconds...");
                    Thread.sleep(1000);
                    System.out.println("Thread 2 is ready to perform the countDown operation...");
                    latch.countDown();
                    System.out.println("Thread 2 completes the countDown operation...");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        System.out.println("The main thread is ready to execute the await operation of countDownLatch and will block and wait...");
        latch.await();
        System.out.println("All threads complete the countDown operation, end blocking and waiting...");
    }
}
2. The await() method source code
(1) Blocking process of the await() method
(2) Wake-up process of the await() method
(3) Summary of the await() method's blocking
(1) Blocking process of the await() method
CountDownLatch is implemented based on shared locks in AQS. From the construction method of CountDownLatch, we can see that the count of CountDownLatch is the state of AQS.
When calling the await() method of CountDownLatch, the acquireSharedInterruptibly() template method of AQS will be called first, and then the tryAcquireShared() method implemented by the inner class Sync of CountDownLatch will be called. The tryAcquireShared() method will determine whether the value of state is 0. If it is 0, it will return 1, otherwise -1 will be returned.
When the tryAcquireShared() method of CountDownLatch's inner class Sync returns -1, the doAcquireSharedInterruptibly() method of AQS will be called, which encapsulates the current thread into a Node, adds it to the waiting queue, and then parks (suspends) the current thread.
//A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
public class CountDownLatch {
    private final Sync sync;
    public CountDownLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        this.sync = new Sync(count);
    }
    //Synchronization control for CountDownLatch.
    //Uses AQS state to represent count.
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            //Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false;
                }
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) {
                    return nextc == 0;
                }
            }
        }
    }
    //Causes the current thread to wait until the latch has counted down to zero,
    //unless the thread is Thread#interrupted.
    public void await() throws InterruptedException {
        //Execute AQS acquireSharedInterruptibly() method
        sync.acquireSharedInterruptibly(1);
    }
    ...
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    //Acquires in shared mode, aborting if interrupted.
    //Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.
    //Otherwise the thread is queued, possibly repeatedly blocking and unblocking,
    //invoking #tryAcquireShared until success or the thread is interrupted.
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        //Execute the tryAcquireShared() method implemented by the inner Sync class of CountDownLatch to preempt the shared lock
        if (tryAcquireShared(arg) < 0) {
            //Execute AQS doAcquireSharedInterruptibly() method
            doAcquireSharedInterruptibly(arg);
        }
    }
    //Acquires in shared interruptible mode.
    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);//Encapsulate the current thread into a Node of Shared type
        boolean failed = true;
        try {
            //In the first loop r = -1, so the shouldParkAfterFailedAcquire() method of AQS will be executed
            //to set the status of the effective predecessor of the node to SIGNAL
            for (;;) {
                final Node p = node.predecessor();//The predecessor of the node
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //Execute shouldParkAfterFailedAcquire() to set the state of the node's predecessor to SIGNAL
                //Execute parkAndCheckInterrupt() to park the current thread
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }
    //Checks and updates status for a node that failed to acquire.
    //Returns true if thread should block. This is the main signal control in all acquire loops.
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) {
            //This node has already set status asking a release to signal it, so it can safely park.
            return true;
        }
        if (ws > 0) {
            //Predecessor was cancelled. Skip over predecessors and indicate retry.
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //waitStatus must be 0 or PROPAGATE.
            //Indicate that we need a signal, but don't park yet.
            //Caller will need to retry to make sure it cannot acquire before parking.
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    //Set the head node and wake up subsequent threads.
    //Sets head of queue, and checks if successor may be waiting in shared mode,
    //if so propagating if either propagate > 0 or PROPAGATE status was set.
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        setHead(node);//Set the node as the head node
        if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared()) {
                doReleaseShared();
            }
        }
    }
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    ...
}
(2) Wake-up process of the await() method
When calling the await() method, the current thread is encapsulated into a Node and added to the waiting queue. In the first iteration of the for loop, the status of the Node's predecessor is set to SIGNAL; the current thread is then parked and blocked in the second iteration.
When the thread is later awakened, it enters the next iteration of the for loop. If the predecessor of the thread's Node is the head node of the waiting queue and the state value is 0, the setHeadAndPropagate() method of AQS is executed to set the head node and wake up subsequent threads.
The setHeadAndPropagate() method does two pieces of work (set the head node + propagate the wake-up):
Work 1: Set the node corresponding to the currently awakened thread as the head node
Work 2: When the following two conditions are met, the doReleaseShared() method is called to wake up subsequent threads
Condition 1: propagate > 0, indicating that the shared lock is currently available and the wake-up needs to be propagated
Condition 2: s == null || s.isShared(), indicating that the successor node is null or is waiting in shared mode
The implementation of CountDownLatch will call the doReleaseShared() method in the following two scenarios:
Scenario 1: The countDown() method called when state is 1 will call the doReleaseShared() method
Scenario 2: When the blocking thread is awakened, the setHeadAndPropagate() method will be called, and then the doReleaseShared() method will be called, which can improve the speed of waking up the shared nodes.
(3) Summary of the await() method's blocking
As long as state != 0, the following processing will be performed:
1. Encapsulate the current thread into a Node node and add it to the AQS waiting queue
2. Call the LockSupport.park() method (via parkAndCheckInterrupt()) to suspend the current thread
3. The countDown() method source code
(1) Wake-up process of the countDown() method
(2) The essence of the countDown() method
(3) AQS doReleaseShared() method
(1) Wake-up process of the countDown() method
When calling the countDown() method of CountDownLatch, the releaseShared() template method of AQS will be called first, and then the tryReleaseShared() method implemented by the inner class Sync of CountDownLatch will be executed.
If the tryReleaseShared() method returns true, execute AQS's doReleaseShared() method and wake up the thread in the waiting queue in shared lock mode through AQS's doReleaseShared() method.
//A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
public class CountDownLatch {
    private final Sync sync;
    public CountDownLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        this.sync = new Sync(count);
    }
    //Synchronization control for CountDownLatch.
    //Uses AQS state to represent count.
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            //Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false;
                }
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) {
                    return nextc == 0;
                }
            }
        }
    }
    //Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
    public void countDown() {
        //Execute AQS releaseShared() method
        sync.releaseShared(1);
    }
    ...
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    //Releases in shared mode.
    //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
    public final boolean releaseShared(int arg) {
        //Execute the tryReleaseShared() method implemented by the inner Sync class of CountDownLatch to release the shared lock
        if (tryReleaseShared(arg)) {
            //Execute the doReleaseShared() method of AQS
            doReleaseShared();
            return true;
        }
        return false;
    }
    //Release action for shared mode -- signals successor and ensures propagation.
    //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.
    private void doReleaseShared() {
        for (;;) {
            //The head node may change on every iteration of the loop,
            //because calling the unparkSuccessor() method wakes up a thread blocked in doAcquireSharedInterruptibly(),
            //and that thread modifies the head node via setHead() when executing the setHeadAndPropagate() method
            Node h = head;//Get the latest head node
            if (h != null && h != tail) {//There is a node with a suspended thread in the waiting queue
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//The head node is in the SIGNAL state, which means its successor's thread can be woken up
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                        continue;//loop to recheck cases
                    }
                    //Wake up the successor of the head node
                    //The awakened thread will execute the setHeadAndPropagate() method in doAcquireSharedInterruptibly() to modify the head node
                    unparkSuccessor(h);
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    //If ws == 0, this is the initial state; set the node to the PROPAGATE state
                    continue;//loop on failed CAS
                }
            }
            if (h == head) {//Judge whether the head node has changed
                break;//loop if head changed
            }
        }
    }
    //Wakes up node's successor, if one exists.
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        if (s != null) {
            LockSupport.unpark(s.thread);
        }
    }
    ...
}
(2) The essence of the countDown() method
From the tryReleaseShared() method, we can see that countDown() simply decrements the AQS state value by 1 and updates it via CAS. If the CAS succeeds, it then checks whether the new state value is 0: it returns true if it is 0 and false otherwise. When true is returned, AQS's doReleaseShared() method is called to wake up the threads waiting in the queue.
(3) AQS doReleaseShared() method
This method wakes up the successor of the head node in the AQS waiting queue, and the following conditions must be met:
Condition 1: There is a node with a suspended thread in the waiting queue (h != null && h != tail)
Condition 2: The head node's waitStatus is SIGNAL (ws == Node.SIGNAL)
In shared lock mode, when state reaches 0, all pending threads need to be awakened through wake-up propagation. The doReleaseShared() method spins with for(;;): each iteration reads the latest head node via Node h = head, and at the end uses if (h == head) to check whether the head node has changed. If it has not changed, the spin exits.
Note: In shared lock mode, a thread woken by unparkSuccessor() continues in doAcquireSharedInterruptibly() and executes setHeadAndPropagate() to update the head node, thereby propagating the wake-up.
4. Summary
Suppose there are two threads A and B that call the await() method of CountDownLatch respectively. At this time, the counter represented by state is not 0. So threads A and B will be encapsulated into SHARED-type nodes and added to the AQS waiting queue.
When thread C calls the countDown() method of CountDownLatch and decrements state to 0, the doReleaseShared() method is called to wake up a thread in the waiting queue. The awakened thread then calls the setHeadAndPropagate() method to propagate the wake-up, which in turn calls doReleaseShared() again and continues waking up all blocked threads in the waiting queue.
5. Introduction to Semaphore for controlling the number of concurrent threads
(1) The role of Semaphore
(2) Semaphore method
(3) Analysis of Semaphore principle
(1) The role of Semaphore
A Semaphore (semaphore) is used to control the number of threads that access a specific resource at the same time, and it has two core methods.
Method 1: acquire() method, obtain a token
Method 2: release() method, release a token
When multiple threads access a resource with restricted traffic, they first call acquire() to obtain an access token. If a token can be obtained, access is allowed; if no token is available, the current thread blocks. When a thread that holds a token releases it through the release() method, a thread blocked in acquire() has a chance to obtain the released token.
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(10, true);//Initialize 10 tokens and use the fair mode
        semaphore.acquire();//Obtain one token each time; if none is available, the thread blocks
        semaphore.release();//Release one token
    }
}
(2) Semaphore methods
Semaphore does not actually hand a real token object to a thread; it simply maintains a count of available permits (license management). Semaphore can implement flow control in scenarios with limited shared resources, such as database connections. The methods below are demonstrated in the sketch after this list.
1. Semaphore(permits, fair): permits is the number of tokens, fair indicates whether the fair mode is used
2. acquire(permits): obtain the specified number of tokens; if not enough are available, the current thread blocks
3. tryAcquire(permits): try to obtain the specified number of tokens without blocking; returns true on success and false on failure
4. release(permits): release the specified number of tokens
5. drainPermits(): the current thread takes all remaining tokens
6. hasQueuedThreads(): determines whether any threads are waiting for tokens on the current Semaphore instance
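A minimal sketch of these methods, using a hypothetical pool of 3 "connection slots" (the class and variable names are illustrative only):
import java.util.concurrent.Semaphore;

public class SemaphoreMethodsDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore connectionSlots = new Semaphore(3, true);//3 tokens, fair mode

        if (connectionSlots.tryAcquire(2)) {//Non-blocking attempt to take 2 tokens
            System.out.println("Got 2 slots, " + connectionSlots.availablePermits() + " left");
            connectionSlots.release(2);//Return both tokens
        }

        connectionSlots.acquire(1);//Blocking acquisition of 1 token
        System.out.println("Any threads waiting? " + connectionSlots.hasQueuedThreads());
        connectionSlots.release(1);

        int drained = connectionSlots.drainPermits();//Take every remaining token at once
        System.out.println("Drained " + drained + " tokens");
    }
}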
(3) Analysis of Semaphore principle
Semaphore is also implemented based on shared locks in AQS. The parameter permits passed when creating a Semaphore instance is actually the state property in AQS. Each time Semaphore's acquire() method is called, the state value will be decremented.
So fundamentally speaking, Semaphore is implemented by rewriting two methods of AQS:
Method 1: tryAcquireShared(), preempt the shared lock
Method 2: tryReleaseShared(), release the shared lock
public class Semaphore implements java.io.Serializable {
    private final Sync sync;
    //Creates a Semaphore with the given number of permits and nonfair fairness setting.
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    static final class NonfairSync extends Sync {
        NonfairSync(int permits) {
            super(permits);
        }
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    //Acquires a permit from this semaphore, blocking until one is available,
    //or the thread is Thread#interrupt interrupted.
    public void acquire() throws InterruptedException {
        //Execute AQS template method acquireSharedInterruptibly()
        sync.acquireSharedInterruptibly(1);
    }
    //Releases a permit, returning it to the semaphore.
    public void release() {
        //Execute AQS template method releaseShared()
        sync.releaseShared(1);
    }
    //Synchronization implementation for semaphore.
    //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            //Set the value of state to the number of incoming tokens
            setState(permits);
        }
        final int getPermits() {
            return getState();
        }
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) {
                    throw new Error("Maximum permit count exceeded");
                }
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }
        ...
    }
    ...
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    //Acquires in shared mode, aborting if interrupted.
    //Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.
    //Otherwise the thread is queued, possibly repeatedly blocking and unblocking,
    //invoking #tryAcquireShared until success or the thread is interrupted.
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        //Execute the tryAcquireShared() method implemented by a subclass of Semaphore's inner class Sync to preempt the shared lock
        if (tryAcquireShared(arg) < 0) {
            //Execute AQS doAcquireSharedInterruptibly() method
            doAcquireSharedInterruptibly(arg);
        }
    }
    //Releases in shared mode.
    //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
    public final boolean releaseShared(int arg) {
        //Execute the tryReleaseShared() method implemented by Semaphore's inner class Sync to release the shared lock
        if (tryReleaseShared(arg)) {
            //Execute the doReleaseShared() method of AQS
            doReleaseShared();
            return true;
        }
        return false;
    }
    ...
}
6. Semaphore's token acquisition process
(1) Semaphore's token acquisition process
(2) Semaphore's fair strategy
(3) Semaphore's unfair strategy
(4) Processing after tryAcquireShared()
(1) Semaphore's token acquisition process
When Semaphore's acquire() method is called to obtain a token: first, AQS's template method acquireSharedInterruptibly() is executed, and then the tryAcquireShared() method implemented by the Sync subclass is executed to preempt the lock. If preempting the lock fails, AQS's doAcquireSharedInterruptibly() method is executed, which encapsulates the current thread into a Node, adds it to the waiting queue, and then parks the thread.
(2) Semaphore's fair strategy
When the tryAcquireShared() method of the Sync subclass FairSync is executed to try to obtain tokens, AQS's hasQueuedPredecessors() is used first to determine whether there are already threads in the waiting queue. If so, the current thread will inevitably fail to obtain a token. Otherwise, it subtracts the requested number from the state value, checks whether the remaining count is less than 0, and updates state via CAS.
(3) Semaphore's unfair strategy
When the tryAcquireShared() method of the Sync subclass NonfairSync is executed to try to obtain tokens, Sync's nonfairTryAcquireShared() method is executed directly: it subtracts the requested number from the state value, checks whether the remaining count is less than 0, and updates state via CAS.
(4) Processing after tryAcquireShared()
Whether the strategy is fair or non-fair, the corresponding tryAcquireShared() method preempts tokens by spinning (setting state via CAS). Only when there are not enough tokens does tryAcquireShared() return a value less than 0, which triggers AQS's doAcquireSharedInterruptibly() method: the current thread is encapsulated into a Node, added to the waiting queue, and then parked.
public class Semaphore implements java.io.Serializable {
    private final Sync sync;
    //Creates a Semaphore with the given number of permits and nonfair fairness setting.
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    static final class NonfairSync extends Sync {
        NonfairSync(int permits) {
            super(permits);
        }
        //Obtain tokens under the nonfair strategy
        protected int tryAcquireShared(int acquires) {
            //Execute Sync's nonfairTryAcquireShared() method
            return nonfairTryAcquireShared(acquires);
        }
    }
    static final class FairSync extends Sync {
        FairSync(int permits) {
            super(permits);
        }
        //Obtain tokens under the fair strategy
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                //If there are already threads in the waiting queue, obtaining the token will inevitably fail
                if (hasQueuedPredecessors()) {
                    return -1;
                }
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }
    }
    //Acquires a permit from this semaphore, blocking until one is available,
    //or the thread is Thread#interrupt interrupted.
    public void acquire() throws InterruptedException {
        //Execute AQS template method acquireSharedInterruptibly()
        sync.acquireSharedInterruptibly(1);
    }
    //Synchronization implementation for semaphore.
    //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            //Set the value of state to the number of incoming tokens
            setState(permits);
        }
        final int getPermits() {
            return getState();
        }
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }
        ...
    }
    ...
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    //Acquires in shared mode, aborting if interrupted.
    //Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.
    //Otherwise the thread is queued, possibly repeatedly blocking and unblocking,
    //invoking #tryAcquireShared until success or the thread is interrupted.
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        //Execute the tryAcquireShared() method implemented by a subclass of Semaphore's inner class Sync to preempt the shared lock
        if (tryAcquireShared(arg) < 0) {
            //Execute AQS doAcquireSharedInterruptibly() method
            doAcquireSharedInterruptibly(arg);
        }
    }
    //Queries whether any threads have been waiting to acquire longer than the current thread.
    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    //Acquires in shared interruptible mode.
    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);//Encapsulate the current thread into a Node of Shared type
        boolean failed = true;
        try {
            //In the first loop r = -1, so the shouldParkAfterFailedAcquire() method of AQS will be executed
            //to set the status of the effective predecessor of the node to SIGNAL
            for (;;) {
                final Node p = node.predecessor();//The predecessor of the node
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //Execute shouldParkAfterFailedAcquire() to set the state of the node's predecessor to SIGNAL
                //Execute parkAndCheckInterrupt() to park the current thread
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }
    ...
}
7. Semaphore's token release process
(1) Semaphore's token release process
(2) The essence of Semaphore's token release
(1) Semaphore's token release process
When Semaphore's release() method is called to release a token: first, the AQS template method releaseShared() is executed, and then the tryReleaseShared() method implemented by Sync is executed to release the lock (incrementing the state value). If the lock is released successfully, AQS's doReleaseShared() method is executed to wake up a waiting thread.
(2) The essence of Semaphore's token release
The essence of Semaphore's release() method is to increment the state field and then wake up the successor of the waiting queue's head node, i.e. a waiting thread.
Note: The thread that calls the acquire() method does not have to be the one that calls the release() method; any thread may call release(). The number of available tokens can also be reduced through the (protected) reducePermits() method, as sketched below.
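Since reducePermits() is protected, a small subclass is needed to call it. A minimal sketch (the class and method names ResizableSemaphore/shrink are hypothetical):
import java.util.concurrent.Semaphore;

public class ResizableSemaphore extends Semaphore {
    public ResizableSemaphore(int permits) {
        super(permits);
    }
    public void shrink(int reduction) {
        reducePermits(reduction);//Lowers the number of available tokens without blocking
    }
    public static void main(String[] args) {
        ResizableSemaphore semaphore = new ResizableSemaphore(5);
        semaphore.shrink(2);//3 tokens remain available
        System.out.println("Available tokens: " + semaphore.availablePermits());
    }
}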
public class Semaphore implements java.io.Serializable {
    private final Sync sync;
    //Creates a Semaphore with the given number of permits and nonfair fairness setting.
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    //Releases a permit, returning it to the semaphore.
    public void release() {
        //Execute AQS template method releaseShared()
        sync.releaseShared(1);
    }
    //Synchronization implementation for semaphore.
    //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            //Set the value of state to the number of incoming tokens
            setState(permits);
        }
        //Try to release the lock, that is, increment the state value
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) {
                    throw new Error("Maximum permit count exceeded");
                }
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }
        ...
    }
    ...
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    //Releases in shared mode.
    //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
    public final boolean releaseShared(int arg) {
        //Execute the tryReleaseShared() method implemented by Semaphore's inner class Sync to release the shared lock
        if (tryReleaseShared(arg)) {
            //Execute AQS doReleaseShared() method to wake up a thread waiting in the queue
            doReleaseShared();
            return true;
        }
        return false;
    }
    //Release action for shared mode -- signals successor and ensures propagation.
    //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.
    private void doReleaseShared() {
        for (;;) {
            //The head node may change on every iteration of the loop,
            //because calling the unparkSuccessor() method wakes up a thread blocked in doAcquireSharedInterruptibly(),
            //and that thread modifies the head node via setHead() when executing the setHeadAndPropagate() method
            Node h = head;//Get the latest head node
            if (h != null && h != tail) {//There is a node with a suspended thread in the waiting queue
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//The head node is in the SIGNAL state, which means its successor's thread can be woken up
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                        continue;//loop to recheck cases
                    }
                    //Wake up the successor of the head node
                    //The awakened thread will execute the setHeadAndPropagate() method in doAcquireSharedInterruptibly() to modify the head node
                    unparkSuccessor(h);
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    //If ws == 0, this is the initial state; set the node to the PROPAGATE state
                    continue;//loop on failed CAS
                }
            }
            if (h == head) {//Judge whether the head node has changed
                break;//loop if head changed
            }
        }
    }
    //Wakes up node's successor, if one exists.
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        if (s != null) {
            LockSupport.unpark(s.thread);
        }
    }
    ...
}
8. Introduction to CyclicBarrier
(1) The role of CyclicBarrier
(2) The basic principles of CyclicBarrier
(1) The role of CyclicBarrier
CyclicBarrier literally means a reusable barrier. Its main function is to make a group of threads block when they reach a barrier; the barrier is not opened until the last thread reaches it, and then all threads intercepted by the barrier continue to execute together. A thread reaches the barrier by calling CyclicBarrier's await() method.
(2) The basic principles of CyclicBarrier
Suppose 3 threads each call the await() method of CyclicBarrier while running, and the time each thread takes to reach the await() call may differ. When the slowest thread finally reaches the barrier, the threads that arrived earlier are woken up and all of them continue to execute.
CyclicBarrier carries two levels of meaning:
First, the Barrier (barrier point): threads that call the await() method block at the barrier point until all threads have reached it, and only then are they released.
Second, the Cyclic (reusable) property: once all threads have passed the current barrier point, they can wait at the next round's barrier point, and this can repeat indefinitely, as sketched below.
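A minimal sketch of the cyclic property, reusing one barrier across two rounds (the class name and messages are illustrative only):
import java.util.concurrent.CyclicBarrier;

public class CyclicReuseDemo {
    public static void main(String[] args) {
        final CyclicBarrier barrier = new CyclicBarrier(3,
                () -> System.out.println("--- all 3 threads reached the barrier, next round ---"));
        for (int i = 0; i < 3; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    for (int round = 1; round <= 2; round++) {//Two rounds share the same barrier instance
                        System.out.println("Thread " + id + " finished round " + round);
                        barrier.await();//count is restored to parties after each trip
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}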
9. The await() method source code of CyclicBarrier
(1) Member variables of CyclicBarrier
(2) CyclicBarrier's await() method source code
(3) Comparison between CountDownLatch and CyclicBarrier
(1) Member variables of CyclicBarrier
//A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
//CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
//The barrier is called cyclic because it can be re-used after the waiting threads are released.
public class CyclicBarrier {
    ...
    private static class Generation {
        boolean broken = false;
    }
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();//Used by the threads to wake each other up
    private final int parties;//Number of participating threads
    private int count;//The initial value is parties; every call to await() decrements it by 1
    private final Runnable barrierCommand;//Callback task
    private Generation generation = new Generation();
    ...
}
CyclicBarrier is implemented based on ReentrantLock + Condition.
parties is the number of threads required to reach the barrier point in each round; only when that many threads have arrived are all of them awakened.
count is a counter whose initial value is parties. Each call to the await() method decrements count by 1. When count reaches 0, all threads are awakened and the current barrier generation ends; all threads then enter the next barrier cycle, and count is restored to parties.
(2) CyclicBarrier's await() method source code
When a thread calls the await() method of CyclicBarrier, it triggers the call to the dowait() method of CyclicBarrier.
The dowait() method of CyclicBarrier decrements the count counter. If count reaches 0, CyclicBarrier's nextGeneration() is called to wake up all threads; in addition, if the callback task barrierCommand is not null, it is executed by the last arriving thread before the others are released. If count has not reached 0, Condition's await() method is called to block the current thread.
In addition to being awakened by CyclicBarrier's nextGeneration() method, a blocked thread can also be woken up by Thread's interrupt() method, in which case an interrupt exception is raised and CyclicBarrier's breakBarrier() method is called.
In the nextGeneration() method of CyclicBarrier and the breakBarrier() method of CyclicBarrier, all blocked waiting threads will be awakened through the signalAll() method of Condition.
//A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
//CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
//The barrier is called cyclic because it can be re-used after the waiting threads are released.
public class CyclicBarrier {
    ...
    private static class Generation {
        boolean broken = false;//Used to mark whether the barrier is broken
    }
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();//Used by the threads to wake each other up
    private final int parties;//Number of participating threads
    private int count;//The initial value is parties; every call to await() decrements it by 1
    private final Runnable barrierCommand;//Callback task
    private Generation generation = new Generation();
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    //Waits until all #getParties have invoked await on this barrier.
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            //Execute the dowait() method of CyclicBarrier
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe);
        }
    }
    //Main barrier code, covering the various policies.
    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();//Acquire the ReentrantLock first
        try {
            //Get the current generation
            final Generation g = generation;
            //Confirm whether the current generation's barrier is valid; if generation.broken is true, a BrokenBarrierException is thrown
            if (g.broken) {
                throw new BrokenBarrierException();
            }
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //Count down the number of threads that still have to reach the current generation's barrier
            int index = --count;
            //If index is 0, all threads have reached the barrier point
            if (index == 0) {
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null) {
                        //Trigger the callback
                        command.run();
                    }
                    ranAction = true;
                    //Execute the nextGeneration() method to wake up all threads and enter the next barrier cycle
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction) {
                        breakBarrier();
                    }
                }
            }
            //loop until tripped, broken, interrupted, or timed out
            //If index > 0, block the current thread
            for (;;) {
                try {
                    if (!timed) {
                        //Condition's await() method releases the lock while blocking the current thread,
                        //so that other threads can obtain the lock and execute index = --count
                        trip.await();
                    } else if (nanos > 0L) {
                        nanos = trip.awaitNanos(nanos);
                    }
                } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken) {
                    throw new BrokenBarrierException();
                }
                if (g != generation) {
                    return index;
                }
                if (timed && nanos <= 0L) {
                    //Break the barrier, setting generation.broken to true
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    //Updates state on barrier trip and wakes up everyone.
    //Called only while holding lock.
    private void nextGeneration() {
        //Wake up all waiting threads through Condition's signalAll()
        trip.signalAll();
        //Restore count
        count = parties;
        //Enter a new generation
        generation = new Generation();
    }
    //Sets current barrier generation as broken and wakes up everyone.
    //Called only while holding lock.
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        //Wake up all waiting threads through Condition's signalAll()
        trip.signalAll();
    }
    ...
}
(3) Comparison between CountDownLatch and CyclicBarrier
1. CyclicBarrier can be reused and can respond to interrupts
2. CountDownLatch's counter can only be used once, while CyclicBarrier's counter can be reset through the reset() method, as sketched below
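A minimal sketch of CyclicBarrier's reset() method: a thread already waiting receives a BrokenBarrierException, and the barrier then starts a fresh generation (the class name and messages are illustrative only):
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierResetDemo {
    public static void main(String[] args) throws InterruptedException {
        final CyclicBarrier barrier = new CyclicBarrier(2);
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
            } catch (BrokenBarrierException e) {
                System.out.println("Barrier was reset while waiting...");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        Thread.sleep(500);//Let the waiter block on await()
        barrier.reset();//Breaks the current generation and starts a new one
        waiter.join();
    }
}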
10. Use CountDownLatch to wait for the completion of registration
In Hadoop HDFS (a distributed storage system), the NameNode consists of an active node and a standby node. Each DataNode registers with both NameNodes when it starts. CountDownLatch can be used here to wait for registration with both the active and standby NameNodes to complete.
//DataNode startup class
public class DataNode {
    //Whether it is still running
    private volatile boolean shouldRun;
    //Component responsible for communicating with the group of NameNodes (active NameNode + standby NameNode)
    private NameNodeGroupOfferService offerService;
    //Initialize the DataNode
    private void initialize() {
        this.shouldRun = true;
        this.offerService = new NameNodeGroupOfferService();
        this.offerService.start();
    }
    //Run the DataNode
    private void run() {
        try {
            while (shouldRun) {
                Thread.sleep(10000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        DataNode datanode = new DataNode();
        datanode.initialize();
        datanode.run();
    }
}
import java.util.concurrent.CountDownLatch;

//The thread component responsible for communicating with one NameNode
public class NameNodeServiceActor {
    //Register with one NameNode
    public void register(CountDownLatch latch) {
        Thread registerThread = new RegisterThread(latch);
        registerThread.start();
    }
    //The thread responsible for registering; a CountDownLatch is passed in
    class RegisterThread extends Thread {
        CountDownLatch latch;
        public RegisterThread(CountDownLatch latch) {
            this.latch = latch;
        }
        @Override
        public void run() {
            try {
                //Send an RPC call to the NameNode to register
                System.out.println("Send a request to the NameNode for registration...");
                Thread.sleep(1000);
                latch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
import java.util.concurrent.CountDownLatch;

//The thread component responsible for communicating with the group of NameNodes (active NameNode + standby NameNode)
public class NameNodeGroupOfferService {
    //ServiceActor component responsible for communicating with the active NameNode
    private NameNodeServiceActor activeServiceActor;
    //ServiceActor component responsible for communicating with the standby NameNode
    private NameNodeServiceActor standbyServiceActor;
    //Constructor
    public NameNodeGroupOfferService() {
        this.activeServiceActor = new NameNodeServiceActor();
        this.standbyServiceActor = new NameNodeServiceActor();
    }
    //Start the OfferService component
    public void start() {
        //Directly use the two ServiceActor components to register with the two NameNode nodes respectively
        register();
    }
    //Register with the active and standby NameNode nodes
    private void register() {
        try {
            CountDownLatch latch = new CountDownLatch(2);
            activeServiceActor.register(latch);
            standbyServiceActor.register(latch);
            latch.await();//Block and wait for registration with both the active and standby NameNodes to complete
            System.out.println("Both the active and standby NameNodes have been registered...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
11. Use CyclicBarrier to divide work among multiple threads
//Output result:
//Thread 1 performs part of its work...
//Thread 2 performs part of its own work...
//Thread 3 performs part of its own work...
//All threads complete their own tasks and can merge the results...
//The final result merge is completed, and thread 3 can exit...
//The final result merge is completed, and thread 1 can exit...
//The final result merge is completed, and thread 2 can exit...
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            public void run() {
                System.out.println("All threads complete their own tasks and can merge the results...");
            }
        });
        new Thread() {
            public void run() {
                try {
                    System.out.println("Thread 1 performs part of its own work...");
                    barrier.await();
                    System.out.println("The final result merge is completed, and thread 1 can exit...");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        new Thread() {
            public void run() {
                try {
                    System.out.println("Thread 2 performs part of its own work...");
                    barrier.await();
                    System.out.println("The final result merge is completed, and thread 2 can exit...");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        new Thread() {
            public void run() {
                try {
                    System.out.println("Thread 3 performs part of its own work...");
                    barrier.await();
                    System.out.println("The final result merge is completed, and thread 3 can exit...");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }
}
12. Use CyclicBarrier to aggregate results from multiple service interfaces
Of course, CountDownLatch can also be used to aggregate the return results of the service interfaces; a sketch of that variant follows the CyclicBarrier version below.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

public class ApiServiceDemo {
    public Map<String, Object> queryOrders() throws Exception {
        //Synchronized list because the three worker threads append their results concurrently
        final List<Object> results = Collections.synchronizedList(new ArrayList<Object>());
        final Map<String, Object> map = new ConcurrentHashMap<String, Object>();
        final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                map.put("price", results.get(0));
                map.put("order", results.get(1));
                map.put("stats", results.get(2));
            }
        });
        //Request the price interface
        new Thread() {
            public void run() {
                try {
                    System.out.println("Request price service...");
                    Thread.sleep(1000);
                    results.add(new Object());
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        //Request the order interface
        new Thread() {
            public void run() {
                try {
                    System.out.println("Request order service...");
                    Thread.sleep(1000);
                    results.add(new Object());
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        //Request the statistics interface
        new Thread() {
            public void run() {
                try {
                    System.out.println("Request order statistics service...");
                    Thread.sleep(1000);
                    results.add(new Object());
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        while (map.size() < 3) {
            Thread.sleep(100);
        }
        return map;
    }
}
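As noted at the start of this section, the same aggregation can also be built on CountDownLatch. A minimal sketch under that assumption (the class name ApiServiceLatchDemo is hypothetical, and the simulated sub-requests are illustrative only):
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class ApiServiceLatchDemo {
    public Map<String, Object> queryOrders() throws Exception {
        final Map<String, Object> map = new ConcurrentHashMap<String, Object>();
        final CountDownLatch latch = new CountDownLatch(3);
        new Thread(() -> {
            System.out.println("Request price service...");
            map.put("price", new Object());
            latch.countDown();
        }).start();
        new Thread(() -> {
            System.out.println("Request order service...");
            map.put("order", new Object());
            latch.countDown();
        }).start();
        new Thread(() -> {
            System.out.println("Request order statistics service...");
            map.put("stats", new Object());
            latch.countDown();
        }).start();
        latch.await();//Block until all three sub-requests have completed
        return map;
    }
}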
13. Use Semaphore to wait for the specified number of threads to complete the task
Semaphore can be used to wait for a specified number of threads to complete their tasks before continuing.
//The output is as follows:
//Thread 2 executes a calculation task
//Wait for 1 thread to complete the task...
//Thread 1 executes a calculation task
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(0);
        new Thread() {
            public void run() {
                try {
                    Thread.sleep(2000);
                    System.out.println("Thread 1 executes a calculation task");
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        new Thread() {
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println("Thread 2 executes a calculation task");
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        semaphore.acquire(1);
        System.out.println("Wait for 1 thread to complete the task...");
    }
}