Concurrency in Go

让我们先看一下列代码的输出:

package main

import "sync"

func main() {
	var wg sync.WaitGroup
	for _, v := range [...]string{"Hello", "World", "Hello", "Work"} {
		wg.Add(1)
		go func() {
			defer wg.Done()
			print(v," ")
		}()
	}
	wg.Wait()
}

输出:

Work Work Work Work 

这段代码并没有按照数组的顺序输出,而是输出了四个"Work"

而要解释其原因我们就得从Go语言的并发组件说起

Go语言并发组件

goroutine

goroutine是Go语言中最基本的组织单位之一。它们是一种被称为协程的对线程更高级的抽象。不同于OS线程和有语言运行时管理的绿色线程,goroutine与Go语言运行时深度集成,由GMP统一管理。main()函数也是由一个goroutine运行(main goroutine

简单的说goroutine是一个并发的函数(不一定并行),可以简单的在某函数之前添加go关键字来触发。

所以我们可以尝试解释开始的那一段程序:遍历一个字符串数组,并将每个字符串在各自对应的goroutine中打印。而通常情况下,在goroutine开始执行开始执行之前,循环就已经结束了。但是这里有一个有趣的点,那就是既然循环已经结束,为什么还能访问到变量v

这是因为Go语言运行时会将变量v的值由内存转移到堆中,这样goroutine就不会意外的访问被释放的内存。然而由于v始终执行同一块地址,所以每次循环都会覆盖上一次的值,最后goroutine开始执行时获取到的便是最后赋上的值,于是才会出现输出了四个相同的值。

如果想要程序按照顺序输出,则只需要将v的副本传入闭包即可:

for _, v := range [...]string{"Hello", "World", "Hello", "Work"} {
    wg.Add(1)
    go func(str string) {
        defer wg.Done()
        print(str, " ")
    }(v)
}

此外goroutine还有一个特点就是轻量!goroutine之间的上下文切换(即一个被托管的并发程序必须保存他的状态以切换到另一个并发程序)的消耗远小于OS线程。

sync包

sync包中包含了对内存同步访问最有用的并发原语。

WaitGroup

WaitGroup常用来等待一组并发操作的完成,特别使用于当我们不关心并发操作的结果,或者有其他方法来收集他们的结果时。

正如开头那段代码中所使用的那样:

var wg sync.WaitGroup
for _, v := range [...]string{"Hello", "World", "Hello", "Work"} {
    wg.Add(1)
    go func() {
        defer wg.Done()
        print(v," ")
    }()
}
wg.Wait()

其中先创建了一个sync.WaitGroup类型的变量,然后再每次循环中通过wg.Add()方法增加计数器的增量,并在每个执行的goroutine中调用wg.Done()方法对计数器进行递减。而在main goroutine中通过wg.Wati()方法阻塞,直至计数器为0,然后结束程序。

互斥锁和读写锁

sync.Mutex提供了一种安全的方式来表示对共享资源的独占访问,以此来保护临界区(即程序中需要独占访问共享资源的部分)。让我们看一下Mutex的使用,并解释一下什么是饥饿

var count int
var lock sync.Mutex
begin := make(chan struct{})

// increment
go func() {
    <-begin
    for begin := time.Now(); time.Since(begin) <= 1*time.Second; {
        for i := 0; i <= 3; i++ {
            lock.Lock()
            time.Sleep(1 * time.Millisecond)
            count++
            lock.Unlock()
        }
    }
}()

// decrement
go func() {
    <-begin
    for begin := time.Now(); time.Since(begin) <= 1*time.Second; {
        lock.Lock()
        time.Sleep(3 * time.Nanosecond)
        count -= 3
        lock.Unlock()
    }
}()

close(begin)
time.Sleep(2 * time.Second)
println(count)

我们先忽视创建的begin变量,其作用是用来让两个同时开始。

我们先创建了一个count用于计数,其默认值为0。然后开启了两个goroutine分别对其进行累加,和递减。不同之处在于递减操作每3ns执行一次,每次减3。累加操作每毫秒执行一次,每次累加1。两段代码都运行1s,然后在main goroutine中等待2s,并输出count的结果。

由于两个goroutine都对共享变量count进行了操作,所以需要在操作时通过Mutex进行加锁,操作完成后再解锁。

正常来说,两个goroutine等待的时间都是3ns,最后的输出应该差不多为0。但是多次运行后会发现,实际的数值往往小于0。

这是由于decrement的goroutine将临界区扩大之后导致了与increment的goroutine进行不公平的竞争,所产生的饥饿,这使得increment的goroutine无法高效的工作。

除此之外,饥饿还有可能产生于数据库连接、文件句柄、CPU等任何必须共享的资源。

sync.RWMutex概念上与sync.Mutex一样,但是它允许我们对内存的读写分别控制。通过其各自的方法签名就可以看出来:

type Mutex
    func (m *Mutex) Lock()
    func (m *Mutex) Unlock()
type RWMutex
    func (rw *RWMutex) Lock()
    func (rw *RWMutex) RLock()
    func (rw *RWMutex) RLocker() Locker
    func (rw *RWMutex) RUnlock()
    func (rw *RWMutex) Unlock()

所以RWMutex本身也是一个Locker,通过以下代码我们可以看到读写锁的性能优势:

test := func(count int, lock sync.Locker) time.Duration {
    var wg sync.WaitGroup
    wg.Add(count)
    begin := time.Now()
    for i := count; i > 0; i-- {
        go func(wg *sync.WaitGroup, locker sync.Locker) {
            defer wg.Done()
            lock.Lock()
            time.Sleep(1 * time.Nanosecond)
            lock.Unlock()
        }(&wg, lock)
    }
    wg.Wait()
    return time.Since(begin)
}

// format print
tw := tabwriter.NewWriter(os.Stdout, 0, 2, 2, ' ', 0)
defer tabw.Flush()
fmt.Fprintf(tw, "Counts\tRWMutext\tMutex\n")

var rwLock sync.RWMutex
for i := 0; i < 8; i++ {
    count := int(math.Pow(2, float64(i)))
    fmt.Fprintf(
        tw,
        "%d\t%v\t%v\n",
        count,
        test(count, rwLock.RLocker()),
        test(count, &rwLock))
}

首先我们创建了一个test()方法来计算循环count次休眠1ns所的时间,等所有goroutine执行完以后再返回所花费的时间。然后我们使用变量rwLock指向一个sync.RWMutex。然后我们分别将RLockerLocker传入test()方法。

至于为什么可以将sync.RWMutex的指针传入 test()方法,我们可以在sync包中找到Locker接口的声明:

type Locker interface {
	Lock()
	Unlock()
}

sync.RWMutex的指针可以调用Locker接口的方法:

func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()

最后让我们来看一下结果:

Counts  RWMutext   Mutex
1       12.944ms   15.5537ms
2       18.6419ms  40.2317ms
4       20.2131ms  81.3651ms
8       20.4274ms  122.8911ms
16      20.3875ms  263.979ms
32      20.2892ms  545.0835ms
64      20.4077ms  1.1188133s
128     20.5967ms  2.2073484s

可以看到读写锁的效率远高于互斥锁,其根本原因就在于,在请求一个读写锁用于读处理时,允许人与数量的读锁同时添加,而互斥锁则必须等待别的锁释放之后才能获取。

cond

sync包提供的cond提供了一种很好的方式去处理当某个gouroutine拿到锁吗,却不满足其他条件时的等待。

比如我们想要f1方法需要得到f2方法的允许才能打印f2告诉它的内容,所以即使当f1拿到锁也应该先让出来:

var approved bool
var str string
var wg sync.WaitGroup
wg.Add(1)

cond := sync.NewCond(&sync.Mutex{})

f1 := func() {
    cond.L.Lock()
    for !approved {
        fmt.Println("Wait...")
        cond.Wait()
    }
    fmt.Println(str)
    cond.L.Unlock()
    wg.Done()
}

f2 := func() {
    time.Sleep(1)		// 减少f2的竞争,让f1极有可能先拿到锁
    cond.L.Lock()
    str = "Hello Work!"
    approved = true
    cond.L.Unlock()
    cond.Signal()
}

go f1()
go f2()

wg.Wait()

我们先通过sync.NewCond(&sync.Mutex{})创建了一个cond,由于f2休眠了以下,所以f1 极有可能先拿到锁。

并且在没得到允许的情况下执行了cond.Wait()来将得到的锁释放,该方法会释放当前拿到的锁并挂起等待,直到有调用cond.Signal()才会被唤醒。所以最后输出如下:

Wait...
Hello Work!

通过Wait()等待的goroutine会被运行时维护在一个FIFO的列表中,等待信号。 Signal()会通知等待时间最长的goroutine

Broadcast则会同时通知所有等待的goroutine,对以上代码稍作修改:

var approved bool
var str string
var wg sync.WaitGroup
// wg.Add(1)
wg.Add(3)

cond := sync.NewCond(&sync.Mutex{})

f1 := func(name string) {
    cond.L.Lock()
    for !approved {
        fmt.Printf("%v is waiting...\n", name)
        cond.Wait()
    }
    fmt.Println(str)
    cond.L.Unlock()
    wg.Done()
}

f2 := func() {
    time.Sleep(1)
    cond.L.Lock()
    str = "Hello Work!"
    approved = true
    cond.L.Unlock()
    // cond.Signal()
    cond.Broadcast()
}

go f1("zhangsan")
go f1("lisi")
go f1("wangwu")
go f2()

wg.Wait()

得到输出如下:

lisi is waiting...
zhangsan is waiting...
wangwu is waiting...
wangwu said: Hello Work! 
lisi said: Hello Work! 
zhangsan said: Hello Work! 

once

让我们直接用下面几条代码片段和输出结果来了解以下sycn.Once

var count int
increment := func() { count++ }
var wg sync.WaitGroup
var once sync.Once

wg.Add(10)
for i:=10;i>0;i--{
    once.Do(increment)
    wg.Done()
}
wg.Wait()
fmt.Println(count)

输出:

1

再看看多次调用once.Do()

var count int
increment := func() { count++ }
var once sync.Once

once.Do(increment)
once.Do(increment)

fmt.Println(count)

输出:

1

由此可以看到 sync.Once只计算调用Do()方法的次数,而不是多少次唯一调用Do()方法,所以需要注意sync.Once副本的使用。 再看一下循环调用:

var count int
increment := func() { count++ }

s := make([]sync.Once, 1, 1)

for _, once := range s {
    once.Do(increment)
}
s[0].Do(increment)

fmt.Println(count)

输出:

2

最后再看一个死锁:

var onceA, onceB sync.Once
var initB func()
initA := func(){onceB.Do(initB)}
initB = func(){onceA.Do(initA)}
onceA.Do(initA)

当我们在程序中申请一块内存时,通常并不是直接向操作系统申请。Go语言会预先向操作系统申请一部分内存,然后由我们的程序直接使用,以提高性能。

相同的想法也被体现在sync.Pool中。让我们看一下下面的例子:

connectToService := func() interface{} {
    time.Sleep(1 * time.Second)
    fmt.Println("Connect to Service...")
    return struct{}{}
}
serviceConnPool := func() *sync.Pool {
    p := &sync.Pool{
        New: connectToService,
    }
    for i := 0; i < 5; i++ {
        p.Put(p.New())		// 保存连接
    }
    return p
}

connPool := serviceConnPool()		// 创建连接池

for i := 100; i > 0; i-- {
    conn := connPool.Get()			// 从连接池取出连接
    fmt.Println(i, ": Get conn...")		
    connPool.Put(conn)				// 将实例放回连接池
}

我们通过connectionToService来模拟了一个生成连接的方法。然后将其设置为sync.PoolNew,并循环创建了5次,然后存入变量p中。最后,再模拟了100次获取连接,并在输出之后,将连接放回连接池。

当使用sync.Pool时需要记住以下几点:

  • 实例化sync.Pool时通过传入一个生成资源的方法到New,通过New()生成资源时是线程安全的

  • sync.PoolNew()创建的资源应该是无状态的

  • sync.Pool通过Get()取出的对象一定要通过Put()放回,否则该实例将无法被复用

channel

channel可以用来同步内存访问,但更多的被用于在goroutine之间传递信息。创建一个channel的方式如下:

var c1 chan interface{}	
c1 = make(chan interface{})

c1 是一个可以接收或者发送任意类型的channel(因为interface{}可以存储任意类型),而chan类型的变量还可以被定义为接收或者发送:

var sendChan chan<- interface{}
var receiveChan <-chan interface{}
sendChan = c1
receiveChan = c1

由于发送和接收指的是chan数据的流向,chan变量名也通常被定义为以Stream结尾。

Go语言中channel是阻塞的,也就是说channel中的数据需要被消费后才可以写入,或者数据被写入后才会读取。所以以下代码即使在main函数中运行也会不立刻退出,而是在1s后输出"Hello Work!"然后在退出:

strStream := make(chan string)
go func() {
    time.Sleep(1*time.Second)
    strStream <- "Hello Work!"
}()

str, ok := <-strStream
if ok{
    fmt.Println(str)
}

同时由于channel是阻塞的,所以如果没有向channel写入值,而一直尝试读取的话就会产生死锁。

此外以上的代码从strStream读取数据时返回了两个值,第二个值为true则表示此次从该channel取的值是新写入的数据,false表示此次从该channel取的值是由close channel时生成的默认值。所以channel的接收可以使用一个值接收,但通常应该在使用前先判断该值的返回方式。

close channel可以禁止某个channel上的写入操作,但我们任然可以从该channel读取数据:

intStream := make(chan int)
close(intStream)

i, ok := <-intStream
fmt.Printf("(%v): %v", ok, i)

输出:

(false): 0

rangefor一起使用也可以从channel中获取数据,并在channel关闭时退出循环:

dataStream := make(chan int)
go func() {
    defer close(dataStream)
    for i := 0; i < 5; i++ {
        time.Sleep(1 * time.Second)
        dataStream <- i
    }
}()

for data := range dataStream {
    fmt.Print(data, " ")
}

输出:

0 1 2 3 4 

此外利用close channel还可以使多个goroutine同时开始执行:

begin := make(chan interface{})
var wg sync.WaitGroup

for i:=0;i<3;i++{
    wg.Add(1)
    go func(i int) {
        defer wg.Done()
        <- begin
        fmt.Printf("(%v) is beginning...\n", i)
    }(i)
}

fmt.Println("Start...")
close(begin)
wg.Wait()

输出:

Start...
(0) is beginning...
(1) is beginning...
(2) is beginning...

除此之外,我们还可以使用make创建带有缓冲区的channel。缓冲channel是一个内存中的FIFO队列,用于并发程序通信。它能存储多个数据,直到数据满时阻塞写入,数据空时阻塞读取:

bufferStream := make(chan int, 3)
bufferStream <- 1
bufferStream <- 2
bufferStream <- 3
fmt.Println(<-bufferStream)
fmt.Println(<-bufferStream)
fmt.Println(<-bufferStream)

channel的默认值是nil,如果没有初始化channle那么,对该channel的读取或写入操作都会被阻塞,而关闭nilchannel则会报panic

var nilChan chan interface{}
close(nilChan)

输出:

panic: close of nil channel

channel行为准则

channel为我们提供了很好的goroutine间通信,而我们需要做的则是再正确的环境中配置channel,即分配channel的所有权。换句话说就是我们需要明确哪个gouroutine,拥有哪些 channel,并对这些channel有哪些权限。

听起来可能会很复杂,但是单向channel使得channel的所有权的划分相当清晰。它将允许我们区分channel的所有者和使用者。我们需要先明确以下所有者和使用者的职责:

所有者:

  1. 实例化channel

  2. 执行写操作,或将所有权交给另一个goroutine

  3. 关闭channel

  4. 将该channel暴露为只读

由于所有者可以初始化channel,所以其可以消除操作nil channel的风险,由于所有者决定channel的关闭,所以消除了向关闭的channel写入数据,以及多次关闭channel的风险,由于所有者执行写入,所以消除了写入channel数据类型异常的风险。

消费者:

  1. 知道channel是何时关闭的

  2. 正确处理阻塞

让我们用下面的例子来帮助阐明一下这些概念:

// producer
chanOwner := func() <-chan int {
    resultStream := make(chan int, 5)
    go func() {
        defer close(resultStream)
        for i := 0; i < 5; i++ {
            resultStream <- i
        }
    }()
    return resultStream
}

c := chanOwner()

// consumer
for v := range c{
    fmt.Println(v)
}

通过chanOwner所执行的函数的签名限制了返回的channel的类型为只读。在函数中,我们不会创建nil channel并写入或关闭它,或者多次关闭所创建的channel。而在消费者中,range可以阻塞channel并知道chanenl何时关闭。

同时在我们的程序中,应该尽可能的缩小channel的所有权范围,这将使我们的系统更容易梳理。

select

select语句将channel于诸如取消、超时、等待和默认值之类的概念结合起来。select可以阻塞并等待多个channel直到某个channel可以使用。用法很类似switch语句,但select中的case没有顺序,即使不满足任何一个case执行也不会失败。

以下是一个关于select语法的简单示例:

c1 := make(chan int)
c2 := make(chan int)
c3 := make(chan<- int)
c3 <- 0

select {
    case <-c1:
    // do something
    case v2 := <-c2:
    fmt.Println(v2)
    case c3 <- 0:
    // do something
}

以上例子将会阻塞,直到c1或者c2有数据传入或者c3有数据读取。

对于select还有两个有趣的问题:

  • 多个case同时可用select将会如何处理?

  • 没有case可用将会如何处理?

对于第一个问题我们可以看一下下面的代码输出:

c1 := make(chan int); close(c1)
c2 := make(chan int); close(c2)
var count1,count2 int 

for i:=0;i<10000;i++ {
    select {
        case <-c1:
        count1++
        case <-c2:
        count2 ++
    }
}
println(count1,count2)

输出:

4970 5030

多次运行后会发现,两个case执行大致是平均的。其原因很简单,因为Go语言运行时不能确定哪一个case中的是更需要被执行的,所以平均是最好的选择。

对于第二个问题,我们也可以看一个简单的例子:

c := make(chan int)
select {
    case <-c:
}

输出:

fatal error: all goroutines are asleep - deadlock!

对于没有可用的case的情况,通常有两种常见的方式:

第一种设置默认退出时间:

c := make(chan int)
select {
    case <-c:
    	// do something
    case <-time.After(1 * time.Second):
    fmt.Println("timeout")
}

第二种是利用for循环和select一起使用来处理,这样可以在某一个case等待的过程中,执行一些别的操作:

c := make(chan int)
go func() { time.Sleep(3 * time.Second); close(c) }()

var i int
loop:
for {
    select {
        case <-c:
        break loop
        default:
        // do something
    }
    i++
    time.Sleep(1 * time.Second)
}
fmt.Printf("cycle times: %v", i)

最后看一个select将会永远阻塞的用法:

select {}

context包

context包是Go1.7版本引入的。假设我们将一个goroutine称为父goroutine,其创建的其他goroutine称为子goroutinecontext为我们在父goroutine单向的同步信号,以及传递信息提供了极大的便利。

此外,context还允许各子goroutine在父goroutine的控制下,采用各自不同的控制程序的方式。

context包相当简单:

var Canceled = errors.New("context canceled")
var DeadlineExceeded error = deadlineExceededError{}

type CancelFunc func()
type Context interface {}
type emptyCtx int
type cancelCtx struct {}
type timerCtx struct {}
type valueCtx struct {}
func Background() Context
func TODO() Context

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

除了定义的常量和类型,四个以Ctx为后缀的类型除了emptyCtx以外,都是对Context的封装。Background()TODO()都是用来返回一个空的Context类型的实例。

其余三个With前缀的方法除了都接收一个Context类型的参数,然后返回一个Context的实例和一个可以取消该返回ContextCancelFunc类型的方法。

除了emptyCtx,其余的Context创建都是基于一个Context创建。通常用Background()来获取一个根Context,基于这个Context来衍生出其他的Context。而衍生出来的Context将被添加到原有Contextchildren字段中。基于每个Context可以创建多个Context,由此形成一棵树:

我们可以在Context接口的定义中看到,取消一个Context有什么用:

type Context interface {
	Deadline() (deadline time.Time, ok bool)
    // Done returns a channel that's closed when work done on behalf of this
	// context should be canceled.
	Done() <-chan struct{}
	Err() error
	Value(key interface{}) interface{}
}

Done()方法返回一个当Context被取消时,将会关闭的通道。再扒一下cancel()的代码:

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	// ...
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done)
	}
	for child := range c.children {
		child.cancel(false, err)
	}
	// ...
}

