Location>code7788 >text

JUC Concurrency—Source Code Analysis II

Popularity:303 ℃/2025-02-18 22:35:03

Outline

The basic principle

2. ReentractReadWriteLock based on AQS implementation

How to compete for lock writing

How to compete for reading locks

Fair locks and unfair locks

Lock downgrade

Introduction to instructions

Source code implementation

 

The basic principle

(1)Read lock and write lock relationship

On the surface, the read lock and the write lock are two locks, but in fact they are just two views of the same lock. When initializing the read lock and the write lock, they share a Sync, that is, the same lock and two types of threads. Among them, read threads and read threads are not mutually exclusive, read threads and write threads are mutually exclusive, and write threads and write threads are mutually exclusive.

 

(2) Lock status design

Like exclusive locks, read and write locks also use state variables to represent the state of the lock. Just split the state variable into two halves: the high 16 bits represent the read lock state, and the low 16 bits represent the write lock state.

 

A read-write lock is used to quickly determine the state of read and write through bit operations. Assuming the current state = s, the write state is equal to s & ((1 << 16) - 1), and the read state is equal to s >>> 16. When the write state increases by 1, state = s + 1. When the read state is added 1, state = s + (1 << 16).

 

Split an int-type state variable in half, instead of using two int-type variables to represent the state of read lock and write lock respectively, because it is impossible to operate two int-type variables at the same time with one CAS.

1 << 0 equals 1, (1 << 0) - 1 = 0
 1 << 1 equals 10, (1 << 1) - 1 = 01
 1 << 2 equals 100, (1 << 2) - 1 = 011
 1 << 4 equals 1000, (1 << 4) - 1 = 0111
 1 << 8 equals 100000000, (1 << 8) - 1 = 0111111111
 1 << 16 equals 100000000000000000000, (1 << 16) - 1 = 01111111111111111111
 //So s & ((1 << 16) - 1) is equivalent to erasing all the high 16 bits of s, leaving only the low 16 bits

 //If s = 11111, then s >>> 2 = 00111
 //So s >>> 16 means unsigned complement 0 and shift right to 16 bits

(3) Acquisition and release of write lock

A write lock is a reentrant exclusive lock that can only be acquired by one thread at the same time. If the current thread has acquired the write lock, the write state is increased: s + 1. If the current thread is acquiring the write lock, the read lock has been acquired or it is not the thread that has already obtained the write lock, then it enters a waiting state.

 

(4) Acquisition and release of read lock

A read lock is a reentrant shared lock that can be acquired by multiple threads at the same time. When the write state is 0, the read lock will always be successfully acquired, and all it does is to increase the read state. If the current thread has acquired the read lock, the read state is increased: state = s + (1 << 16). If the current thread is acquiring the read lock and the write lock has been acquired by other threads, the thread enters a waiting state.

 

2. ReentractReadWriteLock based on AQS implementation

(1) ReentractReadWriteLock constructor

(2) Read lock and write lock of ReentractReadWriteLock

(3) ReentractReadWriteLock's two pairs of template methods based on AQS

 

(1) ReentractReadWriteLock constructor

The readerLock variable represents a read lock, the writerLock variable represents a write lock, and the sync variable represents a fair lock or a non-fair lock.

public class ReentrantReadWriteLock implements ReadWriteLock, {
     //Inner class providing readlock
     private final readerLock;
     //Inner class providing writelock
     private final writerLock;
     //Performs all synchronization mechanics
     final Sync sync;

     //Creates a new ReentrantReadWriteLock with default (nonfair) ordering properties.
     public ReentrantReadWriteLock() {
         this(false);
     }

     //Creates a new ReentrantReadWriteLock with the given fairness policy.
     //@param fair {@code true} if this lock should use a fair ordering policy
     public ReentrantReadWriteLock(boolean fair) {
         sync = fair ? new FairSync() : new NonfairSync();
         readerLock = new ReadLock(this);
         writerLock = new WriteLock(this);
     }
    
     //Get write lock
     public writeLock() {
         return writerLock;
     }
    
     //Acquiring the read lock
     public readLock() {
         return readerLock;
     }
     ...
 }

(2) Read lock and write lock of ReentractReadWriteLock

Both read locks and write locks will be used to obtain and release locks through internal class Sync. Write a lock to call AQS's acquire() method, and then call tryAcquire() method. Adding a read lock will call AQS's acquireShared() method, and then call acquireShared() method.

public class ReentrantReadWriteLock implements ReadWriteLock,  {
    ...
    //Performs all synchronization mechanics
    final Sync sync;

    //The lock returned by method {@link ReentrantReadWriteLock#readLock}.
    public static class ReadLock implements Lock,  {
        private final Sync sync;
      
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = ;
        }
      
        //Acquires the read lock.
        //Acquires the read lock if the write lock is not held by another thread and returns immediately.
        //If the write lock is held by another thread then the current thread becomes disabled 
        //for thread scheduling purposes and lies dormant until the read lock has been acquired.
        public void lock() {
            (1);
        }

        //Acquires the read lock unless the current thread is Thread#interrupt interrupted.
        public void lockInterruptibly() throws InterruptedException {
            (1);
        }

        //Acquires the read lock only if the write lock is not held by another thread at the time of invocation.
        public boolean tryLock() {
            return ();
        }

        //Acquires the read lock if the write lock is not held by another thread 
        //within the given waiting time and the current thread has not been Thread#interrupt interrupted.
        public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
            return (1, (timeout));
        }

        //Attempts to release this lock.
        //If the number of readers is now zero then the lock is made available for write lock attempts.
        public void unlock() {
            (1);
        }
        ...
    }

    //The lock returned by method {@link ReentrantReadWriteLock#writeLock}.
    public static class WriteLock implements Lock,  {
        private final Sync sync;
      
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = ;
        }

        //Acquires the write lock.
        //Acquires the write lock if neither the read nor write lock are held by another thread and returns immediately, 
        //setting the write lock hold count to one.
        //If the current thread already holds the write lock then the hold count is incremented by one and the method return immediately.
        //If the lock is held by another thread then the current thread becomes disabled 
        //for thread scheduling purposes and lies dormant until the write lock has been acquired, 
        //at which time the write lock hold count is set to one.
        public void lock() {
            (1);
        }

        //Acquires the write lock unless the current thread is Thread#interrupt interrupted.
        public void lockInterruptibly() throws InterruptedException {
            (1);
        }

        //Acquires the write lock only if it is not held by another thread at the time of invocation.
        public boolean tryLock( ) {
            return ();
        }

        //Acquires the write lock if it is not held by another thread 
        //within the given waiting time and the current thread has not been Thread#interrupt interrupted.
        public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
            return (1, (timeout));
        }

        //Attempts to release this lock.
        //If the current thread is the holder of this lock then the hold count is decremented. 
        //If the hold count is now zero then the lock is released. 
        //If the current thread is not the holder of this lock then IllegalMonitorStateException is thrown.
        public void unlock() {
            (1);
        }
        ...
    }
    ...
}

