Location>code7788 >text

Golang's GMP Scheduling Model and Source Code Analysis

Popularity:27 ℃/2024-11-17 16:10:27

0. Introduction

As we know, multi-threading and multi-process models are widely used in this contemporary operating system to improve the concurrency efficiency of the system. With the continuous development of the Internet, in the face of today's highly concurrent scenarios, it is unrealistic to create a thread for each task, and the use of threads requires the system to constantly switch between the user state and the kernel state, resulting in unnecessary loss, so the introduction of concurrent threads. Concurrent threads exist in user space and are a lightweight concurrent execution unit with less overhead for creation and context, and how to manage a large number of concurrent threads is an important topic. This note is used to share the author's understanding of the GMP model for learning the Go language concurrent scheduling, as well as the source code implementation. The current version of Go language used is 1.22.4.

This note references the following article:

[Golang Three Levels - Collector's Edition] Golang Scheduler GMP Principles and Scheduling Analysis | Go Technology Forums

Golang GMP Principles

Golang-gopark function and goready function principle analysis

1. GMP model disassembly

The job of the goroutine scheduler is to assign the ready-to-run goroutine to a worker thread, and the main concepts involved are as follows:

1.1、G

G stands for Goroutine, the Go language's abstraction of the concept of concurrency, which has the following characteristics:

  • It's a lightweight thread
  • Has its own stack, state, and executed task functions
  • Each G will be assigned to an available P and run on M

Its structural definition is located in runtime/:

type g struct {
    // ...
    m         *m      
    // ...
    sched     gobuf
    // ...
}

type gobuf struct {
    sp   uintptr
    pc   uintptr
    ret  uintptr
    bp   uintptr // for framepointer-enabled architectures
}

Here, we are centrally concerned with its embedded m and agobufType sched.gobufIt is mainly used for Gorutine context switching, which preserves the state of the CPU registers during G execution, enabling G to correctly resume the context during suspension, scheduling and resumption of operation.

G has the following main states:

const (
	_Gidle = iota // 0
	_Grunnable // 1
	_Grunning // 2
	_Gsyscall // 3
	_Gwaiting // 4
    //...
	_Gdead // 6
    //...
	_Gcopystack // 8
    _Gpreempted // 9
	//...
)
  • Gidle: Indicates that this G has just been allocated and has not yet been initialized.

  • Grunnable: Indicates that this G is in the run queue, it is not currently executing user code, and the stack is unoccupied.

  • Grunning: indicates that this G may be executing user code, that the stack is occupied by this G, that it is not in the run queue, and that it is assigned to an M and a P (the sum is valid).

  • Gsyscall: Indicates that this G is executing a system call, it is not executing user code, and the stack is occupied by this G. It is not in the run queue and it is assigned to an M.

  • Gwaiting: Indicates that this G is blocked at runtime, it is not executing user code, and it is not in the run queue, but it should be logged somewhere so that it can be woken up if necessary. (ready()) gc, channel communication, or locking operations often enter this state.

  • Gdead: Indicates that this G is not currently in use, it may have just been initialized or it may have been destroyed.

  • Gcopystack: Indicates that the stack of this G is being moved.

  • Gpreempted: Indicates that this G is hung due to preemption and that the G stops on its own, waiting for further recovery. It is similar toGwaitingButGpreemptedThere hasn't been an administrator responsible for bringing its status back, only somesuspendGoperation changes the state of this G fromGpreemptedconvert toGwaiting, so that the scheduler will take over this G.

While reading the source code about the scheduling logic, we can search for thecasgstatusmethod to locate the function that makes the state of G change, for example:casgstatus(gp, _Grunning, _Gsyscall)means that by transforming the state of that G from Grunning to Gsyscall, the corresponding function can be found to learn.

1.2、M

M is Machine, also Worker Thread, which represents the thread of the operating system.Go runtime creates or destroys M when needed, and arranges G to be executed on M, making full use of the power of multi-core CPUs. It has the following characteristics:

  • M is the bridge between Go and the operating system, and it is responsible for executing the G assigned to it.
  • The number of M's is adjusted according to system resources.
  • M may be passed by a specific GLockOSThreadlocking, this binding of G and M ensures that a particular Goroutine can continuously use the same thread.

The structure is defined as follows:

type m struct{
	g0      *g     // goroutine with scheduling stack
	curg          *g       // current running goroutine
	tls           [tlsSlots]uintptr // thread-local storage (for x86 extern register)
	p             puintptr // attached p for executing go code (nil if not executing go code)
	oldp          puintptr // the p that was attached before executing a syscall
	//...
}

Every M structure will have an M struct namedg0of G, which is a special Goroutine that does not complexly execute the user's code, but is responsible for scheduling G. g0 will assign G bindings to M for execution.tlsis "Local Thread Storage", which stores specific information about the current thread, while thetlsThe first slot of the array is usually used to storeg0of the stack pointer.

M exists in a state named"Spin state."M in the spin state will keep looking for a runnable G in the global queue to execute and de-spin.

1.3、P

P is Processor, which stands for logical processor and is a virtual concept for Goroutine scheduling. Each P is responsible for allocating the resources to execute the Goroutine, which has the following characteristics:

  • P is the execution context of G, which has a local queue storing G, and a corresponding task scheduling mechanism responsible for executing a specific G on M .
  • The amount of P is determined by the environment variableGOMAXPROCSdecision, it makes no more sense if its number is greater than the number of physical threads in the CPU.
  • P is a necessary resource to execute Go code, and M must have a P bound in order to execute Go code. However, M can execute system calls or be blocked without a P binding.
type p struct {
	status      uint32
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	m           muintptr
	runnext guintptr
	//...
}
  • runq stores the goroutine queue that this P has, with a maximum length of 256
  • runqhead and runqtail point to the head and tail of the queue, respectively
  • runnext stores the next executable goroutine.

P also contains several states, as follows:

const (
	_Pidle = iota
	_Prunning
	_Psyscall
	_Pgcstop
	_Pdead
)
  • Pidle: indicates that P is not being run by the user code or the scheduler, usually this P is in the idle P list for use by the scheduler, but it may transition between other states as well. p consists of the idle queueidle listor other object that transforms its state owns it, itsrunqIt's empty.
  • Prunning: indicates that P is owned by M and is running user code or the scheduler. Only the M that owns this P is allowed to change the state of P. The M can convert P to Pidle (when not working), Psyscall (when entering a system call), Pgcstop (when settling for garbage collection). the M can also hand over ownership of P to another M (e.g., scheduling a locked G)
  • Psyscall: indicates that P is not running user code, and is associated with but not owned by the M in the system call. P in the Psyscall state may be grabbed by another M. Converting P to another M is lightweight and P maintains its association with the original M.
  • Pgcstop: indicates that P is suspended for STW (Stop The World) (perform garbage collection).
  • Pdead: indicates that the P is no longer being used (GOMAXPROCS is reduced). A dead P will be deprived of resources, but a small amount of resources such as the Trace Buffer will be retained for subsequent trace analysis needs.

1.4、Schedt

schedtis an encapsulation of the global goroutine queue

type schedt struct {
    // ...
    lock mutex
    // ...
    runq     gQueue
    runqsize int32![](/blog/3542244/202411/)

    // ...
}
  • lock: is the lock that operates the global queue
  • runq: queue for storing G
  • runqsize: capacity of the global G-queue

2. Workflow of the scheduling model

We can use the following diagram to represent the flow of this scheduling model as a whole:

In the next section, we will focus on how the GMP scheduling model accomplishes a round of scheduling, i.e., how the switch from g0 to g and back again is accomplished, and what roughly happens during that time.

2.1, State transitions of G

We've just mentioned that every M has a name for theg0of the Goroutine, goes in charge of scheduling the execution of the ordinary g bound to M. There is a transition between g0 and the ordinary g. When the code on the ordinary g is executed, the execution is handed over to the g, and when the g finishes executing the code, or for some reason needs to be hung, exited from the execution, etc., the execution is handed over to the g0 again.

g0 and P are in a collaborative relationship, where P's queue determines which goroutines can be called when bound to P, and g0 is the key goroutine that executes the scheduling logic and is responsible for freeing P's resources when necessary.

When g0 needs to hand off execution to g, it calls a program namedgogomethod, passing in the stack pointer for g, to execute the user's code.

func gogo(buf *gobuf)

Whenever it is necessary to reassign execution to g0, a program namedmcallThe methodology.

func mcall(fn func(*g))

mcall is called when go needs to perform a concatenated swap, it passes in a callback functionfn, which carries a pointer to the currently running g, does the following three main things:

  • Save the current information of g, i.e., store the information of PC/SP into g->sched to ensure that the execution site of g can be recovered subsequently.
  • Switch the current M's stack from g to g0
  • Execute a new function fn on the stack of g0, usually with further arrangements in fn for where g is going, and callschedulefunction for the current M to find another G that can be executed.

2.2. Types of scheduling

We now know what functions g and g0 use to switch states. Next we are going to explore what circumstances they are switching, i.e., what are the scheduling strategies.

The GMP scheduling model has a total of four scheduling strategies, which are:Proactive schedulingpassive dispatchnormal dispatchseize the opportunity to schedule

  • Active scheduling: the method provided to the user, when the user has called the () method, at this time the current g will give up the right to execute, and schedule g into the task queue waiting for the next time to be scheduled.
  • Passive scheduling: when a certain execution condition is not met, usually the channel read/write condition is not met, the gopark() function will be executed, and then g will be put into a waiting state.
  • Normal Dispatch: g Normal execution is completed, transferring execution rights.
  • Preemptive scheduling: there exists a global monitor moniter, it will check every period of time to see if there is G running too long, if found, will notify P to carry out and M unbinding, let P. Here the need for the existence of the global monitor is because when the G into the system call, the thread M will be in a deadlock, can not take the initiative to check, need to be assisted by external help.

2.3. Macro scheduling process

Next we focus on the overall round scheduling process, which can be represented in the following figure for the round of scheduling for g0 and g.

scheduleAs the start of each round of scheduling, it looks for a G that can be executed and then calls theexecuteBind that g to a thread M and then execute thegogomethod to actually run a goroutine. when a conversion is needed, the goroutine executes the underlyingmcallmethod, saves the stack information, and then executes the callback functionfn, i.e., one of the methods in the green box, to hand execution back to g0.

2.3.1、schedule()

schedule()method is positioned atruntime/procin which the non-mainstream process part is ignored, the source code reads as follows:

//Find a G that is ready to run.
func schedule() {
mp := getg().m

//...

top.
pp := ()
= false

// If this M is spinning, but the queue contains G, then throw an exception.
if & & ( ! = 0 || ! = ) {
throw("schedule: spinning with local work")
}

gp, inheritTime, tryWakeP := findRunnable() //blocking to find G


    //...

// Currently M is about to run a G, unspinning it.
if {
resetspinning()
}

//...

execute(gp, inheritTime)
}

This method essentially looks for a G that can be run and gives it to that thread to run. We mentioned at the beginning that threads will exist in an instance called"Spin state."state, it will keep spinning to find a G that can be executed to execute, and when it succeeds in finding it, it will be released from the spin state.

There is a point that we should pay attention to. Isn't the thread in the spin state taking up empty computational resources? Doesn't that reduce the performance of the system?

In fact, this is a neutralization strategy, if every time a new Goroutine needs to be executed, we create a thread M to execute it, and then delete it after execution without reusing it, then it will bring a lot of creation and destruction resource consumption. We hope that when a new Goroutine comes, there will be an M to execute it immediately, so that the idle and temporarily untasked M can go to find the Goroutine by itself, which reduces the resource consumption of creation and destruction. But we can't have too many threads in the spin state, or we'd be creating another place to consume too much.

Let's follow up on that.resetspinning(), to see what the strategy for its implementation is.

1、resetspinning()

func resetspinning() {
gp := getg()
//...
= false
nmspinning := (-1)
//...
wakep()
}



// Try to add a P to execute the G. This method is called when a G state is runnable.
func wakep() {
    // Return if the number of M's in the spin is not 0.
if () ! = 0 || ! (0, 1) {
return
}

// Disable preemption until pp's ownership is transferred to the next M in startm, otherwise preemption here will cause pp to get stuck waiting to enter the _Pgcstop state.
mp := acquirem()

var pp *p
lock(&)
    // Attempt to acquire a p from the idle p queue
pp, _ = pidlegetSpinning(0)
if pp == nil {
if (-1) < 0 {
throw("wakep: negative nmspinning")
}
unlock(&)
releasem(mp)
unlock(&) releasem(mp)
}

unlock(&)

startm(pp, true, false)

releasem(mp)
}

existresetspinningin which we first unwind the current M from its spin state and then try to wake up a P, i.e., enter thewakep()Methods in.

if () != 0 || !(0, 1) {
		return
	}

Within the wakep method, we first check the number of M's that are currently in spin, and if >0, we do not wake up a new P. This is to prevent too many spinning M's from consuming CPU resources at the same time.

pp, _ = pidlegetSpinning(0)
	if pp == nil {
		if (-1) < 0 {
			throw("wakep: negative nmspinning")
		}
		unlock(&)
		releasem(mp)
		return
	}

Then it tries to get a P from the free P queue, and if there are no free Ps, then it reduces the number of spin threads (it just reduces the number, but I don't understand what the spin threads do next) and returns.

startm(pp, true, false)

If a free P is acquired, a thread M is assigned to it.

2、findRunnable()

findRunnable is the most central method in the round scheduling process, and it is used to find an executable G.

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
mp := getg().m
top.
    pp := ()
//...
 
    // Check the global G queue every 61 scheduling cycles to prevent relying only on the local queue in certain cases.
if %61 == 0 && > 0 {
lock(&)
gp := globrunqget(pp, 1)
unlock(&)
if gp ! = nil {
return gp, false, false
}
}
    //...
    // local runq
if gp, inheritTime := runqget(pp); gp ! = nil {
return gp, inheritTime, false
}

// global runq
if ! = 0 {
lock(&)
gp := globrunqget(pp, 0)
unlock(&)
unlock(&)) if gp ! = nil {
return gp, false, false
}
}

    // It is an optimization of netpoll to check for a ready network concatenation in a non-blocking way before formally going to steal G.
if netpollinited() && netpollAnyWaiters() && () ! = 0 {
if list, delta := netpoll(0); ! () { // non-blocking
gp := ()
injectglist(&list)
netpollAdjustWaiters(delta)
trace := traceAcquire()
casgstatus(gp, _Gwaiting, _Grunnable)
if () {
(gp, 0)
traceRelease(trace)
}
return gp, false, false
}
}

    // If the current M is out of spin state, or the number of M in spin state is less than half of the number of active P, then G stealing is performed. (to prevent spinning M from taking up too much CPU resources when the parallelism of the system is low)
if || 2*() < () {
if ! {
()
}

gp, inheritTime, tnow, w, newWork := stealWork(now)
if gp ! = nil {
// Successfully stole.
return gp, inheritTime, false
return gp, inheritTime, false }
if newWork {
// There may be new timer or GC work; restart to
// Discover.
goto top
}

now = tnow
if w ! = 0 && (pollUntil == 0 || w < pollUntil) {
// Earlier timer to wait for.
pollUntil = w
}
}

    //...

Its main implementation steps are as follows:

(i) 61st Movement Control
if %61 == 0 &&  > 0 {
		lock(&)
		gp := globrunqget(pp, 1)
		unlock(&)
		if gp != nil {
			return gp, false, false
		}
	}

First check the number of schedules for P. If this is the 61st schedule for P and the global G queue has length >0, a G will be fetched from the global queue. this is to prevent that, in a given situation, only the G of the local queue is run and the global queue is ignored.

Its internal call to theglobrunqgetThe main process of the methodology is as follows:

// Try to get a batch of G from G's global queue
func globrunqget(pp *p, max int32) *g {
assertLockHeld(&)
// check if the global queue is empty
if == 0 {
return nil
}

    // Calculate the number of G's to fetch
n := /gomaxprocs + 1
if n > {
n =
}
if max > 0 && n > max {
n = max
}
    // Ensure that the number of G's fetched from the queue is not more than half of the current number of G's in the local queue, to avoid load imbalance caused by transferring all the G's from the global queue to the local queue
if n > int32(len())/2 {
n = int32(len()) / 2
}
-= n

gp := ()
n-- for ; n > 0; n-- { n = int32(len() / 2
for ; n > 0; n-- {
gp1 := ()
runqput(pp, gp1, false)
}
return gp
}
// Calculate the number of G's to be acquired
n := /gomaxprocs + 1
if n > {
n =
}
if max > 0 && n > max {
n = max
}
if n > int32(len()) / 2 {
n = int32(len()) / 2
}

n is the number of G's to be fetched from the global G queue, you can see that it will fetch at least one G and at mostrunqsize/gomaxprocs+1G, which ensures that a P does not acquire too many Gs and thus affect load balancing. And n is not allowed to acquire more than half of the global G queue at a time to ensure load balancing.

gp := ()
	n--
	for ; n > 0; n-- {
		gp1 := ()
		runqput(pp, gp1, false)
	}

After deciding how many Gs to fetch, the first G is returned directly via a pointer, and the rest are added to P's local queue.

In the current (i) call, the function sets the max value to 1, so only 1 G will be returned from the global queue.


Although it will not be implemented in (i)runqput, but let's see how G is added to P's local queue.

// runqput tries to put G on the local queue
// If next is False, runqput adds G to the end of the local queue
// If True, runqput will add G to the slot of the next G that will be dispatched
//If the run queue is full, then g will be put back into the global queue
func runqput(pp *p, gp *g, next bool) {
    // If randomizeScheduler is full, then g will be put back into the global queue.
if randomizeScheduler && next && randn(2) == 0 {
next = false
}

if next {
oldnext := newnext.
oldnext :=
oldnext := if ! (oldnext, guintptr((gp))) {
goto retryNext
}
if oldnext == 0 {
if oldnext == 0 { return
}
// Kick the old runnext out to the regular run queue.
gp = ()
}

retry.
h := (&) // Load the head of the queue.
t :=
if t-h < uint32(len()) { //check if local queue is full
[t%uint32(len())].set(gp) //unfull insert gp into the specified position in runqtail
(&, t+1) //update runtail to indicate that the inserted G is available for consumption
return
}
if runqputslow(pp, gp, h, t) { // if local queue is full, try to put it back to global queue
return
}
// the queue is not full, now the put above must succeed
goto retry
}
if randomizeScheduler && next && randn(2) == 0 {
		next = false
	}

In the first step, we see that even thoughnextis set to true, which requires that the G should be placed in the local P queue of therunnextin the slot.There is also a probability that next will be set to false

if next {
	retryNext:
		oldnext := 
		if !(oldnext, guintptr((gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = ()
	}

If next is still true, it will first get the G (oldnext) in the runnext slot in the original P scheduler, and then it will keep trying to replace the old G with the new one until it succeeds. When it succeeds, the old G will be put into the local queue of P in the following operation flow.

retry.
h := (&) // load queue header position
t :=
if t-h < uint32(len()) { //check if local queue is full
[t%uint32(len())].set(gp) //unfull insert gp into the specified position in runqtail
(&, t+1) //update runtail to indicate that the inserted G is available for consumption
return
}
if runqputslow(pp, gp, h, t) { // if local queue is full, try to put it back to global queue
return
}
// the queue is not full, now the put above must succeed
goto retry
}

In the process of adding G into the local queue of P, it is necessary to obtain the coordinates of the head and tail of the queue, which are used to determine whether the local queue is full or not, and if it is not full, then G is inserted into the tail of the local queue. Otherwise, executerunqputslowmethod to try to put back into the global queue.


We'll follow up on that.runqputslowThe execution flow of the method.

//commander-in-chief (military)Gand a group of work(local queueG)Place in global queue
func runqputslow(pp *p, gp *g, h, t uint32) bool {
	var batch [len()/2 + 1]*g //Half of the local queueG

	// First, grab a batch from local queue.
	n := t - h
	n = n / 2
	if n != uint32(len()/2) {
		throw("runqputslow: queue is not full")
	}
	for i := uint32(0); i < n; i++ {
		batch[i] = [(h+i)%uint32(len())].ptr()
	}
	if !(&, h, h+n) { // cas-release, commits consume
		return false
	}
	batch[n] = gp

	if randomizeScheduler { //disorder
		for i := uint32(1); i <= n; i++ {
			j := cheaprandn(i + 1)
			batch[i], batch[j] = batch[j], batch[i]
		}
	}

	// Link the goroutines.
	for i := uint32(0); i < n; i++ {
		batch[i].(batch[i+1])
	}
	var q gQueue
	(batch[0])
	(batch[n])

	// Now put the batch on global queue.
	lock(&)
	globrunqputbatch(&q, int32(n+1))
	unlock(&)
	return true
}

Its execution process is as follows:

var batch [len()/2 + 1]*g // local queue half of G

First create a batch array that is half of the number of G's currently contained in the local queue with capacity P to store the G's that will be transferred.

n := t - h
	n = n / 2
	if n != uint32(len()/2) {
		throw("runqputslow: queue is not full")
	}
	for i := uint32(0); i < n; i++ {
		batch[i] = [(h+i)%uint32(len())].ptr()
	}

Next, start storing pointers to half of the local queue, G, in batch.

if randomizeScheduler { //disorder
		for i := uint32(1); i <= n; i++ {
			j := cheaprandn(i + 1)
			batch[i], batch[j] = batch[j], batch[i]
		}
	}

The order in the batch will then be upset to ensure randomness.

// Link the goroutines.
	for i := uint32(0); i < n; i++ {
		batch[i].(batch[i+1])
	}
	var q gQueue
	(batch[0])
	(batch[n])

	// Now put the batch on global queue.
	lock(&)
	globrunqputbatch(&q, int32(n+1))
	unlock(&)
	return true

The last part is to connect the individual G's in the batch with pointers, converting thelinked listform and linked in the global queue.

runqputThe process of connecting is longer and is summarized in the diagram below:

(ii) Local queue acquisition
// local runq
	if gp, inheritTime := runqget(pp); gp != nil {
		return gp, inheritTime, false
	}

Had it not been the 61st call.findrunnablewill try to get a G from the local queue for scheduling. Let's look at the execution of the runqget method.

// Get g from the local runnable queue.
func runqget(pp *p) (gp *g, inheritTime bool) {
// If there is a runnext, it is the next G to run.
next :=
    // If runnext is non-zero and the CAS operation fails, it can only be stolen by another P, because other Ps can compete to set runnext to zero, but only the current P can set it to non-zero.
// Therefore, if CAS fails, there is no need to retry the operation.
if next ! = 0 && (next, 0) {
return (), true
}

for {
h := (&) // load-acquire, synchronize with other consumers
t := if t == h {
if t == h {
return nil, false
}
gp := [h%uint32(len())].ptr()
if (&, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}

If it is possible to get the runnext of P, return this one G. Otherwise, get the G at the head of the local queue.

(iii) Global queue fetch
// global runq
	if  != 0 {
		lock(&)
		gp := globrunqget(pp, 0)
		unlock(&)
		if gp != nil {
			return gp, false, false
		}
	}

If G cannot be fetched from the local queue, then the local queue for P is empty and an attempt is made to fetch G from the global queue, calling theglobrunqgetmethod gets G from the global queue, note that at this point, because max is set to 0 to indicate that it is not in effect, the methodIt may be possible to get multiple Gs from the global queue into P's local queue within the. The specific code for this method has been explained in (i).

(iv) Cyber event acquisition
    // Checking for ready network concatenators in a non-blocking way before formally going to steal G. This is an optimization of netpoll.
if netpollinited() && netpollAnyWaiters() && () ! = 0 {
if list, delta := netpoll(0); ! () { // non-blocking
gp := ()
injectglist(&list)
netpollAdjustWaiters(delta)
trace := traceAcquire()
casgstatus(gp, _Gwaiting, _Grunnable)
if () {
(gp, 0)
traceRelease(trace)
}
return gp, false, false
}
}

If there is no G to fetch from either the local queue or the global queue, at this point we will enter a special mechanism of the GMP scheduling model:WorkStealing, i.e., stealing the G of its local queue from other P schedulers to its own local queue, which is a mechanism unique to the GMP scheduling model to more fully utilize threads to improve the overall efficiency of the system.

Before that, an attempt is made to get ready network concatenations in a non-blocking way, and if there are any, the network concatenation is executed first.

Why do you need to introduce detection of network concurrent events specifically in Crip scheduling? Shouldn't this part be decoupled?

This is one of my own reflections on what I think should be an aspect of Go's runtime design principles. runtime's main task is to take care of theconcurrent schedulingcap (a poem)Resource managementIn practice, however, the handling of network events is usually closely related to the scheduling of concurrent programs.Non-blocking network polling mechanism(netpoll) allows for rapid wake-up and scheduling of the appropriate concatenation when a network event occurs, and after a local and global queue check, a network concatenation check ensures a fast response to network I/Os.

(v) Work theft
	if  || 2*() < () {
		if ! {
			()
		}

		gp, inheritTime, tnow, w, newWork := stealWork(now)
		if gp != nil {
			// Successfully stole.
			return gp, inheritTime, false
		}
		//...
	}

When there is no G in both the local queue and the global queue, the work stealing mechanism is performed at this point to try to steal G from the other scheduler P .

if  || 2*() < () {
		if ! {
			()
		}

If the currentThe number of spin M < half of the number of free P, which then sets the current M to the spin state.

gp, inheritTime, tnow, w, newWork := stealWork(now)
		if gp != nil {
			// Successfully stole.
			return gp, inheritTime, false
		}

call (programming)stealWorkCarry out the theft.


func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
pp := getg(). ()

ranTimer := false

    // Steal up to 4 tries from other P's
const stealTries = 4
for i := 0; i < stealTries; i++ {
        //prioritize checking the other P's Timer queue before doing the last traversal
stealTimersOrRunNextG := i == stealTries-1
// Randomize the starting point of the traversal
for enum := (cheaprand()); ! (); () {
//...
p2 := allp[()]
if pp == p2 {
continue
}


//...

//If P is non-idle, then try to steal it
if ! (()) {
if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp ! = nil {
return gp, false, now, pollUntil, ranTimer
}
}
}
}

// If no runnable Goroutine or Timer is found in all attempts, return nil and pollUntil (the time of the next poll).
return nil, false, now, pollUntil, ranTimer
}
const stealTries = 4
	for i := 0; i < stealTries; i++ {

The current P will attempt to steal from other P's local queues up to 4 times.

for enum := (cheaprand()); !(); () {
			//...
			p2 := allp[()]
			if pp == p2 {
				continue
			}

			
			//...

			//in the event thatPis non-idle,Then try to steal the
			if !(()) {
				if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
					return gp, false, now, pollUntil, ranTimer
				}
			}
		}

utilizationrunqstealmethod for stealing.


//through (a gap)p2Stealing half the work topcenter
func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
	t :=
	n := runqgrab(p2, &, t, stealRunNextG)
	if n == 0 {
		return nil
	}
	n--
	gp := [(t+n)%uint32(len())].ptr()
	if n == 0 {
		return gp
	}
	h := (&) // load-acquire, synchronize with consumers
	if t-h+n >= uint32(len()) {
		throw("runqsteal: runq overflow")
	}
	(&, t+n) // store-release, makes the item available for consumption
	return gp
}

runqstealmethod will steal half of its G from p2's local queue and put it into p's local queue, and we follow up with therunqgrabMethods;


func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
h := (&) // load-acquire, synchronize with other consumers
t := (&) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2
if n == 0 {
if stealRunNextG {
// try to steal the next G to be scheduled by P
if next := ; next ! = 0 {
                    //If P is running, in order to avoid frequent task state "jitter" and scheduling contention caused by each other's tasks, sleep for a while and wait for P to finish scheduling before trying to get it again.
if == _Prunning {
if !osHasLowResTimer {
usleep(3)
} else {
osyield()
}
}
                    // Try to steal the task
if ! (next, 0) {
continue
}
                    //Theft successful
batch[batchHead%uint32(len(batch))] = next
batch[batchHead%uint32(len(batch))] = next
}
}
return 0
}
        // If n is more than halfway through the queue, start over with inconsistent h and t due to concurrent accesses.
if n > uint32(len()/2) { // read inconsistent h and t
continue
}
        // Batch grab the tasks from runq
for i := uint32(0); i < n; i++ {
g := [(h+i)%uint32(len())]
batch[(batchHead+i)%uint32(len(batch))] = g
}
if (&, h, h+n) { // cas-release, commits consume
return n
}
}
}

through (a gap)n=n-n/2We can learn that is acquires half the number of G's.

pass (a bill or inspection etc)stealWork->runqsteal->runqgrabof the method link that completes the process of carrying the local queue G of the other P to the local queue of the current P.

(vi) General overview

Finally, we use mapping to review the overallfindRunnableof the implementation process.

2.3.2、execute()

When we successfully pass thefindRunnable()When you find a G that can be executed, you call the current Gexecute()method and start going to call this G.

func execute(gp *g, inheritTime bool) {
mp := getg().m


// Bind G and M
= gp
= mp
    // Change the status of G
casgstatus(gp, _Grunnable, _Grunning)
= 0
= false
gp.stackguard0 = + stackGuard
if !inheritTime {
        // Update the scheduling times for P
().schedtick++
}
//....
//Execute G's task
gogo(&)
}

can be seenexecutes main task is to bind the current G to M, i.e., assign G to this thread M, then adjust its state to the execution state, and finally call thegogomethod completes the run on the user method.

2.3.3、mcall()

We know from subsection 2.3.2 that the executed execute function completes the switch between g0 and g, hands over execution of M to g, and then calls thegogomethod to run g. When it is time to switch the execution of M from g to g0 again, you need to execute themcall()method to complete the switch.mcall()method, which we mentioned in subsection 2.1, is implemented in assembly language, and its main roles are to save information about the stack of g, to switch the current stack from g to g0, and to execute the mcall method passed in on the stack of g0.fnCallback function.

When to callmcall()In this case, it involves the scheduling type that we talked about in subsection 2.2. Next, we analyze the source code one by one.

1. Proactive scheduling

Active scheduling is the letting method provided to the user, executing the runtime package under theGoschedMethods.

func Gosched() {
	checkTimeouts()
	mcall(gosched_m)
}

The Gosched method then calls mcall and passes in the callback functiongosched_m

// Gosched continuation on g0.
func gosched_m(gp *g) {
goschedImpl(gp, false)
}

func goschedImpl(gp *g, preempted bool) {
//...
casgstatus(gp, _Grunning, _Grunnable)// Change the Goroutine status from running to runnable.
//...

dropg() // unbind G and M
lock(&)
globrunqput(gp)// put G into the global queue for the next dispatch
unlock(&)

//...

schedule()// Call the scheduler and select the next Goroutine to run from the global queue or local queue
}

gosched_mcompletes the transition to the state of G and then calls thedropgUnbundle M and G, put G back inside the global queue, and finally call schedule for a new round of scheduling.

2. Passive scheduling

When the current G needs to be called passively, it calls thegoprak(), put it in blocking state and wait for someone to wake it up.

func gopark(unlockf func(*g, ) bool, lock , reason waitReason, traceReason traceBlockReason, traceskip int) {
	//...
	mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
	mp := getg().m

	trace := traceAcquire()

	casgstatus(gp, _Grunning, _Gwaiting)
	//...

	dropg()

	//...
	schedule()
}

goparkThe internal call to themcall(park_m)park_mSet the state of G to WAITING and unbind M and G, then open a new round of scheduling.

The G that enters the wait needs to be passively awakened by other events, at which point it calls thegoreadyMethods.

func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}


The //ready function is used to mark the specified Goroutine (gp) as "ready" and place it in the run queue. It is used when a Goroutine transitions from the _Gwaiting state to the _Grunnable state to ensure that the scheduler can select and execute it.
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
status := readgstatus(gp)

// Mark runnable.
mp := acquirem() // Acquire the current thread (M) and disable it from being preempted to avoid incorrectly keeping P in a local variable.
    // Confirm the status of G
if status&^_Gscan ! = _Gwaiting {
dumpgstatus(gp)
throw("bad g->status in ready")
}
//...
casgstatus(gp, _Gwaiting, _Grunnable)
//....
    // Put this G into the current P's run queue
runqput((), gp, next)
    //Check if there is a free P, and if so wake it up so it can process the newly added runnable Goroutine.
wakep()
    // Release the current lock on M to re-allow preemption.
releasem(mp)
}

readymethod switches the state of G back to run state and puts G into P's run queue. As we can see from the code, the woken up G will not be executed immediately, but will be added to the local queue to wait for the next scheduling.

3. Normal scheduling

If G is executed normally, it will call thegoexit1()method completes the switch between g and g0.

func goexit1() {
	//...
	mcall(goexit0)
}


// goexit continuation on g0.
func goexit0(gp *g) {
	gdestroy(gp)
	schedule()
}

Eventually, the concatenation G is destroyed and a new round of scheduling is opened.

4. Preemptive scheduling

Preemption scheduling is the most complex because it requires the global monitor m to check if all Ps are blocked for a long period of time, which takes time to retrieve and cannot be directly pinpointed to which P needs to be preempted. The global monitor callsretake()The method to go for the check is as follows:

The //retake function is used to handle some of the scheduling policies in Go's scheduler to ensure that Goroutine execution is not blocked for long periods of time. It does this by checking all processors (P), attempting to interrupt overly long system calls and regaining control of P under the right conditions.
func retake(now int64) uint32 {
n := 0
lock(&allpLock)
for i := 0; i < len(allp); i++ {
pp := allp[i]
if pp == nil {
continue
}
pd := &.
pd := & s :=
sysretake := false
if s == _Prunning || s == _Psyscall {
            //// Check the runtime length of `P` if its state is `_Prunning` or `_Psyscall`.
t := int64()
if int64() ! = t {
= uint32(t)
= now
} else if +forcePreemptNS <= now {
                // Exceed maximum runtime, preempt P
preemptone(pp)
// If in syscall state, `preemptone()` can't interrupt P because there is no M bound to P.
sysretake = true
}
}
if s == _Psyscall {
// If `P` stays in the syscall for more than 1 monitor cycle, attempt to retake.
t := int64()
if !sysretake && int64() ! = t {
= uint32(t)
= now
continue
}
            // If the current run queue of P is empty, cut exists at least one spin of M, and the wait time is not exceeded then skip the recycling
if runqempty(pp) && ()+() > 0 && +10*1000*1000 > now {
continue
}
// In order to get ``, first release `allpLock`''
unlock(&allpLock)

            // Recycling operation...
            handoffp(pp)
}
}
unlock(&allpLock)
return uint32(n)
}
for i := 0; i < len(allp); i++ {
		pp := allp[i]
		if pp == nil {
			continue
		}

Get P one by one and check.

if s == _Prunning || s == _Psyscall {
            //// Check the runtime length of `P` if its status is `_Prunning` or `_Psyscall`.
t := int64()
if int64() ! = t {
= uint32(t)
= now
} else if +forcePreemptNS <= now {
                // Exceed maximum runtime, preempt P
preemptone(pp)
// If in syscall state, `preemptone()` can't interrupt P because there is no M bound to P.
sysretake = true
}
}

When the runtime of P exceeds the maximum runtime, the call topreemptoneWay to try to grab the P.

It is worth noting that thepreemptoneThe method is designed to"Do your best."that, because of the existence of concurrencyWe can't be sure that it will notify us of the G that we need to unbundle, because the following conditions may exist:

  • When we try to issue a preemption to notify the G on P that it needs to stop running, it is possible that in the process of issuing the notification, this G finishes running and calls to the next G. We may have notified the wrong G.
  • When G enters the state of a system call, P and M are unbound and we can't notify G anymore.
  • Even if the target's G is notified, it may be executing a newstack, at which point the request is ignored.

Therefore.preemptonemethodologieswill only try to put in the case that it has not unbound itself from M and that g on m is not g0 at this time.Set to true to indicate that a notification was sent and return true. the specific preemption will likely occur at some point in the future.

if s == _Psyscall {
// If `P` stays in the system call for more than 1 monitor cycle, try to retract it.
t := int64()
if !sysretake && int64() ! = t {
= uint32(t)
= now
continue
}
            // If the current run queue of P is empty, cut exists at least one spin of M, and the wait time is not exceeded then skip the recycling
if runqempty(pp) && ()+() > 0 && +10*1000*1000 > now {
continue
}
// In order to get ``, first release `allpLock`''
unlock(&allpLock)

            // Recycle operation...
    if (&, s, _Pidle) {
        //....
        handoffp(pp)
    }

}

Preemptive scheduling is performed when the following three conditions are met:

  • p's local queue has G waiting to be executed.
  • There are currently no free p and m
  • Execution of system calls takes more than 10ms

At this point the preemptive scheduling is called, first setting the state of p to idle, indicating that it can be acquired for binding by other M's, and then calling thehandoffpMethods.

func handoffp(pp *p) {
	// handoffp must start an M in any situation where
	// findrunnable would return a G to run on pp.

	// if it has local work, start it straight away
	if !runqempty(pp) ||  != 0 {
		startm(pp, false, false)
		return
	}
	// if there's trace work to do, start it straight away
	if (traceEnabled() || traceShuttingDown()) && traceReaderAvailable() != nil {
		startm(pp, false, false)
		return
	}
	// if it has GC work, start it straight away
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(pp) {
		startm(pp, false, false)
		return
	}
	// no local work, check that there are no spinning/idle M's,
	// otherwise our help is not required
	if ()+() == 0 && (0, 1) { // TODO: fast atomic
		(0)
		startm(pp, true, false)
		return
	}
	lock(&)
	if () {
		 = _Pgcstop
		--
		if  == 0 {
			notewakeup(&)
		}
		unlock(&)
		return
	}
	if  != 0 && (&, 1, 0) {
		(pp)
		--
		if  == 0 {
			notewakeup(&)
		}
	}
	if  != 0 {
		unlock(&)
		startm(pp, false, false)
		return
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if () == gomaxprocs-1 && () != 0 {
		unlock(&)
		startm(pp, false, false)
		return
	}

	// The scheduler lock cannot be held when calling wakeNetPoller below
	// because wakeNetPoller may call wakep which may call startm.
	when := nobarrierWakeTime(pp)
	pidleput(pp, 0)
	unlock(&)

	if when != 0 {
		wakeNetPoller(when)
	}
}

When we fulfill one of the following cases, a new M is assigned to the current P for scheduling:

  • The global queue is not empty or the local queue is not empty, i.e., there is a G that can be run.
  • Need to have trace to execute.
  • There are garbage collection jobs that need to be performed.
  • There is no spinning thread M at the current moment and there is no idle P (indicating that the task is busy at the current moment).
  • The current P is the only P that is running and has network events waiting to be processed.

When one of the five conditions is met, it all goes to thestartm()method to assign an M to the current P.


func startm(pp *p, spinning, lockheld bool) {
	mp := acquirem()
	if !lockheld {
		lock(&)
	}
	if pp == nil {
		if spinning {
		}
		pp, _ = pidleget(0)
		if pp == nil {
			if !lockheld {
				unlock(&)
			}
			releasem(mp)
			return
		}
	}
	nmp := mget()
	if nmp == nil {
		id := mReserveID()
		unlock(&)

		var fn func()
		if spinning {
			fn = mspinning
		}
		newm(fn, pp, id)

		if lockheld {
			lock(&)
		}
		releasem(mp)
		return
	}
	//...
	releasem(mp)
}
if pp == nil {
		if spinning {
		}
		pp, _ = pidleget(0)
		if pp == nil {
			if !lockheld {
				unlock(&)
			}
			releasem(mp)
			return
		}
	}

If pp is nil, then it will automatically be set to the first p in the free p queue. If it is still nil, it means that there is no free p, and the method will exit.

nmp := mget()
	if nmp == nil {
		id := mReserveID()
		unlock(&)

		var fn func()
		if spinning {
			fn = mspinning
		}
		newm(fn, pp, id)

		if lockheld {
			lock(&)
		}
		releasem(mp)
		return
	}

Then it will try to get the current free m, and if it doesn't exist then it will create a new one.

At this point, about the GMP model of the excerpt part of the explanation is completed, there may be a lot of my understanding of the wrong place welcome to discuss, thank you for watching.