当调用cancel()时,关闭了done通道,随后调用了childrencancel()。这里的children即是通过WithCancel()等方法返回的Context实例对应的cancelCtx

所以说context是单向同步信号的,因为衍生的Context可以其原有的Context取消,而不能取消原有的Context

valueCtx通过封装Context并添加额外键值对的方式来存储数据:

type valueCtx struct {
	Context
	key, val interface{}
}

其寻找数据的规则也可以通过Value()方法看到:

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {		// 首先在自己保存的数据中找
		return c.val
	}
	return c.Context.Value(key) 	// 然后在父Context中找
}

此外Context的数据流动是单项的,所以不是试图修改valueCtx里的值。 我们还可以看到,valueCtx只能存一对值,并且父Context无法查找子Context的值。而且如果父子Contextkey冲突时,无法获取到父Context的值。所以建议使用自定义类型作为key:

type keyctxa string
type keyctxb string
ctx := context.Background()
var k1 keyctxa = "KeyA"
ctxA := context.WithValue(ctx, k1, 1)
var k2 keyctxb = "KeyA"
ctxB := context.WithValue(ctxA, k2, 2)

fmt.Println(ctxA.Value(k1))
fmt.Println(ctxB.Value(k2))

最后是一个使用context启用多个goroutine搜索切片中某个值的简单例子:

func Search(slice []int, target int) (bool, error) {
	done := make(chan bool)		// 由子goroutine向上传递完成信号
    ctx := context.Background()	// 获取一个根Context
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)	// 设置超时
	defer cancel()

	length := len(slice)
	index := length/2 + 1
	go gSearch(ctx, done, slice[:index], target)
	go gSearch(ctx, done, slice[index:], target)

	select {
	case <-ctx.Done():	// 超时或被取消时将会返回关闭的通道
		return false, ctx.Err()
	case <-done:		// goroutine 完成所返回的信号
		cancel()		// 向所有子goroutine发送取消信号
		return true, nil
	}
}
func gSearch(ctx context.Context, done chan<- bool, slice []int, target int) {
	var count int
BEGIN:
	for _, v := range slice {
		select {
		case <-ctx.Done():	// 接收到取消信号将结束搜索
			fmt.Println(ctx.Err())
			break BEGIN
		default:
			if v == target {
				done <- true	// 搜索到将会返回完成信号
				break BEGIN
			}
		}
		count++
	}
	fmt.Println("goroutine find: ", count, "times")
}

GMP

正如我们在groutine一节中提到的,Go语言关于并发处理的强大之处在于语言本身对线程的抽象。Go语言的调度器则专门用于处理goroutine和OS线程之间的调度。