(3) ReentractReadWriteLock's two pairs of template methods based on AQS

The exclusive lock and the write lock of read and write lock are implemented based on AQS's acquire/release template method. The read lock of read and write lock is implemented based on AQS' acquireShared/releaseShared template method.

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements {
     ...
     //Acquires in exclusive mode, ignoring interrupts.
     //Implemented by invoking at least once #tryAcquire, returning on success.
     //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquire until success.
     //This method can be used to implement method Lock#lock.
     public final void acquire(int arg) {
         //tryAcquire() requires subclass rewrite
         if (!tryAcquire(arg) && acquireQueued(addWaiter(), arg)) {
             selfInterrupt();
         }
     }
    
     protected boolean tryAcquire(int arg) {
         throw new UnsupportedOperationException();
     }
    
     //Releases in exclusive mode.
     //Implemented by unblocking one or more threads if #tryRelease returns true.
     //This method can be used to implement method Lock#unlock.
     public final boolean release(int arg) {
         //tryRelease() requires subclass rewrite
         if (tryRelease(arg)) {
             Node h = head;
             if (h != null && != 0) {
                 unparkSuccessor(h);
             }
             return true;
         }
         return false;
     }
    
     protected boolean tryRelease(int arg) {
         throw new UnsupportedOperationException();
     }
    
     //Acquires in shared mode, ignoring interrupts.
     //Implemented by first invoking at least once #tryAcquireShared, returning on success.
     //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquireShared until success.
     public final void acquireShared(int arg) {
         //tryAcquireShared() requires subclass rewrite
         if (tryAcquireShared(arg) < 0) {
             doAcquireShared(arg);
         }
     }
    
     protected int tryAcquireShared(int arg) {
         throw new UnsupportedOperationException();
     }
    
     //Releases in shared mode.
     //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
     public final boolean releaseShared(int arg) {
         //tryReleaseShared() requires subclass rewrite
         if (tryReleaseShared(arg)) {
             doReleaseShared();
             return true;
         }
         return false;
     }
    
     protected boolean tryReleaseShared(int arg) {
         throw new UnsupportedOperationException();
     }
     ...
 }

 

How to compete for lock writing

(1) Getting WriteLock

(2) Release of WriteLock

 

(1) Getting WriteLock

WriteLock's lock() method will call AQS's acquire() template method to acquire the lock, and AQS's acquire() method will in turn call the tryAcquire() method inherited from AQS's Sync class.

 

In the tryAcquire() method of the Sync class, the getState() method returns the value of the current state variable. The exclusiveCount() method will look for the number of threads currently obtaining the write lock from the state variable, and the writerShouldBlock() method will determine whether the current write thread should block when preempting the lock.

 

Case 1: c != 0 && w == 0

This means that a thread holds a read lock at this time, so the current thread cannot obtain the write lock, and returns false. It can be seen that after a thread acquires the read lock, it cannot continue to re-enter and acquires the write lock (it cannot be upgraded). However, from the subsequent analysis, after a thread acquires the write lock, it can continue to re-enter and acquire the read lock (the lock can be downgraded).

 

Case 2: c != 0 && w != 0

This means that there is a thread holding a write lock at this time and it is impossible for a thread to hold a read lock. Therefore, it is necessary to determine whether the thread holding the write lock is the current thread itself. If not, it will return false.

 

Situation 3: c != 0 && w != 0 && current holds the lock

This shows that the current thread is holding a write lock at this time, which is a reentered write lock. It is necessary to judge the number of reentries, and the number of reentries of the lock cannot be greater than 65535.

 

Situation 4: c == 0

This means that no thread holds the lock at this time, so the current thread can preempt the lock through CAS operation.

public class ReentrantReadWriteLock implements ReadWriteLock, {
     ...
     //Performs all synchronization mechanics
     final Sync sync;

     //The lock returned by method {@link ReentrantReadWriteLock#writeLock}.
     public static class WriteLock implements Lock, {
         private final Sync sync;
      
         protected WriteLock(ReentrantReadWriteLock lock) {
             sync = ;
         }

         //Acquires the write lock.
         //Acquires the write lock if neither the read nor write lock are held by another thread and returns immediately,
         //setting the write lock hold count to one.
         //If the current thread already holds the write lock then the hold count is incremented by one and the method return immediately.
         //If the lock is held by another thread then the current thread becomes disabled
         //for thread scheduling purposes and lies dormant until the write lock has been acquired,
         //at which time the write lock hold count is set to one.
         public void lock() {
             //Execute AQS acquire() template method to obtain the write lock (exclusive lock)
             (1);
         }
         ...
     }
    
     //Synchronization implementation for ReentrantReadWriteLock.
     //Subclassed into fair and nonfair versions.
     abstract static class Sync extends AbstractQueuedSynchronizer {
         static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
         static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

         //Returns the number of exclusive holds represented in count
         static int exclusiveCount(int c) {
             //Acquiring the write lock status: c & ((1 << 16) - 1)
             //That is, find the number of threads currently obtaining the write lock from the state variable
             return c & EXCLUSIVE_MASK;
         }

         //Acquiring the write lock (exclusive lock)
         protected final boolean tryAcquire(int acquires) {
             //Walkthrough:
             // read count nonzero or write count nonzero and owner is a different thread, fail.
             // count would saturate, fail. (This can only happen if count is already nonzero.)
             //3. Otherwise, this thread is eligible for lock if it is either a reentrant acquire or queue policy allows it.
             //If so, update state and set owner.
             Thread current = ();
             int c = getState();//Acquire the lock's status
             int w = exclusiveCount(c);//Acquiring the state of the write lock
             if (c != 0) {
                 //If c != 0 && w == 0, it means that a thread holds a read lock, so the thread currently obtaining the write lock will be blocked, and false will be returned
                 //If c != 0 && w != 0 && current does not acquire the lock, it means that other threads hold the write lock (it is impossible for a thread to hold the read lock), so the thread that currently acquires the write lock will be blocked and will return  false
                 if (w == 0 || current != getExclusiveOwnerThread()) {
                     return false;
                 }
                 //Judge that the number of reentries cannot be greater than 65535
                 if (w + exclusiveCount(acquires) > MAX_COUNT) {
                     throw new Error("Maximum lock count exceeded");
                 }
                 //Reentrant acquire
                 setState(c + acquires);
                 return true;
             }
             //At this time c == 0, which means that no thread holds the lock, and the lock can be preempted through CAS operation
             if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
                 return false;
             }
             setExclusiveOwnerThread(current);
             return true;
         }
         ...
     }
 }


 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements {
     ...
     //Acquiring the write lock (exclusive lock)
     //Acquires in exclusive mode, ignoring interrupts.
     //Implemented by invoking at least once #tryAcquire, returning on success.
     //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquire until success.
     //This method can be used to implement method Lock#lock.
     public final void acquire(int arg) {
         //tryAcquire() requires subclass rewriting. At this time, the tryAcquire() method of the inner class Sync of ReentrantReadWriteLock is executed.
         if (!tryAcquire(arg) && acquireQueued(addWaiter(), arg)) {
             selfInterrupt();
         }
     }
     ...
 }

