Anaconda 学生排队 android教程 WorldCloud acm multithreading lambda collections tfs icons vue过滤器 vue开发 flink教程视频 jquery循环遍历 jquery获取元素宽度 idea导入多个项目 js获取body的高度 nikto扫描web漏洞 mysql组合索引 linux撤销 hadoop环境变量配置 kafka启动命令 python数据格式 python网络编程 python开发 python开发环境 python例子 python变量类型 java框架 java数组添加 java文件读写 java安装与配置 java抛出自定义异常 unix操作系统下载 端口关闭工具 python输入数字 din字体下载 3d软件下载 linux端口映射 qq钱包实名认证
当前位置: 首页 > 学习教程  > 编程语言

go学习笔记 协程调度器底层实现

2020/12/28 18:40:21 文章标签:

go的调度器实现在 runtime 包中,路径为: ./src/runtime/proc.go 文件中。我们都知道go的强大是因为可以起很多 goroutine 也即是我们所说的协程。那么协程和线程有什么联系呢?协程又是如何调度的呢? go语言其实是在操作系统提供的…

go的调度器实现在 runtime 包中,路径为: ./src/runtime/proc.go 文件中。我们都知道go的强大是因为可以起很多 goroutine 也即是我们所说的协程。那么协程和线程有什么联系呢?协程又是如何调度的呢?

go语言其实是在操作系统提供的内核线程之上搭建了一个特有得【两级线程】模型。下面再说两级线程模型前,有三个必知的核心元素。(G、M、P)
G:Goroutine的缩写,一个G代表了对一段需要被执行的Go语言代码的封装
M:Machine的缩写,一个M代表了一个内核线程
P:Processor的缩写,一个P代表了M所需的上下文环境

简单的来说,一个G的执行需要M和P的支持。一个M在与一个P关联之后形成了一个有效的G运行环境【内核线程 + 上下文环境】。每个P都会包含一个可运行的G的队列 (runq )。
好了下面我们来具体的看看 G、M、P

M (machine):

M是machine的头文字, 在当前版本的golang中等同于系统线程.M可以运行两种代码:

  • go代码, 即goroutine, M运行go代码需要一个P
  • 原生代码, 例如阻塞的syscall, M运行原生代码不需要P

M会从运行队列中取出G, 然后运行G, 如果G运行完毕或者进入休眠状态, 则从运行队列中取出下一个G运行, 周而复始。有时候G需要调用一些无法避免阻塞的原生代码, 这时M会释放持有的P并进入阻塞状态, 其他M会取得这个P并继续运行队列中的G.
go需要保证有足够的M可以运行G, 不让CPU闲着, 也需要保证M的数量不能过多。通常创建一个M的原因是由于没有足够的M来关联P并运行其中可运行的G。而且运行时系统执行系统监控的时候,或者GC的时候也会创建M

M的结构体定义:(在 ./src/runtime/runtime2.go 文件中)

type m struct {
   /*
        1.  所有调用栈的Goroutine,这是一个比较特殊的Goroutine。
        2.  普通的Goroutine栈是在Heap分配的可增长的stack,而g0的stack是M对应的线程栈。
        3.  所有调度相关代码,会先切换到该Goroutine的栈再执行。
    */
	g0      *g     // goroutine with scheduling stack
	morebuf gobuf  // gobuf arg to morestack
	divmod  uint32 // div/mod denominator for arm - known to liblink

	// Fields not known to debuggers.
	procid        uint64       // for debuggers, but offset not hard-coded
	gsignal       *g           // signal-handling g
	goSigStack    gsignalStack // Go-allocated signal handling stack
	sigmask       sigset       // storage for saved signal mask
	tls           [6]uintptr   // thread-local storage (for x86 extern register)
	mstartfn      func()
	curg          *g       // current running goroutine // 表示与当前M锁定那个g
	caughtsig     guintptr // goroutine running during fatal signal
	p             puintptr // attached p for executing go code (nil if not executing go code)
	nextp         puintptr
	oldp          puintptr // the p that was attached before executing a syscall
	id            int64
	mallocing     int32
	throwing      int32
	preemptoff    string // if != "", keep curg running on this m
	locks         int32
	dying         int32
	profilehz     int32
	spinning      bool // m is out of work and is actively looking for work
	blocked       bool // m is blocked on a note
	newSigstack   bool // minit on C thread called sigaltstack
	printlock     int8
	incgo         bool   // m is executing a cgo call
	freeWait      uint32 // if == 0, safe to free g0 and delete m (atomic)
	fastrand      [2]uint32
	needextram    bool
	traceback     uint8
	ncgocall      uint64      // number of cgo calls in total
	ncgo          int32       // number of cgo calls currently in progress
	cgoCallersUse uint32      // if non-zero, cgoCallers in use temporarily
	cgoCallers    *cgoCallers // cgo traceback if crashing in cgo call
	park          note
	alllink       *m // on allm
	schedlink     muintptr
	lockedg       guintptr
	createstack   [32]uintptr // stack that created this thread.
	lockedExt     uint32      // tracking for external LockOSThread
	lockedInt     uint32      // tracking for internal lockOSThread
	nextwaitm     muintptr    // next m waiting for lock
	waitunlockf   func(*g, unsafe.Pointer) bool
	waitlock      unsafe.Pointer
	waittraceev   byte
	waittraceskip int
	startingtrace bool
	syscalltick   uint32
	freelink      *m // on sched.freem

	// these are here because they are too large to be on the stack
	// of low-level NOSPLIT functions.
	libcall   libcall
	libcallpc uintptr // for cpu profiler
	libcallsp uintptr
	libcallg  guintptr
	syscall   libcall // stores syscall parameters on windows

	vdsoSP uintptr // SP for traceback while in VDSO call (0 if not in call)
	vdsoPC uintptr // PC for traceback while in VDSO call

	// preemptGen counts the number of completed preemption
	// signals. This is used to detect when a preemption is
	// requested, but fails. Accessed atomically.
	preemptGen uint32

	// Whether this is a pending preemption signal on this M.
	// Accessed atomically.
	signalPending uint32

	dlogPerM

	mOS

	// Up to 10 locks held by this m, maintained by the lock ranking code.
	locksHeldLen int
	locksHeld    [10]heldLockInfo
}