让我们先了解以下Go调度器的三个主要概念:

G: groutine M: OS线程(源码中也被称为机器) P: 上下文(源码中也被称为处理器)

其关系如下图所示:

有一点需要提到,通过runtime.GOMAXPROCS()方法可以设置Go语言运行时的上下文数量。在1.5版本之前默认为1,通常通过:

runtime.GOMAXPROCS(runtime.NumCPU())

来充分利用机器上的所有CPU核心,而随后的版本默认设置为主机上的逻辑CPU数量。

工作窃取

现在我们知道P将会为我们调度G到M上运行,而具体是如何调度的则是通过一种被称为工作窃取的策略。

在了解工作窃取之前,让我们先了解以下,公平调度策略。假设有n个处理器,还有x个任务需要被执行,那么每个处理器在公平调度策略下都会分到x/n个任务。 这看起来似乎很合理,但是在Go语言中采用的是fork-join模型。fork-join模型可以使得程序在指定一点上“分叉”而开始并行执行,在随后的一点上“合并”并恢复顺序执行。

假设我们现在有四个任务,分配给两个处理器执行。T1依赖于T4的执行,而T2的执行时间比T1和T3的执行时间总和都要长。与此同时T1和T3分配给处理器P1,T2和T4分配给了P2,那么两个处理器运行的时间将会如下所示:

Time        P1          P2
n+a         T1          T2
n+b         (blocked)   T2
n+b+d       (blocked)   T4
n+b+d+a     T1          (idle)
n+b+d+a+c   T3          (idle)

我们可以看到,P1在T1等待T4完成的时候是出于阻塞状态的,而且P2在完成T4后T3在等待被处理,而P2却是空闲状态。由此我们可以看到公平调度策略下存在一些问题。

工作窃取算法基于每个处理器独自的任务处理队列,并允许某个处理器从别的处理器准备处理的任务中获取任务并处理。该算法遵循以下一些基本原则,对于给定的线程:

  1. fork点将任务添加到于线程关联的队列的尾部

  2. 如果有线程空闲,则选取一个随机的线程,从它关联的队列的头部窃取工作

  3. 如果在未准备好的join-point(即与之同步的goroutine还没有完成),则将工作从线程队列的尾部出栈。

  4. 如果线程的任务队列是空的,则:

    a. 暂停加入

    b. 从别的线程队列中随机窃取工作

让我们通过一个Fibonacci数列的计算来看看工作窃取算法是如何工作的:

var fib func(n int) <-chan int
fib = func(n int) <-chan int {
    result := make(chan int)
    go func() {
        defer close(result)
        if n <= 2 {
            result <- 1
            return
        }
        result <- <-fib(n-1) + <-fib(n-2)
    }()
    return result
}