(2) Release of WriteLock

WriteLock's unlock() method will call AQS' release() template method to release the lock, and AQS' release() method will in turn call the tryRelease() method of the Sync class inherited from AQS.

 

In the tryRelease() method of the Sync class, first use getState() - releases to decrement the number of times the lock is written. Since the number of reentries of the write lock is saved at the low position, it is only possible to subtract the decimal indirectly. Then use exclusiveCount() to obtain the number of reentries of the write lock. If it is 0, it means that the lock is released successfully. Finally, modify the value of the state variable through the setState() method. Since the write lock is an exclusive lock, setting the value of the state variable does not require a CAS operation.

public class ReentrantReadWriteLock implements ReadWriteLock, {
     ...
     //Performs all synchronization mechanics
     final Sync sync;

     //The lock returned by method {@link ReentrantReadWriteLock#writeLock}.
     public static class WriteLock implements Lock, {
         private final Sync sync;
      
         protected WriteLock(ReentrantReadWriteLock lock) {
             sync = ;
         }

         //Attempts to release this lock.
         //If the current thread is the holder of this lock then the hold count is declared.
         //If the hold count is now zero then the lock is released.
         //If the current thread is not the holder of this lock then IllegalMonitorStateException is thrown.
         public void unlock() {
             //Execute AQS release() method to release the write lock (exclusive lock)
             (1);
         }
         ...
     }
    
     //Synchronization implementation for ReentrantReadWriteLock.
     //Subclassed into fair and nonfair versions.
     abstract static class Sync extends AbstractQueuedSynchronizer {
         static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
         static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
      
         //Returns the number of exclusive holds represented in count
         static int exclusiveCount(int c) {
             //Acquiring the write lock status: c & ((1 << 16) - 1)
             //That is, find the number of threads currently obtaining the write lock from the state variable
             return c & EXCLUSIVE_MASK;
         }
     
         //Note that tryRelease and tryAcquire can be called by Conditions.
         //So it is possible that their arguments contain both read and write holds
         //that are all released during a condition wait and re-established in tryAcquire.
         protected final boolean tryRelease(int releases) {
             if (!isHeldExclusively()) {
                 throw new IllegalMonitorStateException();
             }
             int nextc = getState() - releases;//Decrement the number of times the lock is locked
             boolean free = exclusiveCount(nextc) == 0;//Calculate the number of reentries of the write lock
             if (free) {
                 setExclusiveOwnerThread(null);
             }
             setState(nextc);
             return free;
         }
         ...
     }
     ...
 }

 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements {
     ...
     //Release the write lock (exclusive lock)
     //Releases in exclusive mode.
     //Implemented by unblocking one or more threads if #tryRelease returns true.
     //This method can be used to implement method Lock#unlock.
     public final boolean release(int arg) {
         //tryRelease() requires subclass rewrite. At this time, the tryRelease() method of the inner class Sync of ReentrantReadWriteLock is executed.
         if (tryRelease(arg)) {
             Node h = head;
             if (h != null && != 0) {
                 unparkSuccessor(h);
             }
             return true;
         }
         return false;
     }
     ...
 }

 

How to compete for reading locks

(1)ReadLock acquisition

(2) Release of ReadLock

(3) fullTryAcquireShared() method

 

(1)ReadLock acquisition

ReadLock's lock() method will call AQS's acquireShared() template method to obtain the read lock, and AQS's acquireShared() method will in turn call Sync class tryAcquireShared() method.

 

In the tryAcquireShared() method of the Sync class inherited from AQS: First, it will determine whether there is a thread holding a write lock + whether the thread holding a write lock is the current thread. If a thread holds a write lock, but not the current thread holds a write lock, it will block the current thread. Then determine whether the current thread acquires the read lock, whether the number of read lock reentries is less than 65535, and whether the read lock is successfully preempted by modifying the state value by CAS.

 

If the current thread acquires the read lock, the number of read lock reentries should not be blocked, the number of read lock reentries is less than 65535, and the CAS preempts the read lock successfully, then use ThreadLocal to record the number of times the thread reentries the read lock. Otherwise, continue to call the fullTryAcquireShared() method to obtain the lock through spin attempt.

 

If the tryAcquireShared() method of Sync returns -1, the doAcquireShared() method of AQS is called to queue and wait for the queue and block the current thread.

 

In the waiting queue, if the thread waiting for the read lock is awakened, it will continue to loop and wake up all subsequent threads waiting for the read lock until a thread waiting for the write lock is encountered.

public class ReentrantReadWriteLock implements ReadWriteLock, {
     ...
     //Performs all synchronization mechanics
     final Sync sync;

     //The lock returned by method {@link ReentrantReadWriteLock#readLock}.
     public static class ReadLock implements Lock, {
         private final Sync sync;
      
         protected ReadLock(ReentrantReadWriteLock lock) {
             sync = ;
         }
      
         //Acquires the read lock.
         //Acquires the read lock if the write lock is not held by another thread and returns immediately.
         //If the write lock is held by another thread then the current thread becomes disabled
         //for thread scheduling purposes and lies dormant until the read lock has been acquired.
         public void lock() {
             //Execute AQS acquireShared() method to obtain read lock (shared lock)
             (1);
         }
         ...
     }
    
     //Synchronization implementation for ReentrantReadWriteLock.
     //Subclassed into fair and nonfair versions.
     abstract static class Sync extends AbstractQueuedSynchronizer {
         static final int SHARED_SHIFT = 16;
         static final int SHARED_UNIT = (1 << SHARED_SHIFT);
         static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        
         //Returns the number of exclusive holds represented in count
         static int exclusiveCount(int c) {
             //Acquiring the write lock status: c & ((1 << 16) - 1)
             //That is, find the number of threads currently obtaining the write lock from the state variable
             return c & EXCLUSIVE_MASK;
         }
      
         //Returns the number of shared holds represented in count
         static int sharedCount(int c) {
             //Acquiring the read lock status: c >>> 16
             // That is, find the number of threads currently obtaining the read lock from the state variable
             return c >>> SHARED_SHIFT;
         }
   