的字段众多,其中最重要的为下面四个:

g0: Go运行时系统在启动之初创建的,用于执行一些运行时任务。

mstartfn:表示M的起始函数。其实就是我们 go 语句携带的那个函数啦。

curg:存放当前正在运行的G的指针。

p:指向当前与M关联的那个P。

nextp:用于暂存于当前M有潜在关联的P。 (预联)当M重新启动时,即用预联的这个P做关联啦

spinning:表示当前M是否正在寻找G。在寻找过程中M处于自旋状态

lockedg:表示与当前M锁定的那个G。运行时系统会把 一个M 和一个G锁定,一旦锁定就只能双方相互作用,不接受第三者

M并没有像G和P一样的状态标记, 但可以认为一个M有以下的状态:

  • 自旋中(spinning): M正在从运行队列获取G, 这时候M会拥有一个P
  • 执行go代码中: M正在执行go代码, 这时候M会拥有一个P
  • 执行原生代码中: M正在执行原生代码或者阻塞的syscall, 这时M并不拥有P
  • 休眠中: M发现无待运行的G时会进入休眠, 并添加到空闲M链表中, 这时M并不拥有P

自旋中(spinning)这个状态非常重要, 是否需要唤醒或者创建新的M取决于当前自旋中的M的数量。

M在被创建之初会被加入到全局的M列表 【runtime.allm】 。接着,M的起始函数(mstartfn)和准备关联的P(p)都会被设置。最后,运行时系统会为M专门创建一个新的内核线程并与之关联。这时候这个新的M就为执行G做好了准备。其中起始函数(mstartfn)仅当运行时系统要用此M执行系统监控或者垃圾回收等任务的时候才会被设置。全局M列表的作用是运行时系统在需要的时候会通过它获取到所有的M的信息,同时防止M被gc

在新的M被创建后做一番初始化工作。其中包括了对自身所持的栈空间以及信号做处理的初始化。在上述初始化完成后 mstartfn 函数就会被执行 (如果存在的话)。【注意】:如果mstartfn 代表的是系统监控任务的话,那么该M会一直在执行mstartfn 而不会有后续的流程。否则 mstartfn 执行完后,当前M将会与那个准备与之关联的P完成关联。至此,一个并发执行环境才真正完成。之后就是M开始寻找可运行的G并运行之。

运行时系统管辖的M会在GC任务执行的时候被停止,这时候系统会对M的属性做某些必要的重置并把M放置入调度器的空闲M列表。【很重要】因为在需要一个未被使用的M时,运行时系统会先去这个空闲列表获取M。(只有都没有的时候才会创建M)

M本身是无状态的。M是否有空闲仅以它是否存在于调度器的空闲M列表 runtime.sched.midle  中为依据 (空闲列表不是那个全局列表哦)。

单个Go程序所使用的M的最大数量是可以被设置的。在我们使用命令运行Go程序时候,有一个引导程序先会被启动的。在这个歌引导程序中会为Go程序的运行简历必要的环境。引导程序对M的数量进行初始化设置,默认是 最大值 1W 【即是说,一个Go程序最多可以使用1W个M,即:理想状态下,可以同时有1W个内核线程被同时运行】。使用 runtime/debug.SetMaxThreads() 函数设置。

P (process):

P是process的头文字, 代表M运行G所需要的资源。一些讲解协程的文章把P理解为cpu核心, 其实这是错误的.虽然P的数量默认等于cpu核心数但可以通过环境变量GOMAXPROC修改, 在实际运行时P跟cpu核心并无任何关联。

