
Golang single-machine lock implementation

2025-03-18 21:51:21

1. Introduction to locks

First of all, why do you need a lock?

In concurrent programming, multiple threads or processes may access and modify the same shared resource (such as variables, data structures, or files) at the same time. Without an appropriate synchronization mechanism, the following problems arise:

  • Data race: multiple threads modify a resource at the same time. The final result depends on the order in which the threads execute and is therefore unpredictable.
  • Inconsistent data: one thread is modifying a resource while another thread reads the not-yet-updated value, so the reader sees wrong data.
  • Resource contention: multiple threads competing for the same resource waste system performance.

Therefore, we need a lock to ensure that only one thread can write the data at any given time, which keeps shared resources correct and consistent under concurrent access.
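As a minimal sketch of this idea, the following counter is incremented by many goroutines while a sync.Mutex guards every access. The names SafeCounter and runCounter are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeCounter is a hypothetical type used only for this illustration.
type SafeCounter struct {
	mu sync.Mutex
	n  int
}

// Inc adds one to the counter while holding the lock.
func (c *SafeCounter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// Value returns the current count while holding the lock.
func (c *SafeCounter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// runCounter starts `workers` goroutines that each call Inc `perWorker` times.
func runCounter(workers, perWorker int) int {
	var c SafeCounter
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				c.Inc()
			}
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(runCounter(100, 1000)) // 100000
}
```

Without the lock, the unsynchronized c.n++ would be a data race and the final count could come out lower than expected.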

Two common concurrency control mechanisms are introduced here: optimistic locking and pessimistic locking.

  • Optimistic lock: assumes that contention for the resource is low and that the data is unlikely to be modified by someone else. The shared resource is therefore not locked before it is operated on. Instead, the data is modified first, and at the end a check determines whether it was changed in the meantime: if not, the modification is committed; otherwise the operation is retried.
  • Pessimistic lock: the opposite of the optimistic lock. It assumes that contention for the resource is fierce, so every access to the shared resource must hold the lock.

Each scenario calls for the strategy that suits it. The advantages and disadvantages of the two approaches are clear:

Strategy | Advantage | Disadvantage
Optimistic lock | No actual lock is needed, so performance is high | On conflict the operation must be retried; many retries can degrade performance significantly
Pessimistic lock | Data can only be accessed while holding the lock, which guarantees correctness under concurrency | While the lock is held, other threads waiting for it are blocked, so performance is lower
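The optimistic strategy can be sketched with a CAS retry loop. This is only an illustration under simple assumptions (a single int64 value); optimisticAdd and runOptimistic are hypothetical names, not part of any library.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// optimisticAdd applies delta without taking a lock: it reads the value,
// computes the result, and commits it with CAS. If another goroutine changed
// the value in between, the CAS fails and the whole step is retried.
// It returns the number of attempts that were needed.
func optimisticAdd(addr *int64, delta int64) int {
	attempts := 0
	for {
		attempts++
		old := atomic.LoadInt64(addr)
		if atomic.CompareAndSwapInt64(addr, old, old+delta) {
			return attempts
		}
	}
}

// runOptimistic lets several goroutines increment one shared value.
func runOptimistic(workers, perWorker int) int64 {
	var total int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				optimisticAdd(&total, 1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&total)
}

func main() {
	fmt.Println(runOptimistic(8, 1000)) // 8000
}
```

Under low contention almost every call succeeds on the first attempt; under heavy contention the retry count grows, which is the cost listed in the table.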

2. sync.Mutex

Go's single-machine lock takes into account that the degree of contention between goroutines changes in a real environment, and defines a lock upgrade process. The scheme is as follows:

  • Start with an optimistic attitude: the goroutine spins and tries to acquire the lock with CAS operations.
  • After several failed attempts, the attitude turns from optimistic to pessimistic: contention for the resource is judged to be fierce, so the goroutine blocks and waits to be woken up.

The switch from optimistic to pessimistic happens as soon as any one of the following conditions holds:

  • The goroutine has already made 4 spin attempts
  • There is a G waiting to run in the run queue of the current P (so that spinning does not hurt GMP scheduling performance)
  • The CPU is single-core (no other goroutine can run in the meantime, so spinning is pointless)
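The three conditions above can be written down as a small predicate. This is a simplified sketch, not the runtime's actual check (which also looks at idle Ps and spinning Ms); canSpin, activeSpin, and the parameters are hypothetical names chosen for this illustration.

```go
package main

import "fmt"

// activeSpin mirrors the limit of 4 spin attempts described above.
const activeSpin = 4

// canSpin reports whether a goroutine that has already spun `iter` times
// may spin once more, given the number of CPUs and whether the local run
// queue of its P is empty.
func canSpin(iter, numCPU int, localQueueEmpty bool) bool {
	if iter >= activeSpin {
		return false // already spun 4 times
	}
	if numCPU <= 1 {
		return false // single core: the lock holder cannot make progress while we spin
	}
	if !localQueueEmpty {
		return false // other Gs are waiting on this P, do not starve them
	}
	return true
}

func main() {
	fmt.Println(canSpin(0, 8, true))  // true
	fmt.Println(canSpin(4, 8, true))  // false
	fmt.Println(canSpin(0, 1, true))  // false
	fmt.Println(canSpin(0, 8, false)) // false
}
```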

In addition, to keep blocked goroutines from waiting too long without getting the lock, which would hurt overall responsiveness, the concept of starvation is introduced:

  • Starving state: if a goroutine has been blocked for more than 1ms, it is considered to be starving.
  • Starvation mode: a mode of the lock itself. In this mode, lock handover is fair: the lock is handed directly to the waiting goroutines in order, longest waiter first.

The rules for switching between starvation mode and normal mode are as follows:

  • Normal mode -> starvation mode: a blocked goroutine has been waiting for more than 1ms

  • Starvation mode -> normal mode: the goroutine that acquires the lock is the last one in the wait queue, or it waited for less than 1ms

Next, let's go into the source code and look at the concrete implementation.

2.1. Data structure

Located in the sync package, the lock is defined as follows:

type Mutex struct {
	state int32
	sema  uint32
}
  • state: the status of the lock, including whether it is in starvation mode, whether a blocked goroutine has been woken up, whether it is locked, and how many goroutines are waiting for it.
  • sema: the semaphore used to block and wake up goroutines.

Think of state as a binary string. Counting from the lowest bit, information is stored in it as follows:

  • The first bit indicates whether the lock is held: 0 means unlocked, 1 means locked (mutexLocked)
  • The second bit indicates whether a goroutine has been woken up (mutexWoken)
  • The third bit indicates whether the lock is in starvation mode (mutexStarving)
  • From the fourth bit upward, the number of blocked goroutines is recorded
const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
	starvationThresholdNs = 1e6 // starvation threshold (1ms)
)
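To make the bit layout concrete, here is a small sketch that unpacks a state word using the same constants. The helper describe is hypothetical and only serves this illustration.

```go
package main

import "fmt"

const (
	mutexLocked = 1 << iota // bit 0
	mutexWoken              // bit 1
	mutexStarving           // bit 2
	mutexWaiterShift = iota // waiter count starts at bit 3
)

// describe unpacks a state word into its four pieces of information.
func describe(state int32) (locked, woken, starving bool, waiters int32) {
	locked = state&mutexLocked != 0
	woken = state&mutexWoken != 0
	starving = state&mutexStarving != 0
	waiters = state >> mutexWaiterShift
	return
}

func main() {
	// Binary 10101: two waiters, starvation mode, locked.
	state := int32(2<<mutexWaiterShift | mutexStarving | mutexLocked)
	fmt.Println(describe(state)) // true false true 2
}
```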

2.2. Acquiring the lock: Lock()

func (m *Mutex) Lock() {
	// Fast path: grab the lock if it is completely idle.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	// Slow path: spin, queue, or block.
	m.lockSlow()
}

First try to acquire the lock directly with a CAS operation. If it succeeds, return. Otherwise the lock is already held (or has waiters or flags set), so step into lockSlow.
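The fast path can be tried in isolation with a toy lock that has nothing but the CAS. This is only a sketch: toyLock, tryFastPath, and release are hypothetical names, and unlike the real Mutex this type never spins or blocks.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const locked int32 = 1

// toyLock is a hypothetical lock that implements only the fast path.
type toyLock struct {
	state int32
}

// tryFastPath succeeds only when the state is exactly 0,
// that is, unlocked with no flags and no waiters.
func (l *toyLock) tryFastPath() bool {
	return atomic.CompareAndSwapInt32(&l.state, 0, locked)
}

// release clears the locked bit again.
func (l *toyLock) release() {
	atomic.AddInt32(&l.state, -locked)
}

func main() {
	var l toyLock
	fmt.Println(l.tryFastPath()) // true
	fmt.Println(l.tryFastPath()) // false
	l.release()
	fmt.Println(l.tryFastPath()) // true
}
```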

2.3. lockSlow()

The source code is long, so it is explained in parts:

	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state

(1) The basic variables are defined, with the following meanings:

  • waitStartTime: the time at which the current goroutine started waiting; only used once it has blocked.
  • starving: whether the current goroutine is starving
  • awoke: whether the current goroutine has been woken up by Unlock
  • iter: the number of spin attempts made so far by the current goroutine
  • old: a snapshot of the lock state
for {
	// The lock is held, is not in starvation mode, and the current goroutine may keep spinning.
	if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
		// Try to set mutexWoken so that Unlock does not wake other blocked goroutines.
		if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
			atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
			awoke = true
		}
		runtime_doSpin()
		iter++
		old = m.state
		continue
	}
	//...
}

(2) Enter the loop that tries to acquire the lock. The two ifs mean:

  • If the lock is held, it is not in starvation mode, and the current goroutine is allowed to keep spinning (multi-core CPU, fewer than 4 spins so far, and no G waiting in the local run queue of the current P), then:
    • If the current goroutine was not woken from the wait queue, no other goroutine is marked as woken, and there are blocked goroutines, try to set the mutexWoken flag to 1. If that succeeds:
      • Mark the current goroutine as the woken one (even though it was not actually woken from blocking).
    • Spin for a short while (runtime_doSpin).
    • Increment the iter counter, reload the current lock state into old, and start the next iteration.

The one question here is: why is awoke set to true?

Because the lock is not in starvation mode, acquisition is currently unfair: whoever grabs the lock first gets it. If no blocked goroutine has been woken yet, setting mutexWoken tells Unlock not to wake one. This gives the goroutine that is already spinning the chance to acquire the lock and avoids waking another goroutine only to have it join the competition.


for {
    	//...
    	new := old
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			new &^= mutexWoken
		}
		//...
}		

(3) Status update:

When the goroutine leaves step (2), it is in one of two situations:

  • It can no longer spin, or the lock has entered starvation mode and must be handed to another goroutine. Either way, it cannot get the lock (pessimistic).
  • The lock has been released.

In either case, the state must be updated to prepare for what follows.

new holds the state the lock is about to enter. The update rules are:

  • If the lock is not in starvation mode: the lock has either been released or this goroutine has run out of spins. Whether or not this goroutine gets it, the lock will be held by some goroutine, so mutexLocked is set to 1.
  • If the lock is still held or is in starvation mode: this goroutine cannot get the lock and is about to block, so the waiter counter is incremented by 1.
  • If the current goroutine is starving and the lock is still held: the lock switches to the strictly fair starvation mode.
  • If the current goroutine is marked as woken: clear the mutexWoken flag, because this goroutine will either take the lock or go back to sleep, and another goroutine may need to be woken next.

Although there are many updates, in summary:

  • If the lock has been released, simply mark it as about to be acquired.
  • If the lock has not been released and the current goroutine has been waiting for a long time, the lock enters starvation mode, and the blocked goroutines are woken in order from then on.
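The four update rules can be tried out as a pure function. This is a sketch for experimentation only: nextState is a hypothetical helper that copies the logic of step (3) and leaves out the consistency check the real code performs when awoke is set.

```go
package main

import "fmt"

const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// nextState computes the state the goroutine will try to CAS in,
// given the observed state and its own starving/awoke flags.
func nextState(old int32, starving, awoke bool) int32 {
	next := old
	if old&mutexStarving == 0 {
		next |= mutexLocked // normal mode: someone will hold the lock
	}
	if old&(mutexLocked|mutexStarving) != 0 {
		next += 1 << mutexWaiterShift // we are going to queue up
	}
	if starving && old&mutexLocked != 0 {
		next |= mutexStarving // switch the lock to starvation mode
	}
	if awoke {
		next &^= mutexWoken // reset the woken flag
	}
	return next
}

func main() {
	fmt.Println(nextState(0, false, false)) // 1: lock was free, take it
	fmt.Println(nextState(1, false, false)) // 9: lock held, become waiter number 1
	fmt.Println(nextState(11, true, true))  // 21: locked, starving, 2 waiters, woken cleared
}
```

In the last call the input 11 means locked, woken, and one waiter; the starving goroutine adds itself as a second waiter, turns on starvation mode, and clears the woken flag.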

(4) Try to commit the new state:

		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			//...
		} else {
			old = m.state
		}

Next, try to CAS new into state. If the update fails, another goroutine has changed the state in the meantime. To keep the state consistent, reload it and go through the loop again.

(5) The state was updated successfully. Now decide whether to sleep or whether the lock has been acquired.

Inside the if branch of step (4), there are two cases:

		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			//...
		} else {
			//...
		}

The lock may already have been released, so check whether, before the update, it was both unlocked and not in starvation mode. If so, the lock has been acquired successfully and the loop ends.

		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 2)
			//...
		} else {
			//...
		}

Otherwise, the current goroutine is about to block. Record the time at which blocking starts, which is used after wake-up to decide whether the goroutine is starving. A goroutine that has already waited before is queued at the front of the wait queue. Then go to sleep.

(6) If step (5) ended in blocking, then after being woken up:

		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 2)
			// Woken up: check whether this goroutine is now starving.
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			// The lock is in starvation mode.
			if old&mutexStarving != 0 {
				// Sanity check of the lock state.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// State delta: set the locked bit and remove this goroutine from the waiter count.
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// Leave starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
			//...
		} else {
			//...
		}

After waking up from blocking, first determine whether the current goroutine is starving (it has waited for more than 1ms) and reload the latest lock state.

If the lock is in starvation mode: the current goroutine acquires the lock directly. It was woken up precisely because of starvation mode, so no other goroutine competes for the lock. The state is then updated: the waiter count is decremented by 1 and the locked bit is set to 1. If the current goroutine waited for less than 1ms or is the last waiter, the lock also switches from starvation mode back to normal mode. At this point the lock has been acquired and the function exits.

Otherwise, this is an ordinary wake-up in normal mode, so the goroutine resets its spin counter and goes back to step (2) to compete for the lock again.
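The arithmetic of the starvation-mode handoff is easy to check by hand. The sketch below applies the same delta to a few sample states; afterHandoff is a hypothetical helper, and the sample values are chosen only for illustration.

```go
package main

import "fmt"

const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// afterHandoff returns the state after a woken goroutine takes over a lock
// that is in starvation mode. `starving` says whether that goroutine itself
// waited longer than the threshold.
func afterHandoff(old int32, starving bool) int32 {
	// Set the locked bit and remove one waiter in a single addition.
	delta := int32(mutexLocked - 1<<mutexWaiterShift)
	if !starving || old>>mutexWaiterShift == 1 {
		delta -= mutexStarving // leave starvation mode
	}
	return old + delta
}

func main() {
	// 28 = three waiters, starvation mode, unlocked.
	fmt.Println(afterHandoff(28, true))  // 21: locked, still starving, 2 waiters
	fmt.Println(afterHandoff(28, false)) // 17: locked, normal mode, 2 waiters
	// 12 = one waiter, starvation mode, unlocked.
	fmt.Println(afterHandoff(12, true)) // 1: locked, normal mode, no waiters
}
```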

2.4. Releasing the lock: Unlock()

func (m *Mutex) Unlock() {
	// Fast path: drop the locked bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		m.unlockSlow(new)
	}
}

An atomic operation clears the mutexLocked flag. If the state is not 0 afterwards, there are waiters or other flags set, meaning some goroutine may need to acquire the lock, so step into unlockSlow.

2.5. unlockSlow()

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		fatal("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 2)
				return
			}
			old = m.state
		}
	} else {
		runtime_Semrelease(&m.sema, true, 2)
	}
}

(1) First comes the error check. If the lock was not locked in the first place, fatal is called and the program terminates.

if (new+mutexLocked)&mutexLocked == 0 {
		fatal("sync: unlock of unlocked mutex")
	}

(2) If the lock is not in starvation mode:

  • If there are no waiting goroutines, or the lock is already locked again, or a goroutine has already been woken, or the lock has entered starvation mode: another goroutine has stepped in and taken over, so return directly.
  • Otherwise, wake up one blocked goroutine.

Otherwise, the lock is in starvation mode: hand it directly to the goroutine that has been waiting the longest.
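The normal-mode decision can be isolated as a pure function. This is a sketch of one loop iteration only, without the CAS retry or the semaphore; wakeDecision is a hypothetical name.

```go
package main

import "fmt"

const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// wakeDecision takes the state observed after the locked bit was cleared and
// returns the state to CAS in plus whether a waiter should be woken.
func wakeDecision(old int32) (next int32, wake bool) {
	if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
		return old, false // nobody to wake, or someone else already took over
	}
	// Remove one waiter and mark that a goroutine has been woken.
	return (old - 1<<mutexWaiterShift) | mutexWoken, true
}

func main() {
	fmt.Println(wakeDecision(0))  // 0 false: no waiters
	fmt.Println(wakeDecision(16)) // 10 true: 2 waiters become 1, woken flag set
	fmt.Println(wakeDecision(18)) // 18 false: a goroutine is already marked woken
	fmt.Println(wakeDecision(17)) // 17 false: lock was grabbed again
}
```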

3. sync.RWMutex

Operations on a shared resource fall into two categories: reads and writes. Suppose that in some scenario reads far outnumber writes. If every read had to acquire a mutex, the performance cost would be very high.

RWMutex is a lock that improves performance when reads outnumber writes. It can be seen as a combination of a read lock and a write lock, with the following rules:

  • The read lock allows multiple readers to read the shared resource at the same time. A goroutine that wants to modify the data must block while a read lock is held.
  • The write lock is strictly exclusive. While the shared resource is write-locked, no other goroutine may access it.

In the worst case, when every goroutine needs to write, the read-write lock degenerates into an ordinary Mutex.
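A typical use is a read-mostly map. The following is a minimal usage sketch; Cache, NewCache, Get, and Set are hypothetical names for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// Cache is a hypothetical read-mostly map guarded by an RWMutex.
type Cache struct {
	mu   sync.RWMutex
	data map[string]int
}

// NewCache returns an empty cache.
func NewCache() *Cache {
	return &Cache{data: make(map[string]int)}
}

// Get takes the read lock, so many Gets can run at the same time.
func (c *Cache) Get(key string) (int, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.data[key]
	return v, ok
}

// Set takes the write lock, which excludes all readers and other writers.
func (c *Cache) Set(key string, v int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = v
}

func main() {
	c := NewCache()
	c.Set("a", 1)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Get("a") // concurrent readers do not block each other
		}()
	}
	wg.Wait()

	v, ok := c.Get("a")
	fmt.Println(v, ok) // 1 true
}
```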

3.1. Data structure

type RWMutex struct {
	w           Mutex        // held if there are pending writers
	writerSem   uint32       // semaphore for writers to wait for completing readers
	readerSem   uint32       // semaphore for readers to wait for completing writers
	readerCount atomic.Int32 // number of pending readers
	readerWait  atomic.Int32 // number of departing readers
}

const rwmutexMaxReaders = 1 << 30 // maximum number of readers
  • w: the mutex that serializes writers
  • writerSem: the semaphore on which a blocked writer waits
  • readerSem: the semaphore on which blocked readers wait
  • readerCount: normally, the number of goroutines currently reading. If a writer holds or is waiting for the lock, rwmutexMaxReaders has been subtracted from it, so the actual number of readers is readerCount + rwmutexMaxReaders
  • readerWait: the number of active readers that the pending writer still has to wait for before it can proceed
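The encoding of readerCount can be illustrated with a small decoder. decodeReaderCount is a hypothetical helper written for this example; it is not part of the sync package.

```go
package main

import "fmt"

const rwmutexMaxReaders = 1 << 30

// decodeReaderCount splits a raw readerCount value into the number of
// readers and a flag telling whether a writer is pending.
func decodeReaderCount(readerCount int32) (readers int32, writerPending bool) {
	if readerCount < 0 {
		// A writer subtracted rwmutexMaxReaders; add it back to get the reader count.
		return readerCount + rwmutexMaxReaders, true
	}
	return readerCount, false
}

func main() {
	fmt.Println(decodeReaderCount(3))                     // 3 false
	fmt.Println(decodeReaderCount(3 - rwmutexMaxReaders)) // 3 true
}
```

Packing both facts into one counter lets RLock learn with a single atomic add whether a writer is present.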

3.2. Acquiring the read lock: RLock()

func (rw *RWMutex) RLock() {
	if rw.readerCount.Add(1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
	}
}

Add 1 to readerCount to register a new reader. If the result is < 0, a writer holds or is waiting for the lock, so the current reader blocks.

3.3. Releasing the read lock: RUnlock()

func (rw *RWMutex) RUnlock() {
	if r := rw.readerCount.Add(-1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
}

Subtract 1 from readerCount to remove one reader. If the result is < 0, a writer is pending, so step into rUnlockSlow.

3.4. rUnlockSlow()

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		fatal("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if rw.readerWait.Add(-1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

First comes error handling. If there was no reader to remove, that is, RUnlock was called on an RWMutex that is not read-locked (with or without a pending writer), the state is inconsistent and fatal is called.

Otherwise, subtract 1 from readerWait. If the result is 0, the current goroutine is the last reader the writer was waiting for, so the blocked writer is woken up.

3.5. Acquiring the write lock: Lock()

func (rw *RWMutex) Lock() {
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && rw.readerWait.Add(r) != 0 {
		runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
	}
}

First acquire the mutex w to exclude other writers. Once that succeeds, subtract rwmutexMaxReaders from readerCount, which makes it negative and signals that a writer now holds or is waiting for the lock.

r is the number of goroutines currently reading. If r is not 0, add r to readerWait and block the writer until all of those readers have finished. (Readers that already hold the read lock are not interrupted: the writer waits for them, while readers that arrive later block until the writer is done.)
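The interplay of readerCount and readerWait can be followed step by step with a single-threaded sketch. The counters type and its methods are hypothetical stand-ins that copy only the arithmetic; there are no semaphores here and nothing actually blocks.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const rwmutexMaxReaders = 1 << 30

// counters models just the two counters of an RWMutex.
type counters struct {
	readerCount atomic.Int32
	readerWait  atomic.Int32
}

// writerAnnounce mirrors the arithmetic in Lock. It reports whether the
// writer would have to wait for active readers.
func (c *counters) writerAnnounce() bool {
	r := c.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
	return r != 0 && c.readerWait.Add(r) != 0
}

// readerLeave mirrors RUnlock and rUnlockSlow (without the error check).
// It reports whether this reader is the one that must wake the writer.
func (c *counters) readerLeave() bool {
	if r := c.readerCount.Add(-1); r < 0 {
		return c.readerWait.Add(-1) == 0
	}
	return false
}

func main() {
	var c counters
	c.readerCount.Add(2)            // two readers hold the read lock
	fmt.Println(c.writerAnnounce()) // true: the writer must wait for 2 readers
	fmt.Println(c.readerLeave())    // false: one reader is still active
	fmt.Println(c.readerLeave())    // true: last reader wakes the writer
}
```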

3.6. Releasing the write lock: Unlock()

func (rw *RWMutex) Unlock() {
	// Announce to readers there is no active writer.
	r := rw.readerCount.Add(rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		fatal("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
}

Restore readerCount to its normal range, which signals that the write lock has been released. If the result is >= rwmutexMaxReaders, the lock was not write-locked in the first place, which is an error.

Then wake up all blocked readers, so readers go first.

Finally, unlock w so that other writers can proceed.

4. Reference articles:

/s?__biz=MzkxMjQzMjA0OQ==&mid=2247483797&idx=1&sn=34274d44bced0835ea302376a137219b