         //A counter for per-thread read hold counts.
         //Maintained as a ThreadLocal; cached in cachedHoldCounter
         static final class HoldCounter {
             int count = 0;
             //Use id, not reference, to avoid garbage retention
             final long tid = getThreadId(());
         }
       
         //ThreadLocal subclass. Easiest to explicitly define for sake of deserialization mechanics.
         static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
             public HoldCounter initialValue() {
                 return new HoldCounter();
             }
         }

         //The number of reentrant read locks held by current thread.
         //Initialized only in constructor and readObject.
         //Removed whenever a thread's read hold count drops to 0.
         private transient ThreadLocalHoldCounter readHolds;
       
         //The hold count of the last thread to successfully acquire readLock.
         //This saves ThreadLocal lookup in the common case where the next thread to release is the last one to acquire.
         //This is non-volatile since it is just used as a heuristic, and would be great for threads to cache.
         private transient HoldCounter cachedHoldCounter;

         //firstReader is the first thread to have acquired the read lock.
         //firstReaderHoldCount is firstReader's hold count.
         private transient Thread firstReader = null;
         private transient int firstReaderHoldCount;

         Sync() {
             readHolds = new ThreadLocalHoldCounter();
             setState(getState());// ensures visibility of readHolds
         }
      
         //Acquiring the read lock (shared lock)
         protected final int tryAcquireShared(int unused) {
             //Walkthrough:
             // write lock hold by another thread, fail.
             //, this thread is eligible for lock wrt state,
             //so ask if it should block because of queue policy.
             //If not, try to grant by CASing state and updating count.
             //Note that step does not check for reentrant acquires,
             // which is posted to full version to avoid having to check hold count in the more typical non-reentrant case.
             // step 2 fails either because thread apparently not eligible or CAS fails or count saturated,
             //chain to version with full retry loop.
             Thread current = ();
             int c = getState();
             //If the holder thread of the write lock is not the current thread, it will be blocked directly
             //This means that if a thread acquires the write lock first, then it can reenter and acquire the read lock
             if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) {
                 return -1;
             }
             int r = sharedCount(c);//Acquiring the status of the read lock
             //First determine whether the current thread acquires the read lock and should block, then determine whether the number of reentries of the read lock is less than 65535, and finally preempt the read lock by modifying the state value through CAS
             if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
                 //Use ThreadLocal to record the number of times each thread reenter the read lock
                 if (r == 0) {
                     firstReader = current;
                     firstReaderHoldCount = 1;
                 } else if (firstReader == current) {
                     firstReaderHoldCount++;
                 } else {
                     HoldCounter rh = cachedHoldCounter;
                     if (rh == null || != getThreadId(current)) {
                         cachedHoldCounter = rh = ();
                     } else if ( == 0) {
                         (rh);
                     }
                     ++;
                 }
                 return 1;
             }
             //If the current thread fails to acquire the read lock, call the fullTryAcquireShared() method to obtain the lock through spin attempt
             return fullTryAcquireShared(current);
         }
         ...
     }
     ...
 }

 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements {
     ...
     //Acquiring the read lock (shared lock)
     //Acquires in shared mode, ignoring interrupts.
     //Implemented by first invoking at least once #tryAcquireShared, returning on success.
     //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquireShared until success.
     public final void acquireShared(int arg) {
         //tryAcquireShared() requires subclass rewrite. At this time, the tryAcquireShared() method of the inner class Sync of ReentrantReadWriteLock is executed.
         if (tryAcquireShared(arg) < 0) {
             //Call the doAcquireShared() method of AQS to join the queue and wait for the queue and block the current thread
             doAcquireShared(arg);
         }
     }
    
     //Acquires in shared uninterruptible mode.
     private void doAcquireShared(int arg) {
         final Node node = addWaiter();
         boolean failed = true;
         try {
             boolean interrupted = false;
             for (;;) {
                 final Node p = ();
                 if (p == head) {
                     //Note: If the thread waiting for the read lock is awakened, then the loop will continue to wake up all subsequent threads waiting for the read lock.
                     int r = tryAcquireShared(arg);
                     if (r >= 0) {
                         setHeadAndPropagate(node, r);
                          = null; // help GC
                         if (interrupted) {
                             selfInterrupt();
                         }
                         failed = false;
                         return;
                     }
                 }
                 //Execute shouldParkAfterFailedAcquire() method to set the state of the predecessor node of the node to SIGNAL
                 //Execute the parkAndCheckInterrupt() method to hang the current thread
                 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                     interrupted = true;
                 }
             }
         } finally {
             if (failed) {
                 cancelAcquire(node);
             }
         }
     }
     ...
 }

(2) Release of ReadLock

The unlock() method of ReadLock will call AQS's releaseShared() template method to release the lock, and AQS's releaseShared() method will in turn call Sync class's tryReleaseShared() method.

 

In the tryReleaseShared() method of the Sync class: First, ThreadLocal will be combined with ThreadLocal to process the number of times the current thread reenters the read lock, and then set the state value through spin + CAS to release the read lock. Finally, the doReleaseShared() method of AQS wakes up blocking thread.

 

The difference between tryRelease() and tryReleaseShared(): The read lock is a shared lock, held by multiple threads, so releasing the read lock needs to be done through spin + CAS. A write lock is an exclusive lock and is held by a single thread, so no CAS operation is required when releasing the write lock.

public class ReentrantReadWriteLock implements ReadWriteLock, {
     ...
     //Performs all synchronization mechanics
     final Sync sync;

     //The lock returned by method {@link ReentrantReadWriteLock#readLock}.
     public static class ReadLock implements Lock, {
         private final Sync sync;
      
         protected ReadLock(ReentrantReadWriteLock lock) {
             sync = ;
         }
      
         //Attempts to release this lock.
         //If the number of readers is now zero then the lock is made available for write lock attempts.
         public void unlock() {
             //Execute AQS releaseShared() method to release the read lock (shared lock)
             (1);
         }
         ...
     }
    