P也可以理解为控制go代码的并行度的机制,
如果P的数量等于1, 代表当前最多只能有一个线程(M)执行go代码,
如果P的数量等于2, 代表当前最多只能有两个线程(M)执行go代码.
执行原生代码的线程数量不受P控制

因为同一时间只有一个线程(M)可以拥有P, P中的数据都是锁自由(lock free)的, 读写这些数据的效率会非常的高

P是使G能够在M中运行的关键。Go运行时系统适当地让P与不同的M建立或者断开联系,以使得P中的那些可运行的G能够在需要的时候及时获得运行时机。

P的结构体定义:(在 ./src/runtime/runtime2.go 文件中)

type p struct {
	id          int32
	status      uint32 // one of pidle/prunning/...
	link        puintptr
	schedtick   uint32     // incremented on every scheduler call
	syscalltick uint32     // incremented on every system call
	sysmontick  sysmontick // last tick observed by sysmon
	m           muintptr   // back-link to associated m (nil if idle)
	mcache      *mcache
	pcache      pageCache
	raceprocctx uintptr

	deferpool    [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
	deferpoolbuf [5][32]*_defer

	// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
	goidcache    uint64
	goidcacheend uint64

	// Queue of runnable goroutines. Accessed without lock.
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	// runnext, if non-nil, is a runnable G that was ready'd by
	// the current G and should be run next instead of what's in
	// runq if there's time remaining in the running G's time
	// slice. It will inherit the time left in the current time
	// slice. If a set of goroutines is locked in a
	// communicate-and-wait pattern, this schedules that set as a
	// unit and eliminates the (potentially large) scheduling
	// latency that otherwise arises from adding the ready'd
	// goroutines to the end of the run queue.
	runnext guintptr

	// Available G's (status == Gdead)
	gFree struct {
		gList
		n int32
	}

	sudogcache []*sudog
	sudogbuf   [128]*sudog

	// Cache of mspan objects from the heap.
	mspancache struct {
		// We need an explicit length here because this field is used
		// in allocation codepaths where write barriers are not allowed,
		// and eliminating the write barrier/keeping it eliminated from
		// slice updates is tricky, moreso than just managing the length
		// ourselves.
		len int
		buf [128]*mspan
	}

	tracebuf traceBufPtr

	// traceSweep indicates the sweep events should be traced.
	// This is used to defer the sweep start event until a span
	// has actually been swept.
	traceSweep bool
	// traceSwept and traceReclaimed track the number of bytes
	// swept and reclaimed by sweeping in the current sweep loop.
	traceSwept, traceReclaimed uintptr

	palloc persistentAlloc // per-P to avoid mutex

	_ uint32 // Alignment for atomic fields below

	// The when field of the first entry on the timer heap.
	// This is updated using atomic functions.
	// This is 0 if the timer heap is empty.
	timer0When uint64

	// Per-P GC state
	gcAssistTime         int64    // Nanoseconds in assistAlloc
	gcFractionalMarkTime int64    // Nanoseconds in fractional mark worker (atomic)
	gcBgMarkWorker       guintptr // (atomic)
	gcMarkWorkerMode     gcMarkWorkerMode

	// gcMarkWorkerStartTime is the nanotime() at which this mark
	// worker started.
	gcMarkWorkerStartTime int64

	// gcw is this P's GC work buffer cache. The work buffer is
	// filled by write barriers, drained by mutator assists, and
	// disposed on certain GC state transitions.
	gcw gcWork

	// wbBuf is this P's GC write barrier buffer.
	//
	// TODO: Consider caching this in the running G.
	wbBuf wbBuf

	runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

	// Lock for timers. We normally access the timers while running
	// on this P, but the scheduler can also do it from a different P.
	timersLock mutex

	// Actions to take at some time. This is used to implement the
	// standard library's time package.
	// Must hold timersLock to access.
	timers []*timer

	// Number of timers in P's heap.
	// Modified using atomic instructions.
	numTimers uint32

	// Number of timerModifiedEarlier timers on P's heap.
	// This should only be modified while holding timersLock,
	// or while the timer status is in a transient state
	// such as timerModifying.
	adjustTimers uint32

	// Number of timerDeleted timers in P's heap.
	// Modified using atomic instructions.
	deletedTimers uint32

	// Race context used while executing timer functions.
	timerRaceCtx uintptr

	// preempt is set to indicate that this P should be enter the
	// scheduler ASAP (regardless of what G is running on it).
	preempt bool

	pad cpu.CacheLinePad
}

通过runtime.GOMAXPROCS函数我们可以改变单个Go程序可以间拥有的P的最大数量。P的最大数量相当于是对可以被并发执行的用户级的G的数量作出限制。

每一个P都必须关联一个M才能使其中的G得以运行

【注意】:运行时系统会将M与关联的P分离开来。但是如果该P的可运行队列中还有未运行的G,那么运行时系统就会找到一个空的M (在调度器的空闲队列中的M) 或者创建一个空的M,并与该P关联起来(为了运行G而做准备)。

runtime.GOMAXPROCS函数设置的只会影响P的数量,但是对M (内核线程)的数量不会影响,所以runtime.GOMAXPROCS 并不是控制线程数,只能说是影响上下文环境P的数目

在Go程序开始运行时,会先由引导程序对M做了数量上的限制,及对P做了限制,P的数量默认为1。所以我们无论在程序中使用go关键字启用多少goroutine,它们都会被塞到一个P的可运行G队列中

在确认P的最大数量后,运行时系统会根据这个数值初始化全局的P列表 【runtime.allp】,类似全局M列表,其中包含了所有 运行时系统创建的所有P。随后,运行时系统会把调度器的可运行G队列【runtime.sched.runq】中的所有G均匀的放入全局的P列表中的各个P的可执行G队列当中。到这里为止,运行时系统需要用到的所有P都准备就绪了。

类似M的空闲列表,调度器也存在一个P的空闲列表【runtime.sched.pidle】,当一个P不再与任何M关联的时候,运行时系统就会把该P放入这个列表中,而一个空闲的P关联了某个M之后会被从这个列表中取出【注意:就算一个P加入了空闲队列,但是它的可运行G队列不一定为空

和M不同P是有状态的:(五种)

Pidle:当前P未和任何M关联

Prunning:当前P正在和某个M关联

Psyscall:当前P中的被运行的那个G正在进行系统调用

Pgcstop:运行时系统正在进行gc。(运行时系统在gc时会试图把全局P列表中的P都处于此状态)

Pdead:当前P已经不再被使用。(在调用runtime.GOMAXPROCS减少P的数量时,多余的P就处于此状态)

P的初始状态就是为Pgcstop,处于这个状态很短暂,在初始化和填充P中的G队列之后,运行时系统会将其状态置为Pidle并放入调度器的空闲P列表 (runtime.sched.pidle)中。其中的P会由调度器根据实际情况进行取用。下图是P在各个状态建的流转情况:

从上图,我们可以看出,除了Pdead之外的其他状态的P都会在运行时系统欲进行GC是被指为Pgcstop在gc结束后状态不会回复到之前的状态的,而是都统一直接转到了Pidle 【这意味着,他们都需要被重新调度】。【注意】:除了Pgcstop 状态的P,其他状态的P都会在 调用runtime.GOMAXPROCS 函数去减少P数目时,被认为是多余的P而状态转为Pdead,这时候其带的可运行G的队列中的G都会被转移到 调度器的可运行G队列中,它的自由G队列 【gfree】也是一样被移到调度器的自由列表 【runtime.sched.gfree】中

【注意】:每个P中都有一个可运行G队列自由G队列。自由G队列包含了很多已经完成的G,随着被运行完成的G的积攒到一定程度后,运行时系统会把其中的部分G转移的调度器的自由G队列 【runtime.sched.gfree】中。

【注意】:当我们每次用 go关键字 启用一个G的时候,运行时系统都会先从P的自由G队列获取一个G来封装我们提供的函数 (go 关键字后面的函数) ,如果发现P中的自由G过少时,会从调度器的自由G队列中移一些G过来,只有连调度器的自由G列表都弹尽粮绝的时候,才会去创建新的G。

G (goroutine):

G是goroutine的头文字, goroutine可以解释为受管理的轻量线程, goroutine使用go关键词创建。

举例来说,  func main() { go other() },  这段代码创建了两个goroutine。
一个是main, 另一个是other, 【注意】:main本身也是一个goroutine

goroutine的新建, 休眠, 恢复, 停止都受到go运行时的管理。
goroutine执行异步操作时会进入休眠状态, 待操作完成后再恢复, 无需占用系统线程
goroutine新建或恢复时会添加到运行队列等待M取出并运行

G的结构体定义:(在 ./src/runtime/runtime2.go 文件中)

type g struct {
	// Stack parameters.
	// stack describes the actual stack memory: [stack.lo, stack.hi).
	// stackguard0 is the stack pointer compared in the Go stack growth prologue.
	// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
	// stackguard1 is the stack pointer compared in the C stack growth prologue.
	// It is stack.lo+StackGuard on g0 and gsignal stacks.
	// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
	stack       stack   // offset known to runtime/cgo  描述了真实的栈内存,包括上下界
	stackguard0 uintptr // offset known to liblink
	stackguard1 uintptr // offset known to liblink

	_panic       *_panic // innermost panic - offset known to liblink
	_defer       *_defer // innermost defer
	m            *m      // current m; offset known to arm liblink
	sched        gobuf  // goroutine切换时,用于保存g的上下文
	syscallsp    uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc
	syscallpc    uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc
	stktopsp     uintptr        // expected sp at top of stack, to check in traceback
	param        unsafe.Pointer // passed parameter on wakeup用于传递参数,睡眠时其他goroutine可以设置param,唤醒时该goroutine可以获取
	atomicstatus uint32
	stackLock    uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
	goid         int64
	schedlink    guintptr
	waitsince    int64      // approx time when the g become blocked g被阻塞的大体时间
	waitreason   waitReason // if status==Gwaiting

	preempt       bool // preemption signal, duplicates stackguard0 = stackpreempt
	preemptStop   bool // transition to _Gpreempted on preemption; otherwise, just deschedule
	preemptShrink bool // shrink stack at synchronous safe point

	// asyncSafePoint is set if g is stopped at an asynchronous
	// safe point. This means there are frames on the stack
	// without precise pointer information.
	asyncSafePoint bool

	paniconfault bool // panic (instead of crash) on unexpected fault address
	gcscandone   bool // g has scanned stack; protected by _Gscan bit in status
	throwsplit   bool // must not split stack
	// activeStackChans indicates that there are unlocked channels
	// pointing into this goroutine's stack. If true, stack
	// copying needs to acquire channel locks to protect these
	// areas of the stack.
	activeStackChans bool
	// parkingOnChan indicates that the goroutine is about to
	// park on a chansend or chanrecv. Used to signal an unsafe point
	// for stack shrinking. It's a boolean value, but is updated atomically.
	parkingOnChan uint8

	raceignore     int8     // ignore race detection events
	sysblocktraced bool     // StartTrace has emitted EvGoInSyscall about this goroutine
	sysexitticks   int64    // cputicks when syscall has returned (for tracing)
	traceseq       uint64   // trace event sequencer
	tracelastp     puintptr // last P emitted an event for this goroutine
	lockedm        muintptr  //G被锁定只在这个m上运行
	sig            uint32
	writebuf       []byte
	sigcode0       uintptr
	sigcode1       uintptr
	sigpc          uintptr
	gopc           uintptr         // pc of go statement that created this goroutine
	ancestors      *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)
	startpc        uintptr         // pc of goroutine function
	racectx        uintptr
	waiting        *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
	cgoCtxt        []uintptr      // cgo traceback context
	labels         unsafe.Pointer // profiler labels
	timer          *timer         // cached timer for time.Sleep
	selectDone     uint32         // are we participating in a select and did someone win the race?

	// Per-G GC state

	// gcAssistBytes is this G's GC assist credit in terms of
	// bytes allocated. If this is positive, then the G has credit
	// to allocate gcAssistBytes bytes without assisting. If this
	// is negative, then the G must correct this by performing
	// scan work. We track this in bytes to make it fast to update
	// and check for debt in the malloc hot path. The assist ratio
	// determines how this corresponds to scan work debt.
	gcAssistBytes int64
}

