让我们先看一下列代码的输出:
复制 package main
import "sync"
func main () {
var wg sync . WaitGroup
for _, v := range [ ... ] string { "Hello" , "World" , "Hello" , "Work" } {
wg. Add ( 1 )
go func () {
defer wg. Done ()
print (v, " " )
}()
}
wg. Wait ()
}
输出:
这段代码并没有按照数组的顺序输出,而是输出了四个"Work"
而要解释其原因我们就得从Go语言的并发组件说起
Go语言并发组件
goroutine
goroutine
是Go语言中最基本的组织单位之一。它们是一种被称为协程
的对线程更高级的抽象。不同于OS线程和有语言运行时管理的绿色线程,goroutine
与Go语言运行时深度集成,由GMP
统一管理。main()
函数也是由一个goroutine
运行(main goroutine
)
简单的说goroutine
是一个并发的函数(不一定并行),可以简单的在某函数之前添加go
关键字来触发。
所以我们可以尝试解释开始的那一段程序:遍历一个字符串数组,并将每个字符串在各自对应的goroutine
中打印。而通常情况下,在goroutine
开始执行开始执行之前,循环就已经结束了。但是这里有一个有趣的点,那就是既然循环已经结束,为什么还能访问到变量v
?
这是因为Go语言运行时会将变量v
的值由内存转移到堆中,这样goroutine
就不会意外的访问被释放的内存。然而由于v
始终执行同一块地址,所以每次循环都会覆盖上一次的值,最后goroutine
开始执行时获取到的便是最后赋上的值,于是才会出现输出了四个相同的值。
如果想要程序按照顺序输出,则只需要将v
的副本传入闭包即可:
复制 for _, v := range [ ... ] string { "Hello" , "World" , "Hello" , "Work" } {
wg. Add ( 1 )
go func (str string ) {
defer wg. Done ()
print (str, " " )
}(v)
}
此外goroutine
还有一个特点就是轻量!goroutine
之间的上下文切换(即一个被托管的并发程序必须保存他的状态以切换到另一个并发程序)的消耗远小于OS线程。
sync包
sync
包中包含了对内存同步访问最有用的并发原语。
WaitGroup
WaitGroup
常用来等待一组并发操作的完成,特别使用于当我们不关心并发操作的结果,或者有其他方法来收集他们的结果时。
正如开头那段代码中所使用的那样:
复制 var wg sync . WaitGroup
for _, v := range [ ... ] string { "Hello" , "World" , "Hello" , "Work" } {
wg. Add ( 1 )
go func () {
defer wg. Done ()
print (v, " " )
}()
}
wg. Wait ()
其中先创建了一个sync.WaitGroup
类型的变量,然后再每次循环中通过wg.Add()
方法增加计数器的增量,并在每个执行的goroutine
中调用wg.Done()
方法对计数器进行递减。而在main goroutine
中通过wg.Wati()
方法阻塞,直至计数器为0,然后结束程序。
互斥锁和读写锁
sync.Mutex
提供了一种安全的方式来表示对共享资源的独占访问,以此来保护临界区(即程序中需要独占访问共享资源的部分)。让我们看一下Mutex
的使用,并解释一下什么是饥饿
:
复制 var count int
var lock sync . Mutex
begin := make ( chan struct {})
// increment
go func () {
<- begin
for begin := time. Now (); time. Since (begin) <= 1 * time.Second; {
for i := 0 ; i <= 3 ; i ++ {
lock. Lock ()
time. Sleep ( 1 * time.Millisecond)
count ++
lock. Unlock ()
}
}
}()
// decrement
go func () {
<- begin
for begin := time. Now (); time. Since (begin) <= 1 * time.Second; {
lock. Lock ()
time. Sleep ( 3 * time.Nanosecond)
count -= 3
lock. Unlock ()
}
}()
close (begin)
time. Sleep ( 2 * time.Second)
println (count)
我们先忽视创建的begin
变量,其作用是用来让两个同时开始。
我们先创建了一个count
用于计数,其默认值为0。然后开启了两个goroutine
分别对其进行累加,和递减。不同之处在于递减操作每3ns执行一次,每次减3。累加操作每毫秒执行一次,每次累加1。两段代码都运行1s,然后在main goroutine
中等待2s,并输出count
的结果。
由于两个goroutine
都对共享变量count
进行了操作,所以需要在操作时通过Mutex
进行加锁,操作完成后再解锁。
正常来说,两个goroutine
等待的时间都是3ns,最后的输出应该差不多为0。但是多次运行后会发现,实际的数值往往小于0。
这是由于decrement的goroutine
将临界区扩大之后导致了与increment的goroutine
进行不公平的竞争,所产生的饥饿,这使得increment的goroutine
无法高效的工作。
除此之外,饥饿还有可能产生于数据库连接、文件句柄、CPU等任何必须共享的资源。
sync.RWMutex
概念上与sync.Mutex
一样,但是它允许我们对内存的读写分别控制。通过其各自的方法签名就可以看出来:
复制 type Mutex
func (m * Mutex) Lock ()
func (m * Mutex) Unlock ()
type RWMutex
func (rw * RWMutex) Lock ()
func (rw * RWMutex) RLock ()
func (rw * RWMutex) RLocker () Locker
func (rw * RWMutex) RUnlock ()
func (rw * RWMutex) Unlock ()
所以RWMutex
本身也是一个Locker
,通过以下代码我们可以看到读写锁的性能优势:
复制 test := func (count int , lock sync . Locker ) time . Duration {
var wg sync . WaitGroup
wg. Add (count)
begin := time. Now ()
for i := count; i > 0 ; i -- {
go func (wg * sync . WaitGroup , locker sync . Locker ) {
defer wg. Done ()
lock. Lock ()
time. Sleep ( 1 * time.Nanosecond)
lock. Unlock ()
}( & wg, lock)
}
wg. Wait ()
return time. Since (begin)
}
// format print
tw := tabwriter. NewWriter (os.Stdout, 0 , 2 , 2 , ' ' , 0 )
defer tabw. Flush ()
fmt. Fprintf (tw, "Counts\tRWMutext\tMutex\n" )
var rwLock sync . RWMutex
for i := 0 ; i < 8 ; i ++ {
count := int (math. Pow ( 2 , float64 (i)))
fmt. Fprintf (
tw,
" %d \t %v \t %v \n" ,
count,
test (count, rwLock. RLocker ()),
test (count, & rwLock))
}
首先我们创建了一个test()
方法来计算循环count次休眠1ns所的时间,等所有goroutine
执行完以后再返回所花费的时间。然后我们使用变量rwLock
指向一个sync.RWMutex
。然后我们分别将RLocker
和Locker
传入test()
方法。
至于为什么可以将sync.RWMutex
的指针传入 test()
方法,我们可以在sync
包中找到Locker
接口的声明:
复制 type Locker interface {
Lock ()
Unlock ()
}
而sync.RWMutex
的指针可以调用Locker
接口的方法:
复制 func (rw * RWMutex ) Lock ()
func (rw * RWMutex ) Unlock ()
最后让我们来看一下结果:
复制 Counts RWMutext Mutex
1 12.944ms 15.5537ms
2 18.6419ms 40.2317ms
4 20.2131ms 81.3651ms
8 20.4274ms 122.8911ms
16 20.3875ms 263.979ms
32 20.2892ms 545.0835ms
64 20.4077ms 1.1188133s
128 20.5967ms 2.2073484s
可以看到读写锁的效率远高于互斥锁,其根本原因就在于,在请求一个读写锁用于读处理时,允许人与数量的读锁同时添加,而互斥锁则必须等待别的锁释放之后才能获取。
cond
sync
包提供的cond
提供了一种很好的方式去处理当某个gouroutine
拿到锁吗,却不满足其他条件时的等待。
比如我们想要f1
方法需要得到f2
方法的允许才能打印f2
告诉它的内容,所以即使当f1
拿到锁也应该先让出来:
复制 var approved bool
var str string
var wg sync . WaitGroup
wg. Add ( 1 )
cond := sync. NewCond ( & sync . Mutex {})
f1 := func () {
cond.L. Lock ()
for ! approved {
fmt. Println ( "Wait..." )
cond. Wait ()
}
fmt. Println (str)
cond.L. Unlock ()
wg. Done ()
}
f2 := func () {
time. Sleep ( 1 ) // 减少f2的竞争,让f1极有可能先拿到锁
cond.L. Lock ()
str = "Hello Work!"
approved = true
cond.L. Unlock ()
cond. Signal ()
}
go f1 ()
go f2 ()
wg. Wait ()
我们先通过sync.NewCond(&sync.Mutex{})
创建了一个cond
,由于f2
休眠了以下,所以f1
极有可能先拿到锁。
并且在没得到允许的情况下执行了cond.Wait()
来将得到的锁释放,该方法会释放当前拿到的锁并挂起等待,直到有调用cond.Signal()
才会被唤醒。所以最后输出如下:
通过Wait()
等待的goroutine
会被运行时维护在一个FIFO的列表中,等待信号。 Signal()
会通知等待时间最长的goroutine
。
而Broadcast
则会同时通知所有等待的goroutine
,对以上代码稍作修改:
复制 var approved bool
var str string
var wg sync . WaitGroup
// wg.Add(1)
wg. Add ( 3 )
cond := sync. NewCond ( & sync . Mutex {})
f1 := func (name string ) {
cond.L. Lock ()
for ! approved {
fmt. Printf ( " %v is waiting...\n" , name)
cond. Wait ()
}
fmt. Println (str)
cond.L. Unlock ()
wg. Done ()
}
f2 := func () {
time. Sleep ( 1 )
cond.L. Lock ()
str = "Hello Work!"
approved = true
cond.L. Unlock ()
// cond.Signal()
cond. Broadcast ()
}
go f1 ( "zhangsan" )
go f1 ( "lisi" )
go f1 ( "wangwu" )
go f2 ()
wg. Wait ()
得到输出如下:
复制 lisi is waiting ...
zhangsan is waiting ...
wangwu is waiting ...
wangwu said: Hello Work !
lisi said: Hello Work !
zhangsan said: Hello Work !
once
让我们直接用下面几条代码片段和输出结果来了解以下sycn.Once
:
复制 var count int
increment := func () { count ++ }
var wg sync . WaitGroup
var once sync . Once
wg. Add ( 10 )
for i := 10 ;i > 0 ;i -- {
once. Do (increment)
wg. Done ()
}
wg. Wait ()
fmt. Println (count)
输出:
再看看多次调用once.Do()
复制 var count int
increment := func () { count ++ }
var once sync . Once
once. Do (increment)
once. Do (increment)
fmt. Println (count)
输出:
由此可以看到 sync.Once
只计算调用Do()
方法的次数,而不是多少次唯一调用Do()
方法,所以需要注意sync.Once
副本的使用。 再看一下循环调用:
复制 var count int
increment := func () { count ++ }
s := make ([] sync . Once , 1 , 1 )
for _, once := range s {
once. Do (increment)
}
s[ 0 ]. Do (increment)
fmt. Println (count)
输出:
最后再看一个死锁:
复制 var onceA, onceB sync . Once
var initB func ()
initA := func (){onceB. Do (initB)}
initB = func (){onceA. Do (initA)}
onceA. Do (initA)
池
当我们在程序中申请一块内存时,通常并不是直接向操作系统申请。Go语言会预先向操作系统申请一部分内存,然后由我们的程序直接使用,以提高性能。
相同的想法也被体现在sync.Pool
中。让我们看一下下面的例子:
复制 connectToService := func () interface {} {
time. Sleep ( 1 * time.Second)
fmt. Println ( "Connect to Service..." )
return struct {}{}
}
serviceConnPool := func () * sync . Pool {
p := & sync . Pool {
New: connectToService,
}
for i := 0 ; i < 5 ; i ++ {
p. Put (p. New ()) // 保存连接
}
return p
}
connPool := serviceConnPool () // 创建连接池
for i := 100 ; i > 0 ; i -- {
conn := connPool. Get () // 从连接池取出连接
fmt. Println (i, ": Get conn..." )
connPool. Put (conn) // 将实例放回连接池
}
我们通过connectionToService
来模拟了一个生成连接的方法。然后将其设置为sync.Pool
的New
,并循环创建了5次,然后存入变量p
中。最后,再模拟了100次获取连接,并在输出之后,将连接放回连接池。
当使用sync.Pool
时需要记住以下几点:
实例化sync.Pool
时通过传入一个生成资源的方法到New
,通过New()
生成资源时是线程安全的
sync.Pool
中New()
创建的资源应该是无状态的
从sync.Pool
通过Get()
取出的对象一定要通过Put()
放回,否则该实例将无法被复用
channel
channel
可以用来同步内存访问,但更多的被用于在goroutine
之间传递信息。创建一个channel
的方式如下:
复制 var c1 chan interface {}
c1 = make ( chan interface {})
c1
是一个可以接收或者发送任意类型的channel
(因为interface{}
可以存储任意类型),而chan
类型的变量还可以被定义为接收或者发送:
复制 var sendChan chan<- interface {}
var receiveChan <-chan interface {}
sendChan = c1
receiveChan = c1
由于发送和接收指的是chan
数据的流向,chan
变量名也通常被定义为以Stream
结尾。
Go语言中channel
是阻塞的,也就是说channel
中的数据需要被消费后才可以写入,或者数据被写入后才会读取。所以以下代码即使在main
函数中运行也会不立刻退出,而是在1s后输出"Hello Work!"然后在退出:
复制 strStream := make ( chan string )
go func () {
time. Sleep ( 1 * time.Second)
strStream <- "Hello Work!"
}()
str, ok := <- strStream
if ok{
fmt. Println (str)
}
同时由于channel
是阻塞的,所以如果没有向channel
写入值,而一直尝试读取的话就会产生死锁。
此外以上的代码从strStream
读取数据时返回了两个值,第二个值为true
则表示此次从该channel
取的值是新写入的数据,false
表示此次从该channel
取的值是由close channel
时生成的默认值。所以channel
的接收可以使用一个值接收,但通常应该在使用前先判断该值的返回方式。
而close channel
可以禁止某个channel
上的写入操作,但我们任然可以从该channel
读取数据:
复制 intStream := make ( chan int )
close (intStream)
i, ok := <- intStream
fmt. Printf ( "( %v ): %v " , ok, i)
输出:
range
和for
一起使用也可以从channel
中获取数据,并在channel
关闭时退出循环:
复制 dataStream := make ( chan int )
go func () {
defer close (dataStream)
for i := 0 ; i < 5 ; i ++ {
time. Sleep ( 1 * time.Second)
dataStream <- i
}
}()
for data := range dataStream {
fmt. Print (data, " " )
}
输出:
此外利用close channel
还可以使多个goroutine
同时开始执行:
复制 begin := make ( chan interface {})
var wg sync . WaitGroup
for i := 0 ;i < 3 ;i ++ {
wg. Add ( 1 )
go func (i int ) {
defer wg. Done ()
<- begin
fmt. Printf ( "( %v ) is beginning...\n" , i)
}(i)
}
fmt. Println ( "Start..." )
close (begin)
wg. Wait ()
输出:
复制 Start ...
( 0 ) is beginning ...
( 1 ) is beginning ...
( 2 ) is beginning ...
除此之外,我们还可以使用make
创建带有缓冲区的channel
。缓冲channel
是一个内存中的FIFO队列,用于并发程序通信。它能存储多个数据,直到数据满时阻塞写入,数据空时阻塞读取:
复制 bufferStream := make ( chan int , 3 )
bufferStream <- 1
bufferStream <- 2
bufferStream <- 3
fmt. Println ( <- bufferStream)
fmt. Println ( <- bufferStream)
fmt. Println ( <- bufferStream)
channel
的默认值是nil
,如果没有初始化channle
那么,对该channel
的读取或写入操作都会被阻塞,而关闭nil
的channel
则会报panic
复制 var nilChan chan interface {}
close (nilChan)
输出:
复制 panic: close of nil channel
channel
行为准则
channel
为我们提供了很好的goroutine
间通信,而我们需要做的则是再正确的环境中配置channel
,即分配channel
的所有权。换句话说就是我们需要明确哪个gouroutine
,拥有哪些 channel
,并对这些channel
有哪些权限。
听起来可能会很复杂,但是单向channel
使得channel
的所有权的划分相当清晰。它将允许我们区分channel
的所有者和使用者。我们需要先明确以下所有者和使用者的职责:
所有者:
执行写操作,或将所有权交给另一个goroutine
由于所有者可以初始化channel
,所以其可以消除操作nil channel
的风险,由于所有者决定channel
的关闭,所以消除了向关闭的channel
写入数据,以及多次关闭channel
的风险,由于所有者执行写入,所以消除了写入channel
数据类型异常的风险。
消费者:
让我们用下面的例子来帮助阐明一下这些概念:
复制 // producer
chanOwner := func () <-chan int {
resultStream := make ( chan int , 5 )
go func () {
defer close (resultStream)
for i := 0 ; i < 5 ; i ++ {
resultStream <- i
}
}()
return resultStream
}
c := chanOwner ()
// consumer
for v := range c{
fmt. Println (v)
}
通过chanOwner
所执行的函数的签名限制了返回的channel
的类型为只读。在函数中,我们不会创建nil channel
并写入或关闭它,或者多次关闭所创建的channel
。而在消费者中,range
可以阻塞channel
并知道chanenl
何时关闭。
同时在我们的程序中,应该尽可能的缩小channel
的所有权范围,这将使我们的系统更容易梳理。
select
select
语句将channel
于诸如取消、超时、等待和默认值之类的概念结合起来。select
可以阻塞并等待多个channel
直到某个channel
可以使用。用法很类似switch
语句,但select
中的case
没有顺序,即使不满足任何一个case
执行也不会失败。
以下是一个关于select
语法的简单示例:
复制 c1 := make ( chan int )
c2 := make ( chan int )
c3 := make ( chan<- int )
c3 <- 0
select {
case <- c1:
// do something
case v2 := <- c2:
fmt. Println (v2)
case c3 <- 0 :
// do something
}
以上例子将会阻塞,直到c1
或者c2
有数据传入或者c3
有数据读取。
对于select
还有两个有趣的问题:
对于第一个问题我们可以看一下下面的代码输出:
复制 c1 := make ( chan int ); close (c1)
c2 := make ( chan int ); close (c2)
var count1,count2 int
for i := 0 ;i < 10000 ;i ++ {
select {
case <- c1:
count1 ++
case <- c2:
count2 ++
}
}
println (count1,count2)
输出:
多次运行后会发现,两个case
执行大致是平均的。其原因很简单,因为Go语言运行时不能确定哪一个case
中的是更需要被执行的,所以平均是最好的选择。
对于第二个问题,我们也可以看一个简单的例子:
复制 c := make ( chan int )
select {
case <- c:
}
输出:
复制 fatal error : all goroutines are asleep - deadlock !
对于没有可用的case
的情况,通常有两种常见的方式:
第一种设置默认退出时间:
复制 c := make ( chan int )
select {
case <- c:
// do something
case <- time. After ( 1 * time.Second):
fmt. Println ( "timeout" )
}
第二种是利用for
循环和select
一起使用来处理,这样可以在某一个case
等待的过程中,执行一些别的操作:
复制 c := make ( chan int )
go func () { time. Sleep ( 3 * time.Second); close (c) }()
var i int
loop:
for {
select {
case <- c:
break loop
default :
// do something
}
i ++
time. Sleep ( 1 * time.Second)
}
fmt. Printf ( "cycle times: %v " , i)
最后看一个select
将会永远阻塞的用法:
context包
context
包是Go1.7版本引入的。假设我们将一个goroutine
称为父goroutine
,其创建的其他goroutine
称为子goroutine
,context
为我们在父goroutine
单向 的同步信号,以及传递信息提供了极大的便利。
此外,context
还允许各子goroutine
在父goroutine
的控制下,采用各自不同的控制程序的方式。
context
包相当简单:
复制 var Canceled = errors. New ( "context canceled" )
var DeadlineExceeded error = deadlineExceededError {}
type CancelFunc func ()
type Context interface {}
复制 type emptyCtx int
type cancelCtx struct {}
type timerCtx struct {}
type valueCtx struct {}
复制 func Background () Context
func TODO () Context
func WithCancel (parent Context ) (ctx Context , cancel CancelFunc )
func WithDeadline (parent Context , d time . Time ) ( Context , CancelFunc )
func WithTimeout (parent Context , timeout time . Duration ) ( Context , CancelFunc )
func WithValue (parent Context , key, val interface {}) Context
除了定义的常量和类型,四个以Ctx
为后缀的类型除了emptyCtx
以外,都是对Context
的封装。Background()
和TODO()
都是用来返回一个空的Context
类型的实例。
其余三个With
前缀的方法除了都接收一个Context
类型的参数,然后返回一个Context
的实例和一个可以取消该返回Context
的CancelFunc
类型的方法。
除了emptyCtx
,其余的Context
创建都是基于一个Context
创建。通常用Background()
来获取一个根Context
,基于这个Context
来衍生出其他的Context
。而衍生出来的Context
将被添加到原有Context
的children
字段中。基于每个Context
可以创建多个Context
,由此形成一棵树:
我们可以在Context
接口的定义中看到,取消一个Context
有什么用:
复制 type Context interface {
Deadline () (deadline time . Time , ok bool )
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled.
Done () <-chan struct {}
Err () error
Value (key interface {}) interface {}
}
Done()
方法返回一个当Context
被取消时,将会关闭的通道。再扒一下cancel()
的代码:
复制 func (c * cancelCtx ) cancel (removeFromParent bool , err error ) {
// ...
if c.done == nil {
c.done = closedchan
} else {
close (c.done)
}
for child := range c.children {
child. cancel ( false , err)
}
// ...
}
当调用cancel()
时,关闭了done
通道,随后调用了children
的cancel()
。这里的children
即是通过WithCancel()
等方法返回的Context
实例对应的cancelCtx
。
所以说context
是单向同步信号的,因为衍生的Context
可以其原有的Context
取消,而不能取消原有的Context
。
valueCtx
通过封装Context
并添加额外键值对的方式来存储数据:
复制 type valueCtx struct {
Context
key, val interface {}
}
其寻找数据的规则也可以通过Value()
方法看到:
复制 func (c * valueCtx ) Value (key interface {}) interface {} {
if c.key == key { // 首先在自己保存的数据中找
return c.val
}
return c.Context. Value (key) // 然后在父Context中找
}
此外Context
的数据流动是单项的,所以不是试图修改valueCtx
里的值。 我们还可以看到,valueCtx
只能存一对值,并且父Context
无法查找子Context
的值。而且如果父子Context
的key
冲突时,无法获取到父Context
的值。所以建议使用自定义类型作为key:
复制 type keyctxa string
type keyctxb string
ctx := context. Background ()
var k1 keyctxa = "KeyA"
ctxA := context. WithValue (ctx, k1, 1 )
var k2 keyctxb = "KeyA"
ctxB := context. WithValue (ctxA, k2, 2 )
fmt. Println (ctxA. Value (k1))
fmt. Println (ctxB. Value (k2))
最后是一个使用context
启用多个goroutine
搜索切片中某个值的简单例子:
复制 func Search (slice [] int , target int ) ( bool , error ) {
done := make ( chan bool ) // 由子goroutine向上传递完成信号
ctx := context. Background () // 获取一个根Context
ctx, cancel := context. WithTimeout (ctx, 3 * time.Second) // 设置超时
defer cancel ()
length := len (slice)
index := length / 2 + 1
go gSearch (ctx, done, slice[:index], target)
go gSearch (ctx, done, slice[index:], target)
select {
case <- ctx. Done (): // 超时或被取消时将会返回关闭的通道
return false , ctx. Err ()
case <- done: // goroutine 完成所返回的信号
cancel () // 向所有子goroutine发送取消信号
return true , nil
}
}
复制 func gSearch (ctx context . Context , done chan<- bool , slice [] int , target int ) {
var count int
BEGIN:
for _, v := range slice {
select {
case <- ctx. Done (): // 接收到取消信号将结束搜索
fmt. Println (ctx. Err ())
break BEGIN
default :
if v == target {
done <- true // 搜索到将会返回完成信号
break BEGIN
}
}
count ++
}
fmt. Println ( "goroutine find: " , count, "times" )
}
GMP
正如我们在groutine
一节中提到的,Go语言关于并发处理的强大之处在于语言本身对线程的抽象。Go语言的调度器则专门用于处理goroutine
和OS线程之间的调度。
让我们先了解以下Go调度器的三个主要概念:
G: groutine
M: OS线程(源码中也被称为机器) P: 上下文(源码中也被称为处理器)
其关系如下图所示:
有一点需要提到,通过runtime.GOMAXPROCS()
方法可以设置Go语言运行时的上下文数量。在1.5版本之前默认为1,通常通过:
复制 runtime. GOMAXPROCS (runtime. NumCPU ())
来充分利用机器上的所有CPU核心,而随后的版本默认设置为主机上的逻辑CPU数量。
工作窃取
现在我们知道P将会为我们调度G到M上运行,而具体是如何调度的则是通过一种被称为工作窃取 的策略。
在了解工作窃取之前,让我们先了解以下,公平调度策略。假设有n个处理器,还有x个任务需要被执行,那么每个处理器在公平调度策略下都会分到x/n个任务。 这看起来似乎很合理,但是在Go语言中采用的是fork-join
模型。fork-join
模型可以使得程序在指定一点上“分叉”而开始并行执行,在随后的一点上“合并”并恢复顺序执行。
假设我们现在有四个任务,分配给两个处理器执行。T1依赖于T4的执行,而T2的执行时间比T1和T3的执行时间总和都要长。与此同时T1和T3分配给处理器P1,T2和T4分配给了P2,那么两个处理器运行的时间将会如下所示:
复制 Time P1 P2
n+a T1 T2
n+b (blocked) T2
n+b+d (blocked) T4
n+b+d+a T1 (idle)
n+b+d+a+c T3 (idle)
我们可以看到,P1在T1等待T4完成的时候是出于阻塞状态的,而且P2在完成T4后T3在等待被处理,而P2却是空闲状态。由此我们可以看到公平调度策略下存在一些问题。
工作窃取算法基于每个处理器独自的任务处理队列,并允许某个处理器从别的处理器准备处理的任务中获取任务并处理。该算法遵循以下一些基本原则,对于给定的线程:
如果有线程空闲,则选取一个随机的线程,从它关联的队列的头部窃取工作
如果在未准备好的join-point
(即与之同步的goroutine
还没有完成),则将工作从线程队列的尾部出栈。
如果线程的任务队列是空的,则:
a. 暂停加入
b. 从别的线程队列中随机窃取工作
让我们通过一个Fibonacci数列的计算来看看工作窃取算法是如何工作的:
复制 var fib func (n int ) <-chan int
fib = func (n int ) <-chan int {
result := make ( chan int )
go func () {
defer close (result)
if n <= 2 {
result <- 1
return
}
result <- <- fib (n - 1 ) + <- fib (n - 2 )
}()
return result
}
fmt. Println ( <- fib ( 4 ))
首先T1将fib(4)
创建在自己的队列,此时有可能T2从T1的队列中盗取了fib(4)
,这里我们假设T1执行了fib(4)
。然后由于+
操作符从左到右计算,所有在T1的队列中先创建fib(3)
然后创建fib(2)
复制 Step T1 Stack T1 Queue T2 Stack T2 Queue
1 (main goroutine )
2 (main goroutine ) fib( 4 )
3 (main goroutine )( wait join )
fib(4 )
4 (main goroutine )( wait join ) fib( 3 )
fib(4 ) fib( 2 )
此时T2依然是空闲的,它将会从T1的队列头部取出fib(3)
并执行。(通常fib(3)
会更有可能被T1执行,之后会讨论其原因,这里为了演示方便假设取出的是fib(3)
)T1的fib(4)
进入等待后取出fib(2)
执行
复制 Step T1 Stack T1 Queue T2 Stack T2 Queue
5 (main goroutine)(wait join) fib ( 2 ) fib ( 3 )
fib ( 4 )
6 (main goroutine)(wait join) fib ( 3 )
fib ( 4 )(wait join)
fib ( 2 )
7 (main goroutine)(wait join) fib ( 3 ) fib ( 2 )
fib ( 4 )(wait join) fib ( 1 )
fib ( 2 )
T2执行fib(3)
后同样创建了fib(2)
和fib(1)
在自己的队列,此时T1执行完自己的任务后,从T2中窃取了任务fib(2)
,并执行
复制 Step T1 Stack T1 Queue T2 Stack T2 Queue
8 (main goroutine)(wait join) fib ( 3 )(wait join) fib ( 2 )
fib ( 4 )(wait join) fib ( 1 )
( return 1 )
9 (main goroutine)(wait join) fib ( 3 )(wait join)
fib ( 4 )(wait join) fib ( 1 )
fib ( 2 )
10 (main goroutine)(wait join) fib ( 3 )(wait join)
fib ( 4 )(wait join) ( return 1 )
( return 1 )
T2执行的fib(3)
完成join,返回后T1上的fib(4)
也完成了join
复制 Step T1 Stack T1 Queue T2 Stack T2 Queue
11 (main goroutine)(wait join) ( return 2 )
fib ( 4 )(wait join)
12 (main goroutine)(wait join)
( return 3 )
最后继续执行main goroutine
并将<-fib(4)
打印出来。
总的来说这种任务调度方式在性能上有很多隐含的好处
续体窃取
由于在Go语言中goroutine
很好的封装了一个工作体,所以我们会很自然的认为Go会将任务或者goroutine
进行排队。实际上这并不是Go语言工作窃取算法的原理
Go语言工作窃取算法是对续体 进行入队和窃取的。在Go语言中,goroutine
就是任务,而goroutine
之后的都被称为续体
当一个执行线程到达一个join point
时,该线程必须暂停执行,并等待回调,这被称为延迟join
。任务窃取和续体窃取都存在延迟join
,但发生频率有显著差异。
试想一下:当创建一个goroutine
时,通常我们会希望该goroutine
会被立即执行,以尽快到达join point
。同时,执行其续体的goroutine
也会想尽快join回原来的goroutine
。在新创建的goroutine
完成之前,续体尝试join的情况也并不罕见。
对于这些情况有一条公理,即在调度goroutine
时,最合理的是立即开始执行这个goroutine
同样是刚才的Fibonacci函数,我们先做一下约定:
再让我们采用续体窃取的方式模拟一下运行过程:
首先T1执行main()
,然后创建并执行fib(4)
,将续体conf.of main
放入队列
复制 Step T1 Stack T1 Queue T2 Stack T2 Queue
1 main
2 fib ( 4 ) conf.of main
随后续体conf.of main
被T2窃取,T1执行fib(4)
后将续体conf.of fib(4)
加入队列尾部,T1继续执行fib(3)
复制 Step T1 Stack T1 Queue T2 Stack T2 Queue
3 fib ( 3 ) conf.of fib ( 4 ) conf.of main
T2到达到续体conf.of main
的join point
后,继续从T1的队列中窃取任务conf.of fib(4)
执行,T1也同时执行fib(3)
并将执行fib(3)
产生的续体放入队列,接着执行fib(3)
创建的fib(2)
复制 Step T1 Stack T1 Queue T2 Stack T2 Queue
4 fib ( 2 ) conf.of fib ( 3 ) conf.of main ( wait join )
conf.of fib ( 4 )
随后T1执行完fib(2)
准备返回,T2从续体conf.of fib(4)
中执行fib(2)
复制 Step T1 Stack T1 Queue T2 Stack T2 Queue
5 ( return 1 ) conf.of fib ( 3 ) conf.of main (wait join) conf.of fib ( 4 )
fib ( 2 )
T1继续从队列取出续体conf.of fib(3)
执行其中的fib(1)
,T2也返回了fib(2)
的计算,并完成续体conf.of fib(4)
的运行,在fib(4)
的join point
等待
复制 Step T1 Stack T1 Queue T2 Stack T2 Queue
6 fib ( 1 ) conf.of main ( wait join ) conf.of fib ( 4 )
( return 1 )
7 (return 1 ) conf.of main( wait join )
fib(4 )( wait join )
最后T1完成fib(1)
后返回并完成join,T2打印结果
复制 Step T1 Stack T1 Queue T2 Stack T2 Queue
8 conf.of main (wait join)
( return 3 )
9 main (print 3 )
我们可以看到,通过续体窃取,在线程上的延迟join
都发生在空闲的线程上,并且调用栈的最大深度也低于任务窃取。并如果我们让同样的代码在单个线程上执行之后会发现,它的运行就像我们使用普通函数一样。从理论上来说,续体窃取是被任务优于任务窃取的。
全局上下文
了解完Go语言的窃取之后,还有最后一个问题。当执行goroutine
的系统线程阻塞时,Go语言将如何处理?
从逻辑上讲,当执行G的M阻塞时,对应G也将被阻塞。这似乎很合理,但是,从性能上看,Go语言应该尽可能多的保证P活跃。
在上述情况中,Go语言会将阻塞的M与该P分离,并将该P切换到另一个无阻塞的M上,阻塞的G任然与阻塞的M关联。当G阻塞结束时,操作系统会尝试使用另一个M来回退P,以便能执行这个阻塞的G。但是,这并不总是能顺利进行,如果失败,线程会把该G放在全局上下文 中,然后线程休眠,并放回Go运行时的线程池,以供将来使用。(例如goroutine
被再次阻塞)
在窃取算法中,P也会定期检查全局上下文,以查看是否有可执行的G。