     //Synchronization implementation for ReentrantReadWriteLock.
     //Subclassed into fair and nonfair versions.
     abstract static class Sync extends AbstractQueuedSynchronizer {
         ...
         //Release the read lock (shared lock)
         protected final boolean tryReleaseShared(int unused) {
             Thread current = ();
             //First combine ThreadLocal to process the number of times the current thread reenter the read lock
             if (firstReader == current) {
                 if (firstReaderHoldCount == 1) {
                     firstReader = null;
                 } else {
                     firstReaderHoldCount--;
                 }
             } else {
                 HoldCounter rh = cachedHoldCounter;
                 if (rh == null || != getThreadId(current)) {
                     rh = ();
                 }
                 int count = ;
                 if (count <= 1) {
                     ();
                     if (count <= 0) {
                         throw unmatchedUnlockException();
                     }
                 }
                 --;
             }
             //Then set the state value by spin + CAS to release the read lock
             for (;;) {
                 int c = getState();
                 int nextc = c - SHARED_UNIT;
                 if (compareAndSetState(c, nextc)) {
                     //Releasing the read lock has no effect on readers,
                     //but it may allow waiting writers to proceed if both read and write locks are now free.
                     return nextc == 0;
                 }
             }
         }
         ...
     }
     ...
 }

 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements {
     ...
     //Release the read lock (shared lock)
     //Releases in shared mode.
     //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
     public final boolean releaseShared(int arg) {
         //tryReleaseShared() requires subclass rewriting. At this time, the tryReleaseShared() method of the inner class Sync of ReentrantReadWriteLock is executed.
         if (tryReleaseShared(arg)) {
             //Execute AQS doReleaseShared() method to wake up the blocking thread
             doReleaseShared();
             return true;
         }
         return false;
     }
    
     //Release action for shared mode -- signals successor and ensures propagation.
     //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.
     private void doReleaseShared() {
         //Ensure that a release propagates, even if there are other in-progress acquires/releases.
         //This proceeds in the usual way of trying to unparkSuccessor of head if it needs signal.
         //But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues.
         //Additionally, we must loop in case a new node is added while we are doing this.
         //Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking.
         for (;;) {
             Node h = head;
             if (h != null && h != tail) {
                 int ws = ;
                 if (ws == ) {
                     if (!compareAndSetWaitStatus(h, , 0)) {
                         //loop to recheck cases
                         continue;
                     }
                     //Execute AQS unparkSuccessor() method
                     unparkSuccessor(h);
                 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, )) {
                     //loop on failed CAS
                     continue;
                 }
             }
             //loop if head changed
             if (h == head) {
                 break;
             }
         }
     }
    
     //Wakes up node's successor, if one exists.
     private void unparkSuccessor(Node node) {
         //If status is negative (., possible needing signal) try to clear in anticipation of signalling.
         //It is OK if this fails or if status is changed by waiting thread.
         //The state of the Node node is watiStatus can be divided into the following types:
         //Default (0), CANCELED (1), SIGNAL (-1), CONDITION (-2), PROPAGATE (-3)
         // By default, watiStatus should be 0 or empty
         //Get the status of the header node
         int ws = ;
         //The header node status needs to be set to 0
         if (ws < 0) {
             compareAndSetWaitStatus(node, ws, 0);
         }

         //Thread to unpark is held in successor, which is normally just the next node.
         //But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor.
         //Get the successor node of the header node
         Node s = ;
         //If the successor node of the header node is null or its state is CANCELED
         if (s == null || > 0) {
             s = null;
             //Then start scanning from the tail node and find the node closest to the head node. The + state is not a canceled node.
             for (Node t = tail; t != null && t != node; t = ) {
                 if ( <= 0) {
                     s = t;
                 }
             }
         }
         if (s != null) {
             //Wake up the thread corresponding to the successor node of the incoming head node
             ();
         }
     }
     ...
 }

(3) fullTryAcquireShared() method

In the following two cases, this method will return -1 and let the current thread join the waiting queue for waiting.

Case 1: At this time, a thread has obtained the write lock, but it is not the current thread that has obtained the write lock and reenters the read lock.

Case 2: The readerShouldBlock() method returns true and does not reenter the read lock.

public class ReentrantReadWriteLock implements ReadWriteLock, {
     ...
     //Synchronization implementation for ReentrantReadWriteLock.
     //Subclassed into fair and nonfair versions.
     abstract static class Sync extends AbstractQueuedSynchronizer {
         static final int SHARED_SHIFT = 16;
         static final int SHARED_UNIT = (1 << SHARED_SHIFT);
         static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        
         //Returns the number of exclusive holds represented in count
         static int exclusiveCount(int c) {
             //Acquiring the write lock status: c & ((1 << 16) - 1)
             //That is, find the number of threads currently obtaining the write lock from the state variable
             return c & EXCLUSIVE_MASK;
         }
      
         //Returns the number of shared holds represented in count
         static int sharedCount(int c) {
             //Acquiring the read lock status: c >>> 16
             // That is, find the number of threads currently obtaining the read lock from the state variable
             return c >>> SHARED_SHIFT;
         }
   
         //A counter for per-thread read hold counts.
         //Maintained as a ThreadLocal; cached in cachedHoldCounter
         static final class HoldCounter {
             int count = 0;
             //Use id, not reference, to avoid garbage retention
             final long tid = getThreadId(());
         }
       
         //ThreadLocal subclass. Easiest to explicitly define for sake of deserialization mechanics.
         static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
             public HoldCounter initialValue() {
                 return new HoldCounter();
             }
         }

         //The number of reentrant read locks held by current thread.
         //Initialized only in constructor and readObject.
         //Removed whenever a thread's read hold count drops to 0.
         private transient ThreadLocalHoldCounter readHolds;
       
         //The hold count of the last thread to successfully acquire readLock.
         //This saves ThreadLocal lookup in the common case where the next thread to release is the last one to acquire.
         //This is non-volatile since it is just used as a heuristic, and would be great for threads to cache.
         private transient HoldCounter cachedHoldCounter;

         //firstReader is the first thread to have acquired the read lock.
         //firstReaderHoldCount is firstReader's hold count.
         private transient Thread firstReader = null;
         private transient int firstReaderHoldCount;

         Sync() {
             readHolds = new ThreadLocalHoldCounter();
             setState(getState());// ensures visibility of readHolds
         }
         ...
        
         //Full version of acquire for reads,
         //that handles CAS misses and reentrant reads not dealt with in tryAcquireShared.
         final int fullTryAcquireShared(Thread current) {
             //This code is in part redundant with that in tryAcquireShared
             //but is simpler overall by not complicating tryAcquireShared with interactions
             //between retries and lazy reading hold counts.
             HoldCounter rh = null;
             for (;;) {
                 int c = getState();
                 if (exclusiveCount(c) != 0) {
                     //If another thread currently acquires the write lock, then return -1, the current thread will hang block and enter the waiting queue
                     if (getExclusiveOwnerThread() != current) {
                         return -1;
                     }
                     //else we hold the exclusive lock; blocking here would cause deadlock.
                 } else if (readerShouldBlock()) {
                     //If readerShouldBlock() returns true, it means that the current thread needs to be blocked to obtain the read lock
                
                     //Combined with ThreadLocal to handle the number of times the current thread reentered read lock
                     //Make sure we're not acquiring read lock reentrantly
                     if (firstReader == current) {
                         //assert firstReaderHoldCount > 0;
                     } else {
                         if (rh == null) {
                             rh = cachedHoldCounter;
                             if (rh == null || != getThreadId(current)) {
                                 rh = ();
                                 if ( == 0) {
                                     ();
                                 }
                             }
                         }
                         //If it's not reenter
                         if ( == 0) {
                             return -1;
                         }
                     }
                 }
                 if (sharedCount(c) == MAX_COUNT) {
                     throw new Error("Maximum lock count exceeded");
                 }
                 if (compareAndSetState(c, c + SHARED_UNIT)) {
                     //Combined with ThreadLocal to handle the number of times the current thread reentered read lock
                     if (sharedCount(c) == 0) {
                         firstReader = current;
                         firstReaderHoldCount = 1;
                     } else if (firstReader == current) {
                         firstReaderHoldCount++;
                     } else {
                         if (rh == null) {
                             rh = cachedHoldCounter;
                         }
                         if (rh == null || != getThreadId(current)) {
                             rh = ();
                         } else if ( == 0) {
                             (rh);
                         }
                         ++;
                         cachedHoldCounter = rh; // cache for release
                     }
                     return 1;
                 }
             }
         }
         ...
     }
     ...
 }

 