fmt.Println(<-fib(4))

首先T1将fib(4)创建在自己的队列,此时有可能T2从T1的队列中盗取了fib(4),这里我们假设T1执行了fib(4)。然后由于+操作符从左到右计算,所有在T1的队列中先创建fib(3)然后创建fib(2)

Step   T1 Stack                      T1 Queue   T2 Stack            T2 Queue
1      (main goroutine)                                              
2      (main goroutine)              fib(4)                          
3      (main goroutine)(wait join)                                   
       fib(4)                                                        
4      (main goroutine)(wait join)   fib(3)                          
       fib(4)                        fib(2)                  

此时T2依然是空闲的,它将会从T1的队列头部取出fib(3)并执行。(通常fib(3)会更有可能被T1执行,之后会讨论其原因,这里为了演示方便假设取出的是fib(3))T1的fib(4)进入等待后取出fib(2)执行

Step   T1 Stack                      T1 Queue   T2 Stack            T2 Queue
5      (main goroutine)(wait join)   fib(2)     fib(3)               
       fib(4)                                                        
6      (main goroutine)(wait join)              fib(3)               
       fib(4)(wait join)                                             
       fib(2)                                                        
7      (main goroutine)(wait join)              fib(3)              fib(2)
       fib(4)(wait join)                                            fib(1)
       fib(2)                                                     