// 用于保存G切换时上下文的缓存结构体
type gobuf struct {
	// The offsets of sp, pc, and g are known to (hard-coded in) libmach.
	//
	// ctxt is unusual with respect to GC: it may be a
	// heap-allocated funcval, so GC needs to track it, but it
	// needs to be set and cleared from assembly, where it's
	// difficult to have write barriers. However, ctxt is really a
	// saved, live register, and we only ever exchange it between
	// the real register and the gobuf. Hence, we treat it as a
	// root during stack scanning, which means assembly that saves
	// and restores it doesn't need write barriers. It's still
	// typed as a pointer so that any other writes from Go get
	// write barriers.
	sp   uintptr  // 当前的栈指针
	pc   uintptr // 计数器
	g    guintptr // g自身
	ctxt unsafe.Pointer
	ret  sys.Uintreg
	lr   uintptr
	bp   uintptr // for GOEXPERIMENT=framepointer
}

下面我们来讲讲G。Go语言的编译器会把我们编写的go语句编程一个运行时系统的函数调用,并把go语句中那个函数及其参数都作为参数传递给这个运行时系统函数中。

运行时系统在接到这样一个调用后,会先检查一下go函数及其参数的合法性,紧接着会试图从本地P的自由G队列中(或者调度器的自由G队列)中获取一个可用的自由G (P中有讲述了),如果没有则新创建一个G。类似M和P,G在运行时系统中也有全局的G列表【runtime.allg】,那些新建的G会先放到这个全局的G列表中,其列表的作用也是集中放置了当前运行时系统中给所有的G的指针。在用自由G封装go的函数时,运行时系统都会对这个G做一次初始化。