Fair locks and unfair locks

(1) Implementation code of fair lock

(2) Implementation code of unfair lock

 

(1) Implementation code of fair lock

For fair locks, the readerShouldBlock() method used to determine whether the reader should block when preemption of the lock, and the writerShouldBlock() method used to determine whether the write thread should block when preemption of the lock, will be judged by the hasQueuedPredecessors() method. Is there any threads in the current queue queue? As long as there are other threads in the queue, both the write thread and the read thread must be queued to end and cannot seize the lock.

public class ReentrantReadWriteLock implements ReadWriteLock, {
     ...
     //Fair version of Sync
     static final class FairSync extends Sync {
         final boolean writerShouldBlock() {
             return hasQueuedPredecessors();
         }
         final boolean readerShouldBlock() {
             return hasQueuedPredecessors();
         }
     }
    
     //Synchronization implementation for ReentrantReadWriteLock.
     //Subclassed into fair and nonfair versions.
     abstract static class Sync extends AbstractQueuedSynchronizer {
         ...
         //Returns true if the current thread, when trying to acquire the read lock,
         //and otherwise eligible to do so, should block because of policy for overtaking other waiting threads.
         abstract boolean readerShouldBlock();

         //Returns true if the current thread, when trying to acquire the write lock,
         //and otherwise eligible to do so, should block because of policy for overtaking other waiting threads.
         abstract boolean writerShouldBlock();
         ...
     }
     ...
 }

 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements {
     ...
     //Queries whether any threads have been waiting to acquire longer than the current thread.
     //Judge whether there are threads in the current queue queuing
     public final boolean hasQueuedPredecessors() {
         Node t = tail; // Read fields in reverse initialization order
         Node h = head;
         Node s;
         //So!hasQueuedPredecessors() is equivalent to:
         //h == t || ( != null && == ())
         return h != t && ((s = ) == null || != ());
     }
     ...
 }

(2) Implementation code of unfair lock

For unfair locks, the writerShouldBlock() method will directly return false, because the case where the writer thread can grab the unfair lock must be: no other thread holds the lock or the thread reenters the write lock itself, so there is no need to block. The readerShouldBlock() method will call a method to decide whether to block the read thread, avoiding the problem of infinite waiting for the write lock (deadlock hunger problem).

 

The method called by the readerShouldBlock() method is AQS apparentlyFirstQueuedIsExclusive() method. If the successor node of the currently waiting queue header node is a write lock node, then the method returns true, indicating that the current read thread to acquire the read lock needs to be queued. If the successor node of the currently waiting queue header node is a read lock node, then this method returns false, indicating that the current read thread to acquire the read lock can preempt the lock.

 

Since read threads and read threads are not mutually exclusive, if there is currently a thread holding a read lock, and the new read thread still unfairly snatches the lock, it may cause the write thread to never get the write lock.

public class ReentrantReadWriteLock implements ReadWriteLock, {
     ...
     static final class NonfairSync extends Sync {
         //Write thread call
         final boolean writerShouldBlock() {
             return false; // writers can always barge
         }
      
         //Read thread call
         final boolean readerShouldBlock() {
             //As a heuristic to avoid independent writer starvation,
             //block if the thread that momentarily appears to be head of queue, if one exists, is a waiting writer.
             //This is only a probabilistic effect since a new reader will not block
             //if there is a waiting writer behind other enabled readers that have not yet drained from the queue.
             return apparentlyFirstQueuedIsExclusive();
         }
     }
     ...
 }

 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements {
     ...
     //If the successor node of the currently waiting queue head node is a write lock node, then this method returns true, indicating that the current read thread to acquire the read lock needs to be queued;
     //If the successor node of the currently waiting queue head node is a read lock node, then this method returns false, indicating that the current read thread to acquire the read lock can preempt the lock;
     final boolean apparentlyFirstQueuedIsExclusive() {
         Node h, s;
         return (h = head) != null &&
             (s = ) != null &&
             !() &&
              != null;
     }
     ...
 }

 

Lock downgrade

(1) What is lock downgrade in ReentrantReadWriteLock

(2) Example of lock downgrading in ReentrantReadWriteLock

 

(1) What is lock downgrade in ReentrantReadWriteLock

The lock downgrade here refers to the downgrade from a write lock to a read lock. That is, if thread A acquires the write lock, it is allowed to acquire the read lock if the write lock is not released. If thread A acquires the write lock, then releases the write lock, and then acquires the read lock, this is not a lock downgrade. Of course, if thread A acquires the read lock, it is not allowed to acquire the write lock if the read lock is not released.

 

(2) Example of lock downgrading in ReentrantReadWriteLock

Lock downgrade is to improve performance. If you only use write locks, then performing use(data) reading data for a long time will block other read threads. Therefore, by downgrading the write lock to a read lock, the read thread will not be blocked when executing use(data).

Object data;
 public void processData() {
     ();//You must first obtain the read lock, because you need to read the data later, for example, when the update is true, you need to
     if (!update) {//Improve it to be modified
         ();//The read lock must be released first before the write lock can be obtained.
         ();//Lock downgrading starts from here to get the write lock
         try {
             if (!update) {
                 //Prepare to modify the data and write the data data
                 data = ...;
                 update = true;
             }
             ();//On the basis of obtaining the write lock, continue to acquire the read lock
         } finally {
             ();//Release the write lock, the write lock is downgraded to the read lock, and complete the lock downgrading
         }
     }
     try {
         //Use data, read data for a long time
         use(data);
     } finally {
         ();//Release the read lock
     }
 }

 

Introduction to instructions

(1) Condition interface

(2) Condition description

 

(1) Condition interface

public interface Condition {
    //Causes the current thread to wait until it is signalled or Thread#interrupt interrupted.
    void await() throws InterruptedException;
    //Causes the current thread to wait until it is signalled.
    void awaitUninterruptibly();
    //Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    //Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    //Causes the current thread to wait until it is signalled or interrupted, or the specified deadline elapses.
    boolean awaitUntil(Date deadline) throws InterruptedException;
    //Wakes up one waiting thread.
    void signal();
    //Wakes up all waiting threads.
    void signalAll();
}