T2执行fib(3)后同样创建了fib(2)fib(1)在自己的队列,此时T1执行完自己的任务后,从T2中窃取了任务fib(2),并执行

Step   T1 Stack                      T1 Queue   T2 Stack            T2 Queue
8      (main goroutine)(wait join)              fib(3)(wait join)   fib(2)
       fib(4)(wait join)                        fib(1)               
       (return 1)                                                    
9      (main goroutine)(wait join)              fib(3)(wait join)    
       fib(4)(wait join)                        fib(1)               
       fib(2)                                                        
10     (main goroutine)(wait join)              fib(3)(wait join)    
       fib(4)(wait join)                        (return 1)           
       (return 1)                                                                 

T2执行的fib(3)完成join,返回后T1上的fib(4)也完成了join

Step   T1 Stack                      T1 Queue   T2 Stack            T2 Queue
11     (main goroutine)(wait join)              (return 2)           
       fib(4)(wait join)                                             
12     (main goroutine)(wait join)                                   
       (return 3)    

最后继续执行main goroutine并将<-fib(4)打印出来。

总的来说这种任务调度方式在性能上有很多隐含的好处

续体窃取

由于在Go语言中goroutine很好的封装了一个工作体,所以我们会很自然的认为Go会将任务或者goroutine进行排队。实际上这并不是Go语言工作窃取算法的原理