初始化:包含了被关联的go关键字后的函数及当前G的状态机G的ID等等。在G被初始化完成后就会被放置到当前本地的P的可运行队列中。只要时机成熟,调度器会立即尽心这个G的调度运行。

G的各种状态:

Gidle:G被创建但还未完全被初始化。

Grunnable:当前G为可运行的,正在等待被运行。

Grunning:当前G正在被运行。

Gsyscall:当前G正在被系统调用

Gwaiting:当前G正在因某个原因而等待

Gdead:当前G完成了运行

正在被初始化进行中的G是处于Grunnable状态的。一个G真正被使用是在状态为Grunnable之后。G的生命周期及状态变化如图:

图上有一步是事件到来,那么G在运行过程中,是否等待某个事件以及等待什么样的事件完全由起封装的go关键字后的函数决定。(如:等待chan中的值、涉及网络I/O、time.Timer、time.Sleep等等事件)

G退出系统调用,及其复杂:运行时系统先会尝试直接运行当前G,仅当无法被运行时才会转成Grunnable并放置入调度器的自由G列表中。

最后,已经是Gdead状态的G是可以被重新初始化并使用的。而对比进入Pdead状态的P等待的命运只有被销毁。处于Gdead的G会被放置到本地P或者调度器的自由G列表中。