(2) Condition description

1. Comparison between Condition and wait/notify

Condition's function is similar to wait/notify, and can implement wait/notify mode. wait/notify must be used with synchronized, and Condition must also be used with Lock. Condition avoids wait/notify producers notifying producers and consumers notifying consumers.

 

2. Use of Condition

The Condition object is usually used as a member variable. When the await() method of Condition is called, the current thread will release the lock and hang wait. When other threads call the signal() method of Condition to notify the current thread (release the lock), the current thread will return from the await() method of Condition and has already obtained the lock when it returns.

 

Three. Two threads are using Condition's interactive process

Thread 1 -> Acquire the lock -> Release the lock + await() blocking and waiting ->

Thread 2 -> Get lock -> signal() wakes up thread 1 + releases lock ->

Thread 1 -> Wake up + Try to acquire the lock -> Release the lock

 

4. Whether read and write locks and exclusive locks support Condition

Both exclusive locks and read and write locks support Condition, but read locks of read and write locks do not support Condition.

 

ReentrantLock supports Condition.

ReentrantReadWriteLock supports Condition.

 

5. Implemented by AQS's internal class ConditionObject

Each Condition object has a Condition queue, which is the key to the Condition object's waiting/notification function.

 

Six. Condition application scenarios

LinkedBlockingQueue, ArrayBlockQueue, and CyclicBarrier all use Condition to implement thread waiting.

public class ConditionDemo() {
     static ReentrantLock lock = new ReentrantLock();
     static Condition condition = ();

     public static void main(String[] args) throws Exception {
         new Thread() {
             public void run() {
                 ();
                 ("The first thread locked");
                 try {
                     ("The first thread releases the lock and blocks waiting");
                     ();
                     ("The first thread reacquisitions the lock");
                 } catch (Exception e) {
                     ();
                 }
                 ("The first thread releases the lock");
                 ();
             };
         }.start();
         (3000);
         new Thread() {
             public void run() {
                 ();
                 ("Second thread locked");
                 ("The second thread wakes up the first thread");
                 ();
                 ();
                 ("The second thread releases the lock");
             };
         }.start();
     }
 }

 

Source code implementation

(1) Create a ConditionObject object

(2) Condition queue of ConditionObject

(3) ConditionObject's wait method await()

(4) Notification method of ConditionObject signal()

 

(1) Create a ConditionObject object

Calling ReentrantLock's newCondition() method can create a Condition object. ReentrantLock's newCondition() method will call Sync's newCondition() method, and Sync's newCondition() method will create a ConditionObject object.

public class ReentrantLock implements Lock, {
     ...
     //Synchronizer providing all implementation mechanics
     private final Sync sync;

     //Returns a {@link Condition} instance for use with this Lock instance.
     public Condition newCondition() {
         //Execute the newCondition() method of ReentrantLock internal class Sync
         return ();
     }
    
     //Base of synchronization control for this lock.
     //Subclassed into fair and nonfair versions below.
     //Uses AQS state to represent the number of holds on the lock.
     abstract static class Sync extends AbstractQueuedSynchronizer {
         ...
         final ConditionObject newCondition() {
             return new ConditionObject();
         }
         ...
     }
     ...
 }
    
 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements {
     ...
     public class ConditionObject implements Condition, {
         //First node of condition queue.
         private transient Node firstWaiter;
      
         //Last node of condition queue.
         private transient Node lastWaiter;

         //Creates a new ConditionObject instance.
         public ConditionObject() {
              
         }
         ...
     }
     ...
 }

(2) Condition queue of ConditionObject

In the ConditionObject object, there will be a Condition queue, and the Condition queue maintained by the ConditionObject object is a one-way linked list.

 

The properties firstWaiter and lastWaiter of the ConditionObject object represent the head and tail nodes of the queue. When a thread calls the await() method, the thread will be encapsulated as a Node node. Then the Node node will be added to the tail of the Condition queue as a new node.

 

Since the Condition object has a reference to the head and tail nodes of the Condition queue, you just need to point the nextWaiter of the original tail node to the new node and update the tail node. This update process does not require the use of CAS guarantee, because the thread calling the await() method has already acquired the lock.

 

(3) ConditionObject's wait method await()

The main logic of the await() method of ConditionObject:

1. Add the current thread to the Condition queue through the addConditionWaiter() method

2. Release the lock through fullyRelease() method

3. Stop the current thread through the () method

4. After being awakened by the signal() method, try to acquire the lock through the acquireQueued() method.

 

In fact, it is equivalent to moving the head node of the waiting queue (the node where the lock is acquired) to the Condition queue. However, the head node waiting for the queue will not be directly added to the Condition queue, but will encapsulate the current thread into a new Node node and join it to the tail of the Condition queue.

 

Notice:The thread calling the await() method has actually successfully acquired the lock, and the thread corresponds to the header node of the waiting queue. The await() method will encapsulate the current thread into a node and add it to the Condition queue, then release the lock, wake up the corresponding thread of the successor node waiting for the queue head node, and then suspend the current thread and enter the waiting state. When the thread is awakened by the signal() method, it will try to acquire the lock through the acquireQueued() method. Therefore, the thread calling the await() method is awakened after blocking, and it is also possible that the lock fails to acquire and continue to block.

 