Go语言工作窃取算法是对续体进行入队和窃取的。在Go语言中,goroutine就是任务,而goroutine之后的都被称为续体

当一个执行线程到达一个join point时,该线程必须暂停执行,并等待回调,这被称为延迟join。任务窃取和续体窃取都存在延迟join,但发生频率有显著差异。

试想一下:当创建一个goroutine时,通常我们会希望该goroutine会被立即执行,以尽快到达join point。同时,执行其续体的goroutine也会想尽快join回原来的goroutine。在新创建的goroutine完成之前,续体尝试join的情况也并不罕见。

对于这些情况有一条公理,即在调度goroutine时,最合理的是立即开始执行这个goroutine

同样是刚才的Fibonacci函数,我们先做一下约定:

  • 使用cont.of X来表示某续体

  • 当续体被执行时,将其转换成下一次的fib()调用

再让我们采用续体窃取的方式模拟一下运行过程:

首先T1执行main(),然后创建并执行fib(4),将续体conf.of main放入队列

Step   T1 Stack     T1 Queue         T2 Stack                  T2 Queue
1      main                                                   
2      fib(4)     	conf.of main                

随后续体conf.of main被T2窃取,T1执行fib(4)后将续体conf.of fib(4)加入队列尾部,T1继续执行fib(3)

Step   T1 Stack     T1 Queue         T2 Stack                  T2 Queue
3      fib(3)       conf.of fib(4)   conf.of main               

T2到达到续体conf.of mainjoin point后,继续从T1的队列中窃取任务conf.of fib(4)执行,T1也同时执行fib(3)并将执行fib(3)产生的续体放入队列,接着执行fib(3)创建的fib(2)

Step   T1 Stack     T1 Queue         T2 Stack                  T2 Queue
4      fib(2)     	conf.of fib(3)   conf.of main(wait join)    
                                   	 conf.of fib(4)   

随后T1执行完fib(2)准备返回,T2从续体conf.of fib(4)中执行fib(2)

Step   T1 Stack     T1 Queue         T2 Stack                  T2 Queue
5      (return 1)   conf.of fib(3)   conf.of main(wait join)   conf.of fib(4)
                                     fib(2)   

T1继续从队列取出续体conf.of fib(3)执行其中的fib(1),T2也返回了fib(2)的计算,并完成续体conf.of fib(4)的运行,在fib(4)join point等待

Step   T1 Stack     T1 Queue         T2 Stack                  T2 Queue
6      fib(1)                        conf.of main(wait join)   conf.of fib(4)
                                     (return 1)  
7      (return 1)                    conf.of main(wait join)    
                                     fib(4)(wait join)     

最后T1完成fib(1)后返回并完成join,T2打印结果

Step   T1 Stack     T1 Queue         T2 Stack                  T2 Queue
8                                    conf.of main(wait join)    
                                     (return 3)                 
9                                    main(print 3)     

我们可以看到,通过续体窃取,在线程上的延迟join都发生在空闲的线程上,并且调用栈的最大深度也低于任务窃取。并如果我们让同样的代码在单个线程上执行之后会发现,它的运行就像我们使用普通函数一样。从理论上来说,续体窃取是被任务优于任务窃取的。

全局上下文

了解完Go语言的窃取之后,还有最后一个问题。当执行goroutine的系统线程阻塞时,Go语言将如何处理?

从逻辑上讲,当执行G的M阻塞时,对应G也将被阻塞。这似乎很合理,但是,从性能上看,Go语言应该尽可能多的保证P活跃。

在上述情况中,Go语言会将阻塞的M与该P分离,并将该P切换到另一个无阻塞的M上,阻塞的G任然与阻塞的M关联。当G阻塞结束时,操作系统会尝试使用另一个M来回退P,以便能执行这个阻塞的G。但是,这并不总是能顺利进行,如果失败,线程会把该G放在全局上下文中,然后线程休眠,并放回Go运行时的线程池,以供将来使用。(例如goroutine被再次阻塞)

在窃取算法中,P也会定期检查全局上下文,以查看是否有可执行的G。

最后更新于