至此,G、M、P的初步描述已经完毕,下面我们来看一看一些核心的队列:

G、M、P的容器
中文名源码的名称作用域简要说明
全局M列表runtime.allm运行时系统存放所有M
全局P列表runtime.allp运行时系统存放所有P
全局G列表runtime.allg运行时系统存放所有G
调度器中的空闲M列表runtime.sched.midle调度器存放空闲M
调度器中的空闲P列表runtime.sched.pidle调度器存放空闲P
调度器中的可运行G队列runtime.sched.runq调度器存放可运行G
调度器中那个的自由G列表runtime.sched.gfree调度器存放自由G
P的可运行G队列runq本地P存放当前P中的可运行G
P中的自由G列表gfree本地P存放当前P中的自由G

 三个全局的列表主要为了统计运行时系统的的所有G、M、P。我们主要关心剩下的这些容器,尤其是和G相关的四个

在运行时系统创建的G都会被保存在全局的G列表中,值得注意的是:从Gsyscall转出来的G都会被放置到调度器的可运行G队列中。而被运行时系统初始化的G会被放置到本地P的可运行列表中。从Gwaiting转出来的G,除了因网络I/O陷入等待的G之外,都会被放置到本地P的可运行G队列中。转成Gdead状态的G会先被放置到本地P的自由G列表 (上面的描述可以知道这一点)。调度器中的与G、M、P相关的列表其实只是起了一个暂存的作用。

一句话概括三者关系:

  • G需要绑定在M上才能运行;
  • M需要绑定P才能运行;

下面我们看一看三者及内核调度实体【KSE】的关系:

综上所述,一个G的执行需要M和P的支持。一个M在于一个P关联之后就形成一个有效的G运行环境 【内核线程 +  上下文环境】。每个P都含有一个 可运行G的队列【runq】。队列中的G会被一次传递给本地P关联的M并且获得运行时机

由上图可以看出 M 与 KSE 总是 一对一 的。一个M能且仅能代表一个内核线程

一个M的生命周期内,它会且仅会与一个KSE产生关联M与P以及P与G之间的关联是多变的总是会随着实际调度的过程而改变。其中, M 与 P 总是一对一,P 与 G 总是 一对多, 而 一个 G 最终由 一个 M 来负责运行

上述我们讲的运行时系统其实就是我们下面要说的调度器。

我们再来回顾下G、M、P 中的主要成员:

G里面比较重要的成员:

  • stack: 当前g使用的栈空间, 有lo和hi两个成员
  • stackguard0: 检查栈空间是否足够的值, 低于这个值会扩张栈, 0是go代码使用的
  • stackguard1: 检查栈空间是否足够的值, 低于这个值会扩张栈, 1是原生代码使用的
  • m: 当前g对应的m
  • sched: g的调度数据, 当g中断时会保存当前的pc和rsp等值到这里, 恢复运行时会使用这里的值
  • atomicstatus: g的当前状态
  • schedlink: 下一个g, 当g在链表结构中会使用
  • preempt: g是否被抢占中
  • lockedm: g是否要求要回到这个M执行, 有的时候g中断了恢复会要求使用原来的M执行

M里面比较重要的成员:

  • g0: 用于调度的特殊g, 调度和执行系统调用时会切换到这个g
  • curg: 当前运行的g
  • p: 当前拥有的P
  • nextp: 唤醒M时, M会拥有这个P
  • park: M休眠时使用的信号量, 唤醒M时会通过它唤醒
  • schedlink: 下一个m, 当m在链表结构中会使用
  • mcache: 分配内存时使用的本地分配器, 和p.mcache一样(拥有P时会复制过来)
  • lockedg: lockedm的对应值

P里面比较重要的成员:

  • status: p的当前状态
  • link: 下一个p, 当p在链表结构中会使用
  • m: 拥有这个P的M
  • mcache: 分配内存时使用的本地分配器
  • runqhead: 本地运行队列的出队序号
  • runqtail: 本地运行队列的入队序号
  • runq: 本地运行队列的数组, 可以保存256个G
  • gfree: G的自由列表, 保存变为_Gdead后可以复用的G实例
  • gcBgMarkWorker: 后台GC的worker函数, 如果它存在M会优先执行它
  • gcw: GC的本地工作队列, 详细将在下一篇(GC篇)分析

调度器涉及到的结构体除了上面的G、M、P 之外,还有以下,比如全局的调度器:

