AQS source code analysis

2025-01-25 11:15:37

Master AQS

What is AQS

AQS (AbstractQueuedSynchronizer) is a basic framework for building locks and synchronizers. Its core code mainly implements the thread blocking and wake-up mechanisms, as well as basic management of the resource state.

AQS's core functions

  1. Thread queue management: blocked threads are queued, parked, and woken up through the CLH queue.

  2. Basic management of resource state: state represents the resource state, and compareAndSetState updates it atomically.

  3. Thread wake-up and scheduling: unparkSuccessor wakes up the successor thread.

Because AQS is meant to be a basic framework for building locks and synchronizers, some of the functions above can be implemented directly in AQS: thread blocking and wake-up, and thread queue management. The basic management of resource state, however, cannot be hard-coded in AQS, because each synchronizer gives state its own meaning. So AQS uses the template method pattern: it keeps state abstract, and the operations that interpret state are implemented by AQS subclasses.
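To make the template method idea concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and the convention "0 = unlocked, 1 = locked" are assumptions made for illustration; only tryAcquire, tryRelease, acquire, release, getState, setState and compareAndSetState come from AbstractQueuedSynchronizer itself.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex built on AQS.
// Convention chosen here: state == 0 means unlocked, state == 1 means locked.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Atomically flip state from 0 to 1; only one thread can win.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException("mutex is not locked");
            }
            setExclusiveOwnerThread(null);
            setState(0);  // no CAS needed: only the owner releases
            return true;  // true tells AQS to wake up the successor
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    // acquire() calls tryAcquire() and queues/parks the thread if it fails.
    void lock() { sync.acquire(1); }

    // A single attempt that never queues.
    boolean tryLock() { return sync.tryAcquire(1); }

    // release() calls tryRelease() and then unparks the successor.
    void unlock() { sync.release(1); }

    boolean isLocked() { return sync.isLocked(); }
}
```

The subclass only decides what state means; the queueing, parking and waking all stay inside AQS.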

Why should a thread enter a queue when it cannot acquire the lock?

If a thread that cannot get the resource keeps occupying the CPU instead of entering a wait queue, it wastes CPU time that could be spent running other threads. So the thread is blocked instead, and a blocked thread has to be woken up once the resource becomes available. A queue is the data structure used to manage this.

Core API

The two methods below are examples. There are actually many more, but their functions are similar; they differ in whether the lock is exclusive or shared and in whether interrupts are supported.

  • acquire
  • release

(In passing) Why support interrupts?

Because AQS is a low-level framework, it needs to give users more options, so it supports both interruptible and uninterruptible acquisition.

(In passing) What is the difference between interruptible and uninterruptible mode?

  1. Uninterruptible mode: for example the acquire method. The thread ignores interrupts while acquiring the resource and only returns after it has acquired it.
  2. Interruptible mode: for example the acquireInterruptibly method. The thread responds to interrupts and can exit early while acquiring the resource.
Reasons for designing an uninterruptible mode:
  1. Simplify the logic
    • In many scenarios, the lock acquisition process is expected to be simple and clear, that is, "either acquire it successfully, or block until the resource is acquired."
    • Ignoring interrupts avoids the complexity of interrupt-handling logic and reduces the chance of developer errors.
  2. Ensure task integrity
    • Certain critical tasks should not be cut short by an interrupt signal, especially if the resource state must remain consistent.
  3. Reduce performance overhead
    • Responding to interrupts requires additional checks, and ignoring interrupts can simplify the implementation and improve performance.
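The two modes can be observed through ReentrantLock, which is built on AQS: lock() goes through acquire and lockInterruptibly() goes through acquireInterruptibly. The following is a small sketch; the class InterruptModesDemo and its run method are hypothetical names for this illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: compares how a waiting thread reacts to an interrupt
// in uninterruptible mode (lock) and interruptible mode (lockInterruptibly).
class InterruptModesDemo {

    // Returns what the waiting thread observed.
    static String run(boolean interruptible) {
        ReentrantLock lock = new ReentrantLock();
        AtomicReference<String> outcome = new AtomicReference<>();

        lock.lock(); // the calling thread holds the lock, so the waiter must queue
        Thread waiter = new Thread(() -> {
            try {
                if (interruptible) {
                    lock.lockInterruptibly();
                } else {
                    lock.lock();
                }
                try {
                    outcome.set("acquired, interrupt flag="
                            + Thread.currentThread().isInterrupted());
                } finally {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                outcome.set("gave up: InterruptedException");
            }
        });
        waiter.start();

        // Wait until the waiter sits in the lock's wait queue, then interrupt it.
        while (!lock.hasQueuedThread(waiter)) {
            Thread.yield();
        }
        waiter.interrupt();

        if (interruptible) {
            join(waiter); // the waiter exits without ever getting the lock
        }
        lock.unlock();    // in uninterruptible mode the waiter proceeds only now
        join(waiter);
        return outcome.get();
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

On a standard JDK, run(true) is expected to return "gave up: InterruptedException", while run(false) returns "acquired, interrupt flag=true": the uninterruptible waiter keeps waiting, and the interrupt is only recorded and restored after the lock is acquired.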
(In passing) How is interrupt support implemented?

In Java this is simple: it is nothing more than reading the interrupt status of the current thread. In the AQS source this is done with Thread.interrupted(), which reports whether the current thread has been interrupted and clears the flag.

How acquire works (exclusive lock)

AQS represents each blocked thread with a Node instance; that is, the Node class has a thread field.

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
tryAcquire

Implemented by subclasses

The process of adding a newly blocked thread: addWaiter
  1. Create a node (constructor parameters: currentThread and mode, either exclusive or shared).
  2. If tail is null, call enq(node) directly and return node.
    (tail is a field of the AQS instance; there is also a head.)
  3. If tail is not null, use CAS to set tail to node, and link the node with the old tail.
  4. If the CAS fails, take the fallback path (enq).

But the logic above still has concurrency problems, for example in the following two situations:

  1. What if two threads both see tail as null at the same time?
  2. If two threads read tail at the same time and only one CAS succeeds, what happens to the other one?

So the complete process needs a fallback that covers both situations.

Core method: the fallback path (enq)

The core logic of enq is spin + CAS, a mechanism similar to the lightweight locking of synchronized, except that enq loops indefinitely until the node has been enqueued.

A question: why not call enq directly? What are the benefits of this design?

This is in fact the fast path / full path design idea.

The fast path refers to this section in addWaiter:

Node pred = tail;
if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
    }
}

The full path refers to the logic of enq.

The advantages of this design are:

  1. Improve performance: in most cases the queue is already initialized (tail is not null), so there is no need to run the check that initializes head. Sending every thread straight into the spinning full path would also make contention more intense, so trying the fast path first improves performance in some scenarios.
  2. Keep the common case simple and isolate the complex case: this is mainly a lesson in coding approach. The design is a typical example of "optimize the common path and fall back to handle complex situations", an engineering practice that balances performance and thread safety. If you meet more complex business logic in a project, you can use the same idea to break the scenario down.
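The fast path and the enq fallback described above can be sketched as a small standalone queue. This is a simplified model, not the JDK source: WaitQueueSketch, its Node class and the size method are hypothetical, and AtomicReference stands in for the volatile head/tail fields and the internal CAS helpers that AQS uses.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the AQS wait queue.
class WaitQueueSketch {
    static final class Node {
        final Thread thread;
        volatile Node prev;
        volatile Node next;
        Node(Thread thread) { this.thread = thread; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    Node addWaiter() {
        Node node = new Node(Thread.currentThread());
        // Fast path: the queue is already initialized, try a single CAS.
        Node pred = tail.get();
        if (pred != null) {
            node.prev = pred;
            if (tail.compareAndSet(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // Full path: the queue is empty or this thread lost the CAS race.
        enq(node);
        return node;
    }

    private void enq(Node node) {
        for (;;) { // spin until the node is linked in
            Node t = tail.get();
            if (t == null) {
                // Only one thread wins the right to install the dummy head.
                if (head.compareAndSet(null, new Node(null))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return;
                }
            }
        }
    }

    // Counts waiting nodes by walking prev links from the tail to the dummy head.
    int size() {
        int n = 0;
        Node h = head.get();
        for (Node p = tail.get(); p != null && p != h; p = p.prev) {
            n++;
        }
        return n;
    }
}
```

Both questions raised earlier are answered inside enq: when two threads see a null tail, only one CAS on head succeeds and the other simply loops again; when a thread loses the CAS on tail, it re-reads tail and retries.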
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Get the predecessor node and check whether it is the head of the queue:

final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
    setHead(node); // become the new head node
    p.next = null; // help GC
    failed = false;
    return interrupted;
}
  • If the predecessor of the current node is the head node, try to acquire the resource.
  • On success, set the current node as the new head node and drop the reference held by the old head.

Determine whether the thread needs to be parked:

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    interrupted = true;
  • If acquisition fails, check the predecessor node's status to decide whether the thread should be parked.
  • Park the thread and wait to be woken up.

Interrupt handling:

  • acquireQueued does not respond to interrupts, but it records the interrupt status.
  • If the thread is interrupted while waiting, the interrupted variable is set to true.

Failure handling:

if (failed)
    cancelAcquire(node);
  • If the thread did not successfully acquire the resource (failed == true), call cancelAcquire to remove the current node.
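The "park, re-check, remember the interrupt" pattern of acquireQueued can be isolated in a few lines. The sketch below is not AQS code: UninterruptibleWait, await and signal are hypothetical helpers, and an AtomicBoolean stands in for "the resource became available". Only LockSupport.park/unpark and Thread.interrupted() are real JDK calls.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper: waits for a flag the way acquireQueued waits for the lock.
class UninterruptibleWait {

    // Blocks until 'ready' becomes true. Returns true if the thread was
    // interrupted while waiting; the interrupt status is restored on exit.
    static boolean await(AtomicBoolean ready) {
        boolean interrupted = false;
        while (!ready.get()) {          // re-check the condition after every wake-up
            LockSupport.park(ready);    // returns on unpark, on interrupt, or spuriously
            if (Thread.interrupted()) { // reads and clears the flag so park can block again
                interrupted = true;
            }
        }
        if (interrupted) {
            // Same role as selfInterrupt() in acquire: hand the interrupt back to the caller.
            Thread.currentThread().interrupt();
        }
        return interrupted;
    }

    // Counterpart of release: publish the state change first, then unpark the waiter.
    static void signal(AtomicBoolean ready, Thread waiter) {
        ready.set(true);
        LockSupport.unpark(waiter);
    }
}
```

Clearing the flag with Thread.interrupted() matters: if the interrupt status stayed set, the next park call would return immediately and the loop would degrade into busy spinning.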

How release works (exclusive lock)

  1. if (tryRelease(arg)): the subclass is responsible for modifying the state value.
  2. If head is not null and head.waitStatus != 0, wake up the successor node with unparkSuccessor.
The meaning of waitStatus

The role of waitStatus: waitStatus is an important status field on every node in AQS, used to describe the state of the node:

  • 0: the node is in its normal (initial) state.
  • < 0 (for example SIGNAL = -1): the node is not cancelled; SIGNAL indicates that the successor node needs to be woken up.
  • > 0: the node has been cancelled (for example because of an interrupt or a timeout).
unparkSuccessor
  1. Try to clear the current node's waitStatus:
if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);
  • If the current node's waitStatus is negative (usually SIGNAL), the successor node needs to be notified.
  • Before notifying, reset it to 0 to avoid repeated notifications.
  2. Find the successor node that needs to be woken up:
  • First try node.next.
  • If node.next is null or its waitStatus > 0 (cancelled state), search for a valid node from the tail forward.
  3. Wake up the thread:
  • After finding a valid node, wake up the corresponding thread with LockSupport.unpark().
1. Why do we need to traverse from the tail?

This is to find the valid successor node that actually needs to be woken up.
The reasons are:

  1. Nodes may be cancelled:
    • Some nodes may be cancelled because of a timeout or an interrupt while waiting (waitStatus > 0 indicates the cancelled state).
    • If node.next is accessed directly, it may be a cancelled node that cannot be woken up.
  2. node.next may be null:
    • In some special cases the links of the queue may not be fully established yet. During enqueueing, prev is assigned before the CAS on tail and next only after it, so next may briefly lag behind while the prev links are always complete.
    • To make sure no valid node is missed, the search goes backward from the tail of the queue.
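The backward search can be sketched on a simplified node type. SuccessorSearchSketch, its Node class and the link helper are hypothetical names for illustration; the search itself follows the steps described above (try next first, otherwise walk prev links from the tail and keep the valid node closest to the current one).

```java
// Hypothetical, simplified model; in AQS these are fields of the inner Node class.
class SuccessorSearchSketch {
    static final class Node {
        final String name;
        volatile int waitStatus; // > 0 means cancelled
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    // Returns the node that should be woken after 'node', or null if there is none.
    static Node findSuccessor(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk backward over prev links; the last valid node seen
            // is the one closest to 'node'.
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        return s;
    }

    // Links nodes the way a completed enqueue would: prev and next both set.
    static void link(Node... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i].prev = nodes[i - 1];
            nodes[i - 1].next = nodes[i];
        }
    }
}
```

For a queue head -> a -> b -> c where a is cancelled, findSuccessor(head, c) skips a and returns b; the same holds when head.next has not been published yet and is still null.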

Final questions

  1. Why does release not remove the node from the queue?

Because that logic is executed in acquire, mainly in acquireQueued. acquireQueued is an infinite loop that only exits once the predecessor of the current node is head and tryAcquire succeeds. At that point the current node becomes the new head, and the help-GC step unlinks the old head, which releases it. Therefore release only needs to make sure that the successor node is woken up correctly.

  2. Will acquireQueued block the thread?

Yes, it blocks the thread.
Although the loop in the code retries acquiring the resource, it does not keep occupying the CPU with endless attempts.

Specifically:

  • If the predecessor of the current thread's node is not head, or the resource cannot be acquired, the thread calls parkAndCheckInterrupt, which parks (blocks) the thread until it is explicitly woken up.
  • Therefore acquireQueued does not busy-wait; it waits efficiently through spin + blocking.