() Summary of principles:Add yourself to the Condition queue, release the lock, and hang yourself.

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements {
     ...
     public class ConditionObject implements Condition, {
         //First node of condition queue.
         private transient Node firstWaiter;
      
         //Last node of condition queue.
         private transient Node lastWaiter;
         ...
        
         //Implements interruptible condition wait.
         //If current thread is interrupted, throw InterruptedException.
         public final void await() throws InterruptedException {
             if (()) {
                 throw new InterruptedException();
             }
             //1. Execute the addConditionWaiter() method of ConditionObject, encapsulate the current thread into a Node node and add it to the Condition queue
             Node node = addConditionWaiter();
             //2. Call AQS's fullyRelease() method to release the lock
             int savedState = fullyRelease(node);
             int interruptMode = 0;
             while (!isOnSyncQueue(node)) {
                 //3. Block the current thread
                 (this);
                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                     break;
                 }
             }
             //4. After the current thread is awakened by the signal() method, execute AQS acquireQueued() method to try to acquire the lock
             if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                 interruptMode = REINTERRUPT;
             }
             if ( != null) {// clean up if cancelled
                 unlinkCancelledWaiters();
             }
             if (interruptMode != 0) {
                 reportInterruptAfterWait(interruptMode);
             }
         }

         //Encapsulate the current thread into a Node node and join the Condition queue
         private Node addConditionWaiter() {
             Node t = lastWaiter;
             //If lastWaiter is cancelled, clean out.
             if (t != null && != ) {
                 unlinkCancelledWaiters();
                 t = lastWaiter;
             }
             Node node = new Node((), );
             if (t == null) {
                 firstWaiter = node;
             } else {
                  = node;
             }
             lastWaiter = node;
             return node;
         }
         ...
     }
    
     //Release the lock
     //Invokes release with current state value; returns saved state.
     //Cancels node and throws exception on failure.
     final int fullyRelease(Node node) {
         boolean failed = true;
         try {
             int savedState = getState();
             if (release(savedState)) {
                 failed = false;
                 return savedState;
             } else {
                 throw new IllegalMonitorStateException();
             }
         } finally {
             if (failed) {
                  = ;
             }
         }
     }
    
     //Releases in exclusive mode.
     //Implemented by unblocking one or more threads if #tryRelease returns true.
     //This method can be used to implement method Lock#unlock.
     public final boolean release(int arg) {
         if (tryRelease(arg)) {
             Node h = head;
             if (h != null && != 0) {
                 //Wake up the successor node waiting for the queue head node
                 unparkSuccessor(h);
             }
             return true;
         }
         return false;
     }
    
     //Acquires in exclusive uninterruptible mode for thread already in queue.
     //Used by condition wait methods as well as acquire.
     final boolean acquireQueued(final Node node, int arg) {
         boolean failed = true;
         try {
             boolean interrupted = false;
             for (;;) {
                 final Node p = ();
                 if (p == head && tryAcquire(arg)) {
                     setHead(node);
                      = null; // help GC
                     failed = false;
                     return interrupted;
                 }
                 //Execute shouldParkAfterFailedAcquire() method to set the state of the predecessor node of the node to SIGNAL
                 //Execute the parkAndCheckInterrupt() method to hang the current thread
                 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                     interrupted = true;
                 }
             }
         } finally {
             if (failed) {
                 cancelAcquire(node);
             }
         }
     }
     ...
 }

(4) Notification method of ConditionObject signal()

The main logic of the signal() method of ConditionObject:

1. First, the node with the longest waiting time is retrieved from the Condition queue, that is, the first node

2. Then transfer the node with the longest waiting time (first node) to the waiting queue (CLH queue) of AQS

3. Finally, wake up the thread corresponding to the first node

 

Since the thread corresponding to the first node is blocked after adding the Condition queue to the await() method, the thread corresponding to the first node will return to the await() method to continue execution, that is, it will execute AQS The acquireQueued() method attempts to acquire the lock.

 

The prerequisite for calling the signal() method is that the current thread must obtain the lock, so the signal() method first checks whether the current thread has acquired the lock, then obtains the first node of the Condition queue, and then moves the first node to Wait for the queue and wake up the thread corresponding to the first node.

 

By calling AQS's enq() method, the first node of the Condition queue will be added to the waiting queue. When the first node is moved to the waiting queue, the thread corresponding to the first node is awakened and tried to acquire the lock.

 

The thread corresponding to the awakened first node will exit from the while loop in the await() method. Because the waiting queue is already in the waiting queue, the isOnSyncQueue() method will return true, which will call AQS's acquireQueued() method to compete for the acquisition of the lock.

 

() Summary of principles:Convert the head node in the Condition queue into the tail node in the waiting queue.

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements {
     ...
     public class ConditionObject implements Condition, {
         //First node of condition queue.
         private transient Node firstWaiter;
      
         //Last node of condition queue.
         private transient Node lastWaiter;
         ...
        
         //Moves the longest-waiting thread, if one exists,
         //from the wait queue for this condition to the wait queue for the ownership lock.
         public final void signal() {
             //Check whether the current thread has acquired the lock through isHeldExclusively() method
             if (!isHeldExclusively()) {
                 throw new IllegalMonitorStateException();
             }
             Node first = firstWaiter;
             if (first != null) {
                 doSignal(first);
             }
         }
       
         //Removes and transfers nodes until hit non-cancelled one or null.
         //Split out from signal in part to encourage compilers to inline the case of no waiters.
         private void doSignal(Node first) {
             do {
                 //FirstWaiter moves back
                 if ((firstWaiter = ) == null) {
                     lastWaiter = null;
                 }
                 //FirstWaiter departs
                  = null;
             } while (!transferForSignal(first) && (first = firstWaiter) != null);
         }
       
         //Transfers a node from a condition queue onto sync queue.
         //Returns true if successful.
         final boolean transferForSignal(Node node) {
             //When the addConditionWaiter() method, the node is encapsulated as CONDITION type
             //If CAS fails, it means that the current node has been modified to CANCELED. At this time, you need to continue to look up the next node of the Condition queue.
             if (!compareAndSetWaitStatus(node, , 0)) {
                 return false;
             }
             //Transfer node to wait queue and return to the end node of wait queue
             Node p = enq(node);
             int ws = ;
             if (ws > 0 || !compareAndSetWaitStatus(p, ws, )) {
                 //Wake up node node
                 ();
             }
             return true;
         }
     
         //Implements interruptible condition wait.
         //If current thread is interrupted, throw InterruptedException.
         public final void await() throws InterruptedException {
             if (()) {
                 throw new InterruptedException();
             }
             //1. Execute the addConditionWaiter() method of ConditionObject, encapsulate the current thread into a Node node and add it to the Condition queue
             Node node = addConditionWaiter();
             //2. Call AQS's fullyRelease() method to release the lock
             int savedState = fullyRelease(node);
             int interruptMode = 0;
             //The node node is not in the waiting queue at the beginning, so the isOnSyncQueue() method returns false to block
             // Later, other threads call the signal() method, and the node node will be awakened, and then it is found that the node node is already in the waiting queue, so the isOnSyncQueue() method returns true
             while (!isOnSyncQueue(node)) {
                 //3. Block the current thread
                 (this);
                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                     break;
                 }
             }
             //4. After the current thread is awakened by the signal() method, execute AQS acquireQueued() method to try to acquire the lock
             if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                 interruptMode = REINTERRUPT;
             }
             if ( != null) {// clean up if cancelled
                 unlinkCancelledWaiters();
             }
             if (interruptMode != 0) {
                 reportInterruptAfterWait(interruptMode);
             }
         }
         ...
     }
    
     //Inserts node into queue, initializing if necessary. See picture above.
     private Node enq(final Node node) {
         for (;;) {
             Node t = tail;
             if (t == null) { // Must initialize
                 if (compareAndSetHead(new Node())) {
                     tail = head;
                 }
             } else {
                  = t;
                 if (compareAndSetTail(t, node)) {
                      = node;
                     return t;
                 }
             }
         }
     }
     ...
 }