type schedt struct {
	// accessed atomically. keep at top to ensure alignment on 32-bit systems.
    // 下面两个变量需以原子访问访问。保持在 struct 顶部,确保其在 32 位系统上可以对齐
	goidgen   uint64
	lastpoll  uint64 // time of last network poll, 0 if currently polling
	pollUntil uint64 // time to which current poll is sleeping

	lock mutex

	// When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
	// sure to call checkdead().
    // 当修改 nmidle,nmidlelocked,nmsys,nmfreed 这些数值时
    // 需要记得调用 checkdead
	midle        muintptr // idle m's waiting for work 空闲的M 队列
	nmidle       int32    // number of idle m's waiting for work  当前等待工作的空闲 m 计数
	nmidlelocked int32    // number of locked m's waiting for work 当前等待工作的被 lock 的 m 计数
	mnext        int64    // number of m's that have been created and next M ID
	maxmcount    int32    // maximum number of m's allowed (or die)
	nmsys        int32    // number of system m's not counted for deadlock
	nmfreed      int64    // cumulative number of freed m's

	ngsys uint32 // number of system goroutines; updated atomically 系统 goroutine 的数量, 原子操作

	pidle      puintptr // idle p's   空闲的 p 队列
	npidle     uint32
	nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

	// Global runnable queue.
	runq     gQueue
	runqsize int32

	// disable controls selective disabling of the scheduler.
	//
	// Use schedEnableUser to control this.
	//
	// disable is protected by sched.lock.
	disable struct {
		// user disables scheduling of user goroutines.
		user     bool
		runnable gQueue // pending runnable Gs
		n        int32  // length of runnable
	}

	// Global cache of dead G's.
	gFree struct {
		lock    mutex
		stack   gList // Gs with stacks
		noStack gList // Gs without stacks
		n       int32
	}

	// Central cache of sudog structs.
	sudoglock  mutex
	sudogcache *sudog

	// Central pool of available defer structs of different sizes.
	deferlock mutex
	deferpool [5]*_defer

	// freem is the list of m's waiting to be freed when their
	// m.exited is set. Linked through m.freelink.
	freem *m

	gcwaiting  uint32 // gc is waiting to run
	stopwait   int32
	stopnote   note
	sysmonwait uint32
	sysmonnote note

	// safepointFn should be called on each P at the next GC
	// safepoint if p.runSafePointFn is set.
	safePointFn   func(*p)
	safePointWait int32
	safePointNote note

	profilehz int32 // cpu profiling rate

	procresizetime int64 // nanotime() of last change to gomaxprocs
	totaltime      int64 // ∫gomaxprocs dt up to procresizetime

	// sysmonlock protects sysmon's actions on the runtime.
	//
	// Acquire and hold this mutex to block sysmon from interacting
	// with the rest of the runtime.
	sysmonlock mutex
}
type gQueue struct {
	head guintptr
	tail guintptr
}

局调度器,全局只有一个schedt类型的实例

sudoG 结构体:

// sudog 代表在等待列表里的 g,比如向 channel 发送/接收内容时
// 之所以需要 sudog 是因为 g 和同步对象之间的关系是多对多的
// 一个 g 可能会在多个等待队列中,所以一个 g 可能被打包为多个 sudog
// 多个 g 也可以等待在同一个同步对象上
// 因此对于一个同步对象就会有很多 sudog 了
// sudog 是从一个特殊的池中进行分配的。用 acquireSudog 和 releaseSudog 来分配和释放 sudog
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}

那么goroutine的入口是怎么样的呢?首先,我们从goroutine是如何被创建的说起,创建goroutine的函数为:newproc 函数 (在 ./src/runtime/proc.go 文件中),即:使用go命令创建goroutine时, go会把go命令编译为对runtime.newproc的调用

// Create a new g running fn with siz bytes of arguments.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
//
// The stack layout of this call is unusual: it assumes that the
// arguments to pass to fn are on the stack sequentially immediately
// after &fn. Hence, they are logically part of newproc's argument
// frame, even though they don't appear in its signature (and can't
// because their types differ between call sites).
//
// This must be nosplit because this stack layout means there are
// untyped arguments in newproc's argument frame. Stack copies won't
// be able to adjust them and stack splits won't be able to copy them.
//
//go:nosplit
// 根据 参数 fn 和 siz 创建一个 g 并把它放置入 自由g队列中等待唤醒
// 编译器翻译一个 go 表达式时会调用这个函数
// 无法拆分堆栈,因为它假设参数在 &fn 之后顺序可用; 如果发生堆栈拆分,则不会复制它们。
//  新建一个goroutine, 用fn + PtrSize 获取第一个参数的地址,也就是argp 用siz - 8 获取pc地址
func newproc(siz int32, fn *funcval) {
     // add 是一个指针运算,跳过函数指针 把栈上的参数起始地址找到
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	gp := getg()
    // getcallerpc返回的是 调用函数之后的那条程序指令的地址,
    // 即callee函数返回时要执行的下一条指令的地址
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, argp, siz, gp, pc)

		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)

		if mainStarted {
			wakep()
		}
	})
}
type funcval struct {
	fn uintptr
	// variable-size, fn-specific data here
}

