
Problems with CAS and solutions in Java

2024-10-09 23:01:44

CAS

Introduction

CAS guarantees the atomicity of operations on shared variables.

CAS, short for Compare And Swap, is the main way optimistic locking is implemented. It lets multiple threads synchronize on a variable without using a lock. CAS is used inside AQS (which underlies ReentrantLock) and inside the atomic classes.

The CAS algorithm involves three operands: the memory value V to be read and written, the expected value A to compare against, and the new value B to write. V is atomically updated to B only if V currently equals A. Otherwise the update fails, and the caller typically re-reads V and retries until the update succeeds.

Take AtomicInteger as an example. Its getAndIncrement() method is built on CAS; the key call is compareAndSwapInt(obj, offset, expect, update). If the value stored in obj at offset equals expect, no other thread has changed the variable, so it is set to update. If the two differ, the method re-reads the value and retries until the update succeeds.
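The three operands can be seen in a small sketch built on AtomicInteger.compareAndSet, the public API that exposes CAS. The class name CasRetrySketch and the helper addWithCas are made up for illustration; this is not how the JDK implements its own atomic methods.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasRetrySketch {
    // Hypothetical helper: adds delta with an explicit CAS retry loop and returns the new value.
    static int addWithCas(AtomicInteger v, int delta) {
        while (true) {
            int a = v.get();       // A: the value we expect V to still hold
            int b = a + delta;     // B: the new value we want to write
            if (v.compareAndSet(a, b)) {
                return b;          // V still equaled A, so V is now B
            }
            // Another thread changed V in between: re-read and try again.
        }
    }

    public static void main(String[] args) {
        AtomicInteger v = new AtomicInteger(5);
        System.out.println(v.compareAndSet(4, 10)); // false: V is 5, not 4
        System.out.println(v.compareAndSet(5, 10)); // true: V was 5, now 10
        System.out.println(addWithCas(v, 3));       // 13
    }
}
```

A failed compareAndSet changes nothing; it only reports that the expectation was stale, which is why the caller loops.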

Problems with CAS

CAS takes no lock and each attempt is atomic, but it may need to compare many times.

Long spin times and high overhead

(For example, AtomicInteger performs its update in a do-while loop. If the comparison keeps failing, the thread stays in the loop. In the worst case, the value a thread reads never matches the expected value by the time it attempts the swap, so the thread spins for a long time and burns CPU.)

Solution: use LongAdder, introduced in Java 8, which relies on segmented CAS and automatic segment migration.
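As a rough usage sketch, LongAdder can replace an AtomicLong counter that many threads update. The class name LongAdderSketch and the helper countWithThreads are illustrative only; LongAdder, increment() and sum() are the real JDK API.

```java
import java.util.concurrent.atomic.LongAdder;

class LongAdderSketch {
    // Hypothetical helper: starts `threads` threads that each increment `perThread` times.
    static long countWithThreads(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    adder.increment(); // contended updates are spread over internal cells
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return adder.sum(); // exact here, because all writers have finished
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(5, 10000)); // 50000
    }
}
```

Note that sum() is only a consistent snapshot when no updates are in flight, so LongAdder suits counters and statistics better than values that must be read and acted on atomically.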

Atomicity is guaranteed only for a single shared variable

When operating on one shared variable, a CAS loop is enough to make the operation atomic.

When operating on several shared variables, however, a CAS loop cannot make the combined operation atomic; without further help, only a lock can.

Solution: use AtomicReference, which wraps a custom object. Put the related variables into one object, and AtomicReference will compare object references when it performs CAS. If several threads try to assign the reference at the same time, the AtomicReference CAS resolves the conflict: only one of them succeeds and the others retry.
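A minimal sketch of this idea follows, assuming a made-up immutable Range holder with two fields that must change together. RangeSketch, Range and shift are hypothetical names; only AtomicReference and compareAndSet come from the JDK.

```java
import java.util.concurrent.atomic.AtomicReference;

class RangeSketch {
    // Immutable holder: both fields change together by swapping the whole object.
    static final class Range {
        final int lower;
        final int upper;
        Range(int lower, int upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    private final AtomicReference<Range> range = new AtomicReference<>(new Range(0, 10));

    // Shifts both bounds by delta as one atomic update.
    void shift(int delta) {
        Range current;
        Range next;
        do {
            current = range.get();
            next = new Range(current.lower + delta, current.upper + delta);
        } while (!range.compareAndSet(current, next)); // compares references, not field values
    }

    Range get() {
        return range.get();
    }

    public static void main(String[] args) throws InterruptedException {
        RangeSketch s = new RangeSketch();
        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                s.shift(1);
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        Range r = s.get();
        System.out.println(r.lower + ".." + r.upper); // 2000..2010
    }
}
```

Making the holder immutable matters: readers always see a lower and upper that belong to the same update.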

ABA issues

Suppose two threads T1 and T2 access the same variable V. T1 reads V and sees the value A. T1 is then preempted and T2 runs: T2 first changes V from A to B and then changes it back from B to A. When T1 gets the CPU again and continues, it finds that V is still A, concludes that nothing has changed, and carries on. Because V went from A to B and back to A, this is known as the ABA problem.

Solution: introduce a version number and increment it on every change.

Since Java 1.5, the JDK's atomic package has provided the class AtomicStampedReference to solve the ABA problem. Its compareAndSet method first checks whether the current reference equals the expected reference and whether the current stamp equals the expected stamp; only if both match does it atomically set the reference and the stamp to the given new values.
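The difference can be illustrated with a single-threaded simulation that plays T1's and T2's steps in the order described. AbaSketch and its two methods are hypothetical names, and the interleaving is scripted rather than produced by real threads.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

class AbaSketch {
    // Plain CAS: T1's stale expectation still matches after A -> B -> A.
    static boolean plainCasAfterAba() {
        AtomicInteger v = new AtomicInteger(1);   // A = 1
        int seenByT1 = v.get();                   // T1 reads A, then is "preempted"
        v.compareAndSet(1, 2);                    // T2: A -> B
        v.compareAndSet(2, 1);                    // T2: B -> A
        return v.compareAndSet(seenByT1, 3);      // T1 cannot tell anything happened
    }

    // Stamped CAS: the stamp has moved on, so T1's update is rejected.
    static boolean stampedCasAfterAba() {
        // String literals are interned, so "A" is the same reference everywhere here.
        AtomicStampedReference<String> v = new AtomicStampedReference<>("A", 0);
        int[] stampHolder = new int[1];
        String seenByT1 = v.get(stampHolder);     // T1 reads reference and stamp together
        int stampSeenByT1 = stampHolder[0];
        v.compareAndSet("A", "B", 0, 1);          // T2: A -> B, stamp 0 -> 1
        v.compareAndSet("B", "A", 1, 2);          // T2: B -> A, stamp 1 -> 2
        return v.compareAndSet(seenByT1, "C", stampSeenByT1, stampSeenByT1 + 1);
    }

    public static void main(String[] args) {
        System.out.println(plainCasAfterAba());   // true
        System.out.println(stampedCasAfterAba()); // false
    }
}
```

Because AtomicStampedReference compares references with ==, boxed values such as Integer outside the small cached range can fail to match even when they are equal; the sketch sidesteps that by using string literals.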

Lock-free concurrency

CAS guarantees the atomicity of operations on shared variables, while volatile provides visibility and ordering. Combining CAS with volatile gives lock-free concurrency, which suits scenarios with low contention on multi-core CPUs.

CAS is efficient because it does not use the synchronized keyword. It never puts a thread into the blocked state, so it avoids the cost of switching between user mode and kernel mode that a contended synchronized block can incur, and it avoids suspending threads. Under heavy contention, however, CAS causes many retries: many threads compete, most of their attempts to set the value fail, and they go around the loop again. A large number of threads keep retrying while only a few succeed, and performance drops sharply. If contention is too heavy, CAS can therefore perform even worse than synchronized.
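One way to see CAS and volatile working together is AtomicIntegerFieldUpdater, which performs CAS directly on a volatile field. The sketch below uses a made-up class VolatileCasSketch with a one-shot start flag; it is an illustration under those assumptions, not a recommended pattern for every case.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class VolatileCasSketch {
    // volatile provides visibility and ordering for reads and writes of state.
    private volatile int state = 0;

    // The updater adds atomic compare-and-set on top of the volatile field.
    private static final AtomicIntegerFieldUpdater<VolatileCasSketch> STATE =
            AtomicIntegerFieldUpdater.newUpdater(VolatileCasSketch.class, "state");

    // Returns true only for the single caller that flips state from 0 to 1.
    boolean tryStart() {
        return STATE.compareAndSet(this, 0, 1);
    }

    int state() {
        return state; // volatile read: sees the latest written value
    }

    public static void main(String[] args) {
        VolatileCasSketch s = new VolatileCasSketch();
        System.out.println(s.tryStart()); // true
        System.out.println(s.tryStart()); // false
        System.out.println(s.state());    // 1
    }
}
```

No thread ever blocks here: a caller that loses the race simply gets false and can decide what to do next.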

Summary

CAS turns compare-and-swap into a single atomic operation that is guaranteed directly by the processor. It compares the expected old value with the latest value in memory; if they match, the swap is performed and the new value is written to the variable in memory.

CAS relies on volatile to read the latest value of the shared variable; only then does [compare and swap] work as intended.

When using CAS, the number of threads should ideally not exceed the number of CPU cores. Each core can run only one thread at a time; with more threads than cores, the extra threads cannot run even if they are ready, and a context switch has to happen. Context switches are costly: the thread's state must be saved, and restored again when the thread resumes running.

Atomic

AtomicInteger

The problem

import java.util.ArrayList;

public class Demo01 {
    // Shared variable num
    private static int num = 0;

    public static void main(String[] args) throws InterruptedException {
        // Task: increment num 10000 times
        Runnable mr = () -> {
            for (int i = 0; i < 10000; i++) {
                num++; // num++ is not atomic (read, add, write), so updates can be lost
            }
        };

        ArrayList<Thread> ts = new ArrayList<>();
        // Start 5 threads that run the task concurrently
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(mr);
            t.start();
            ts.add(t);
        }

        // Wait for all threads to finish
        for (Thread t : ts) {
            t.join();
        }
        // Because of lost updates, the output is usually num < 50000
        System.out.println("num = " + num);
    }
}

Switching to an atomic class

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo01 {

    public static void main(String[] args) throws InterruptedException {
        // Atomic counter, initially 0
        AtomicInteger atomicInteger = new AtomicInteger();
        // Task: increment 10000 times
        Runnable mr = () -> {
            for (int i = 0; i < 10000; i++) {
                atomicInteger.incrementAndGet(); // this increment is an atomic operation
            }
        };

        ArrayList<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(mr);
            t.start();
            ts.add(t);
        }

        for (Thread t : ts) {
            t.join();
        }
        // Since each increment is atomic, the value is always 50000
        System.out.println("number = " + atomicInteger.get());
    }
}

underlying source code

The AtomicInteger class holds a reference to an Unsafe instance (sun.misc.Unsafe in JDK 8), which performs the atomic updates to the variable:

/* AtomicInteger */
private volatile int value; // 0 when created with the no-arg constructor

public final int incrementAndGet() {
    // this: the current AtomicInteger object
    // valueOffset: memory offset of the value field
    // 1: the amount to add, since this method increments by one
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
  • Variable valueOffset: the offset address of the variable value in memory, because Unsafe fetches data based on the memory offset address.

  • The variable value is modified with volatile: it guarantees memory visibility across multiple threads.

/* Unsafe */

// var1: this from above, the AtomicInteger object; var2: the valueOffset; var4: the value to add
public final int getAndAddInt(Object var1, long var2, int var4) {
    // var5: the old value we expect to find
    int var5;
    do {
        // read the current value of the field at offset var2 in object var1
        var5 = this.getIntVolatile(var1, var2);
    } while (!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

Variable Interpretation:

  • var1: the AtomicInteger object itself

  • var2: the valueOffset of the object's value field

  • var4: the amount to add

  • var5: the current value in main memory, located with var1 and var2 and copied into the thread's working memory

compareAndSwapInt(var1, var2, var5, var5 + var4) means compare the current value of the object with var5.

  • If they are the same, update to var5 + var4 and return true

  • If different, continue to take values and then compare again until the update is complete

In other words, the value held in working memory is compared with the value in main memory.

If compareAndSwapInt returns false, the do-while loop keeps running until the expected value matches the actual value.

Suppose thread A and thread B call getAndAddInt at the same time (running on separate CPUs):

  1. The original value inside the AtomicInteger is 3, i.e. its value in main memory is 3. Under the JMM, thread A and thread B each hold a copy of the value 3 in their own working memory.
  2. Thread A reads 3 via getIntVolatile(var1, var2) and is then suspended (it loses the CPU).
  3. Thread B also reads 3 via getIntVolatile(var1, var2). Thread B is not suspended, so it goes on to call compareAndSwapInt. The value in memory is still 3, so thread B successfully changes it to 4 and finishes.
  4. Thread A resumes and runs its CAS. It finds that the 3 it holds no longer matches the 4 in main memory, which means another thread got there first. Thread A's update fails, and it has to re-read the value, i.e. go around the do-while loop again.
  5. Thread A re-reads value. Because value is declared volatile, changes made by other threads are always visible to thread A. Thread A then runs compareAndSwapInt again and repeats until it succeeds.

However, AtomicInteger suffers from the CAS loop overhead problem under contention, so JDK 8 introduced LongAdder to address it.

But AtomicInteger there will be CAS loop overhead problem, so JDK8 introduced LongAdder to solve this problem

LongAdder

LongAdder uses segmented CAS and automatic segmented migration to dramatically improve the performance of multi-threaded, highly concurrent CAS operations.

How it works:

  1. LongAdder starts with a base value. At first, threads accumulate directly onto base; for example, after some additions base = 5.
  2. When too many threads are found to be updating concurrently, segmented CAS kicks in: internally there is a Cell array, and each element holds one segment of the total.
  3. Threads then perform their CAS accumulation on the values of different Cells, so the CAS pressure is spread across the Cells.
  4. This greatly reduces the repeated retries that occur when many threads update the same value, and so improves the performance of concurrent updates.
  5. Automatic segment migration is also implemented: if a CAS on one Cell fails, the thread automatically moves on to another Cell and tries its CAS there. This keeps threads from spinning idly while waiting for a CAS to succeed, so each thread's update completes as soon as possible.

Finally, to get the total value of the current accumulation from the LongAdder, the base value and all Cell segment values are added up and returned.

Overall LongAdder reduces the number of retries for optimistic locks
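The striping idea can be sketched in a heavily simplified form. The class StripedCounterSketch below is an assumption made for illustration: it uses a fixed number of cells and picks a cell at random on contention, whereas the real LongAdder grows its Cell array lazily, hashes per thread, and pads cells against false sharing.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounterSketch {
    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells;

    StripedCounterSketch(int stripes) {
        cells = new AtomicLongArray(stripes);
    }

    void add(long x) {
        long b = base.get();
        if (base.compareAndSet(b, b + x)) {
            return; // uncontended fast path: one CAS on base
        }
        // The CAS on base failed, so there is contention: move to a cell instead.
        int idx = ThreadLocalRandom.current().nextInt(cells.length());
        cells.addAndGet(idx, x);
    }

    // Total = base + every cell, as described above.
    long sum() {
        long s = base.get();
        for (int i = 0; i < cells.length(); i++) {
            s += cells.get(i);
        }
        return s;
    }

    public static void main(String[] args) {
        StripedCounterSketch c = new StripedCounterSketch(4);
        c.add(3);
        c.add(4);
        System.out.println(c.sum()); // 7
    }
}
```

The trade-off is the same as in LongAdder: writes get cheaper under contention, while reading the total costs a pass over all cells and is not an atomic snapshot.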

add source code

public void add(long x) {
        // as: reference to cells
        // b: the base value that was read
        // v: the expected value
        // m: length of the cells array - 1 (the length is always a power of 2)
        // a: the cell that the current thread hits
        Cell[] as; long b, v; int m; Cell a;

        // Condition 1: true  -> cells is already initialized; the current thread should write to its cell
        //              false -> cells is not initialized; all threads write to base
        // Condition 2: evaluated only when condition 1 is false
        //              true  -> the CAS on base failed, so there is contention; may need to retry or expand
        //              false -> the CAS on base succeeded
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            // When do we get here?
            // 1. Condition 1 is true: cells is initialized and the current thread should write to its cell
            // 2. Condition 2 is true: there is contention; may need to retry or expand

            // true means no contention, false means contention occurred
            boolean uncontended = true;

            // Condition 1: true  -> cells is uninitialized, so we got here because the CAS on base failed under contention
            //              false -> cells is initialized; the current thread should find its own cell to write to
            if (as == null || (m = as.length - 1) < 0 ||
                // Condition 2 (checked when condition 1 is false):
                // getProbe() returns the current thread's hash; getProbe() & m is always <= m,
                // so as[getProbe() & m] is the cell the current thread maps to
                // true  -> that cell is null and must be created by longAccumulate
                // false -> that cell exists; the next step is to add x to it
                (a = as[getProbe() & m]) == null ||

                // Condition 3 (checked when condition 2 is false): add x to the cell with CAS
                // true  -> the CAS failed, meaning the current thread's cell is contended
                // false -> the CAS succeeded
                !(uncontended = a.cas(v = a.value, v + x)))

                // When is longAccumulate called?
                // 1. Condition 1 true: cells is uninitialized and the CAS on base failed under contention. Need to retry or initialize cells.
                // 2. Condition 2 true: the cell at the current thread's index is null and must be created by longAccumulate.
                // 3. Condition 3 true: the CAS on the cell failed, so that cell is contended. Need to retry or expand.
                longAccumulate(x, null, uncontended);
        }
    }

longAccumulate method

// When is this method called?
// 1. Condition 1 true: cells is uninitialized and the CAS on base failed under contention. Need to retry or initialize cells.
// 2. Condition 2 true: the cell at the current thread's index is null and must be created by longAccumulate.
// 3. Condition 3 true: the CAS on the cell failed, so that cell is contended. Need to retry or expand.

// wasUncontended: false only if cells was already initialized and the current thread's CAS on its cell failed
final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        // h: the current thread's hash (probe) value
        int h;
        // true -> the current thread has not been assigned a hash value yet
        if ((h = getProbe()) == 0) {
            // so assign one
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            // Why reset the flag? Until now the thread's hash was 0, so it must have been writing to cells[0].
            // Every thread without a hash lands on cells[0], so contention there should not
            // be treated as real contention; set the flag back to true.
            wasUncontended = true;
        }
        // collide: expansion intent. false: will definitely not expand; true: may expand
        boolean collide = false;                // True if last slot nonempty
        
        //spin
        for (;;) {
            // as: reference to cells
            // a: the cell hit by the current thread
            // n: length of the cells array
            // v: the expected value
            Cell[] as; Cell a; int n; long v;

            // case 1: condition 1 true -> cells is initialized; the current thread should write to its cell
            //         condition 2 true -> the array length is greater than 0 (same meaning)
            if ((as = cells) != null && (n = as.length) > 0) {
                // When do we reach case 1?
                // 2. Condition 2 in add() was true: the cell at the current thread's index is null and must be created
                // 3. Condition 3 in add() was true: the CAS on the cell failed, so that cell is contended; need to retry or expand
                // case 1.1: true -> the cell at the current thread's index is null and must be created
                if ((a = as[(n - 1) & h]) == null) {
                    // true -> the lock is currently free; false -> the lock is held
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        // create the cell
                        Cell r = new Cell(x);   // Optimistically create
                        // condition 1: cellsBusy == 0
                        //     true -> the lock is free, so the current thread may compete for it
                        // condition 2: casCellsBusy() competes for the lock
                        //     true -> the current thread acquired the lock
                        if (cellsBusy == 0 && casCellsBusy()) {
                            // flag: was the cell installed successfully?
                            boolean created = false;
                            try {               // Recheck under lock
                                // rs: the current cells reference
                                // m: length of cells
                                // j: the index hit by the current thread
                                Cell[] rs; int m, j;

                                // conditions 1 and 2 always hold
                                // condition 3: rs[j = (m - 1) & h] == null. Case 1.1 already checked this slot, so why check again?
                                // Under concurrency another thread may have run this same path and filled the slot
                                // after case 1.1 saw null, so the slot must be rechecked under the lock.
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                //release a lock
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    // reset the expansion intent to false
                    collide = false;
                }
                // case 1.2: wasUncontended is false only if cells was initialized and the current thread's CAS on its cell failed
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                // case 1.3: reached after the current thread has rehashed and the newly hit cell is not null
                //     true  -> the write to the cell succeeded, so exit
                //     false -> the cell hit after rehashing is contended too; retry once more, which leads to case 1.4
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                // case 1.4: n >= NCPU
                //     true  -> the array length has reached the number of CPUs, so do not expand
                //     false -> the array can still be expanded
                //     cells != as
                //     true  -> another thread has already expanded the array; the current thread should rehash and retry
                else if (n >= NCPU || cells != as)
                    // reset the expansion intent to false
                    collide = false;            // At max size or stale
                // case 1.5: !collide
                //     true -> set the expansion intent to true; expansion is not certain yet, because the thread spins and retries first
                else if (!collide)
                    collide = true;
                // case 1.6: the actual expansion
                //     condition 1: cellsBusy == 0
                //         true -> the lock is free, so the current thread may compete for it
                //     condition 2: casCellsBusy() competes for the lock
                //         true  -> the current thread acquired the lock and may expand
                //         false -> another thread is already expanding
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        // same reasoning as before: recheck under the lock
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1]; // double the length; it stays a power of 2
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                
                // rehash: reset the current thread's hash value
                h = advanceProbe(h);
            }
            
            // case 2: reached only when case 1 is false, i.e. cells is not yet initialized and as is null
            //     condition 1: true -> the lock is currently free
            //     condition 2: cells == as. Under concurrency another thread may have initialized cells
            //                  after case 1 saw null, in which case cells is no longer null
            //     condition 3: true  -> the lock was acquired; casCellsBusy() set cellsBusy to 1
            //                  false -> another thread holds the lock
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    // recheck so that the current thread does not initialize again after another thread did, which would lose data
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    //release a lock
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            // case 3: when do we get here?
            // 1. casCellsBusy() failed because the lock is held, so another thread is initializing cells, or
            // 2. cells has already been initialized by another thread
            // In either case, fall back to accumulating on base
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

AtomicStampedReference

AtomicStampedReference solves the ABA problem by maintaining a Pair object that contains an object reference and an integer "stamp", which are updated together atomically.

public class AtomicStampedReference<V> {
    private static class Pair<T> {
        final T reference; // the object reference being maintained
        final int stamp;   // the version stamp
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
    private volatile Pair<V> pair;
    ....
    
    /**
      * expectedReference : the original value expected before the update
      * newReference : the new value to set
      * expectedStamp : the stamp expected before the update
      * newStamp : the new stamp to set
      */
    public boolean compareAndSet(V expectedReference,
                             V newReference,
                             int expectedStamp,
                             int newStamp) {
        // Get the current (reference, stamp) pair
        Pair<V> current = pair;
        return
            // the reference has not changed
            expectedReference == current.reference &&
            // the stamp has not changed
            expectedStamp == current.stamp &&
            // the new reference equals the current reference
            ((newReference == current.reference &&
            // and the new stamp equals the current stamp
            newStamp == current.stamp) ||
            // otherwise construct a new Pair and update it with CAS
            casPair(current, Pair.of(newReference, newStamp)));
    }

    private boolean casPair(Pair<V> cmp, Pair<V> val) {
        // Call Unsafe's compareAndSwapObject() to CAS the pair field to the new Pair
        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
    }
}
  • If neither the reference nor the stamp has changed and the new values are identical to the current ones, return true directly;

  • If neither the reference nor the stamp has changed but the new values differ from the current ones, construct a new Pair object and update the pair field with CAS.

As you can see, the Java implementation matches the ABA solution described above.

  • First, it uses a version number (the stamp);
  • Second, it never reuses a Pair node: a new Pair is created for every update and serves as the object that CAS compares;
  • Finally, callers pass in the element value and the version number, not a reference to the Pair node.

About the author

Explorations and practice from Seven, a front-line programmer who is still learning and iterating~

This article is included in my personal blog: https://

WeChat official account: seven97, welcome to follow~