runtime.newproc函数中只做了三件事:

  • 计算额外参数的地址 argp
  • 获取调用端的地址(返回地址) pc
  • 使用systemstack调用 newproc1 函数床架G, 最后调用runqput【把g放到本地可运行对列中】
// Create a new g in state _Grunnable, starting at fn, with narg bytes
// of arguments starting at argp. callerpc is the address of the go
// statement that created this. The caller is responsible for adding
// the new g to the scheduler.
//
// This must run on the system stack because it's the continuation of
// newproc, which cannot split the stack.
//
//go:systemstack
// 根据函数参数和函数地址,创建一个新的G,然后将这个G加入队列等待运行
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
	_g_ := getg()  // 先获取 当前 g,其实这里获取到的是 g0
     // 判断下 func 的实现是否为空
	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
    // 设置g对应的m的locks++, 禁止抢占
	acquirem() // disable preemption because it can be holding p in a local var
	siz := narg
	siz = (siz + 7) &^ 7

	// We could allocate a larger initial stack if necessary.
	// Not worth it: this is almost always an error.
	// 4*sizeof(uintreg): extra space added below
	// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
	if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
		throw("newproc: function arguments too large for new goroutine")
	}

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	if narg > 0 {
		memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
		// This is a stack-to-stack copy. If write barriers
		// are enabled and the source stack is grey (the
		// destination is always black), then perform a
		// barrier copy. We do this *after* the memmove
		// because the destination stack may have garbage on
		// it.
		if writeBarrier.needed && !_g_.m.curg.gcscandone {
			f := findfunc(fn.fn)
			stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
			if stkmap.nbit > 0 {
				// We're in the prologue, so it's always stack map index 0.
				bv := stackmapdata(stkmap, 0)
				bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
			}
		}
	}

	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	if isSystemGoroutine(newg, false) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	casgstatus(newg, _Gdead, _Grunnable)

	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	releasem(_g_.m)

	return newg
}

//go:nosplit
func acquirem() *m {
	_g_ := getg()
	_g_.m.locks++
	return _g_.m
}
//go:nosplit
func releasem(mp *m) {
	_g_ := getg()
	mp.locks--
	if mp.locks == 0 && _g_.preempt {
		// restore the preemption request in case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}
}

runtime.newproc1的处理如下:

  • 调用getg获取当前的g, 会编译为读取FS寄存器(TLS), 这里会获取到g0
  • 设置g对应的m的locks++, 禁止抢占
  • 获取m拥有的p
  • 新建一个g
    • 首先调用gfget从p.gfree获取g, 如果之前有g被回收在这里就可以复用
    • 获取不到时调用malg分配一个g, 初始的栈空间大小是2K
    • 需要先设置g的状态为已中止(_Gdead), 这样gc不会去扫描这个g的未初始化的栈
  • 把参数复制到g的栈上
  • 把返回地址复制到g的栈上, 这里的返回地址是goexit, 表示调用完目标函数后会调用goexit
  • 设置g的调度数据(sched)
    • 设置sched.sp等于参数+返回地址后的rsp地址
    • 设置sched.pc等于目标函数的地址, 查看gostartcallfn和gostartcall
    • 设置sched.g等于g
  • 设置g的状态为待运行(_Grunnable)
  • 调用runqput把g放到运行队列
    • 首先随机把g放到p.runnext, 如果放到runnext则入队原来在runnext的g
    • 然后尝试把g放到P的"本地运行队列"
    • 如果本地运行队列满了则调用runqputslow把g放到"全局运行队列"
      • runqputslow会把本地运行队列中一半的g放到全局运行队列, 这样下次就可以继续用快速的本地运行队列了
  • 如果当前有空闲的P, 但是无自旋的M(nmspinning等于0), 并且主函数已执行则唤醒或新建一个M
    • 这一步非常重要, 用于保证当前有足够的M运行G, 具体请查看上面的"空闲M链表"
    • 唤醒或新建一个M会通过wakep函数
      • 首先交换nmspinning到1, 成功再继续, 多个线程同时执行wakep只有一个会继续
      • 调用startm函数
        • 调用pidleget从"空闲P链表"获取一个空闲的P
        • 调用mget从"空闲M链表"获取一个空闲的M
        • 如果没有空闲的M, 则调用newm新建一个M
          • newm会新建一个m的实例, m的实例包含一个g0, 然后调用newosproc动一个系统线程
          • newosproc会调用syscall clone创建一个新的线程
          • 线程创建后会设置TLS, 设置TLS中当前的g为g0, 然后执行mstart
        • 调用notewakeup(&mp.park)唤醒线程

创建goroutine的流程就这么多了, 接下来看看M是如何调度的.


本文链接: http://www.dtmao.cc/news_show_550072.shtml

附件下载

上一篇:致青春

下一篇:1.1 Go安装

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?