# 并发
并发编程相关概念。
并发(concurrency):逻辑上具备同时处理多个任务的能力。
并行(parallesim):物理上在同一时刻执行多个并发任务。
需要程序以并发模型设计。执行时依据环境(单核或多核处理器)不同,有不同运行方式和效率。多核处理器真正同时执行多个任务,而单核只能以间隔切换方式运行。所以说,并发是并行的必要条件,并行是并发的理想状态。并行需要多进程(process)或多线程(thread)支持,而并发可在单线程上以协程(coroutine)实现。
协程通常是指在单线程上,通过协作式切换执行多个任务的并发设计。比如,将 IO 等待时间,用来执行其他任务。且单线程无竟态条件,可减少或避免使用锁。某些时候,这些用户空间,较少上下文切换的协程比多线程有更高的执行效率。
# 任务
简单将 goroutine 归为协程( coroutine
)并不合适。它类似多线程和协程的综合体,最大限度发挥多核处理能力,提升执行效率。
按需扩张的极小初始栈,支持海量任务。
高效无锁内存分配和复用,提升并发性能。
调度器平衡任务队列,充分利用多处理器。
线程自动休眠和唤醒,减少内核开销。
基于信号( signal
)实现抢占式任务调度。
关键字 go
将目标函数和参数 打包(非执行)成并发任务单元,放入待运行队列。
无法确定执行时间、执行次序,以及执行线程,由调度器负责处理。
func main() { | |
// 打包并发任务(函数 + 参数)。 | |
// 并非立即执行。 | |
go println("abc") | |
go func(x int) { println(x) }(123) | |
// 上述并发任务会被其他饥饿线程取走。 | |
// 执行时间未知,下面这行可能先输出。 | |
println("main") | |
time.Sleep(time.Second) | |
} |
参数立即计算并复制。
var c int | |
func inc() int { | |
c++ | |
return c | |
} | |
func main() { | |
a := 100 | |
// 立即计算出参数 (100, 1),并复制。 | |
go func(x, y int) { | |
time.Sleep(time.Second) | |
println("go:", x, y) // 100, 1 | |
}(a, inc()) | |
a += 100 | |
println("main:", a, inc()) // 200, 2 | |
time.Sleep(time.Second * 3) | |
} | |
// main: 200 2 | |
// go: 100 1 |
# 结束
所有用户代码都以 goroutine
执行,包括 main.main
入口函数。
进程结束,不会等待其他正在执行或尚未执行的任务。
func main() { | |
go func() { | |
defer println("g done.") | |
time.Sleep(time.Second) | |
}() | |
defer println("main done.") | |
} | |
// main done. |
# 等待
等待任务结束,可以做得比 time.Sleep
更优雅一些。
channel
: 信号通知。WaitGroup
:等待多个任务结束。Context
:上下文通知。Mutex
:锁阻塞。
如果只是一次性通知行为,可使用空结构。只要关闭通道,等待(阻塞)即可解除。
func main() { | |
q := make(chan struct{}) | |
go func() { | |
defer close(q) | |
println("done.") | |
}() | |
<- q | |
} |
添加计数( WaitGroup.Add
)应在创建任务和等待之前,否则可能导致等待提前解除。
可以有多处等待,实现群体性通知。
func main() { | |
var wg sync.WaitGroup | |
wg.Add(10) | |
for i := 0; i < 10; i++ { | |
go func(id int) { | |
defer wg.Done() | |
println(id, "done.") | |
}(i) | |
} | |
wg.Wait() | |
} |
上下文的实现和通道基本一致。
func main() { | |
ctx, cancel := context.WithCancel(context.Background()) | |
go func() { | |
defer cancel() | |
println("done.") | |
}() | |
<- ctx.Done() | |
} |
利用锁实现 “同步”,在其他语言很常见,但 Go 更倾向于以通信代替。
func main() { | |
var lock sync.Mutex | |
lock.Lock() | |
go func() { | |
defer lock.Unlock() | |
println("done.") | |
}() | |
lock.Lock() | |
lock.Unlock() | |
println("exit") | |
} |
# 终止
主动结束任务,有以下几种方式。
调用 runtime.Goexit
终止任务。
调用 os.Exit
结束进程。
在任务调用堆栈(call stack)的任何位置调用 runtime.Goexit
都能立即终止任务。
结束前,延迟调用( defer
)被执行。其他任务不受影响。
func main() { | |
q := make(chan struct{}) | |
go func() { | |
defer close(q) | |
defer println("done.") | |
a() | |
b() | |
c() | |
}() | |
<- q | |
} | |
func a() { println("a") } | |
func b() { println("b"); runtime.Goexit() } | |
func c() { println("c") } | |
// a | |
// b | |
// done. |
在 main goroutine
里调用 Goexit
,它会等待其他任务结束,然后崩溃进程。
func main() { | |
q := make(chan struct{}) | |
go func() { | |
defer close(q) | |
defer println("done.") | |
time.Sleep(time.Second) | |
}() | |
runtime.Goexit() | |
<- q | |
} | |
// done. | |
// fatal error: no goroutines (main called runtime.Goexit) - deadlock! |
而 os.Exit
可在任意位置结束进程。不等待其他任务,也不执行延迟调用。
func main() { | |
go func() { | |
defer println("g done.") | |
time.Sleep(time.Second) | |
}() | |
defer println("main done.") | |
os.Exit(-1) | |
} |
# 限制
运行时可能会创建很多线程,但任何时候都只有几个参与并发任务执行,其他处于休眠状态。
默认与逻辑处理器(logic core)数量相等,或使用 GOMAXPROCS
修改。
package main | |
import ( | |
"sync" | |
"math" | |
) | |
//go:noinline | |
func sum() (n int) { | |
for i := 0; i < math.MaxUint32; i++ { | |
n += i | |
} | |
return | |
} | |
func main() { | |
var wg sync.WaitGroup | |
for i := 0; i < 4; i++ { | |
wg.Add(1) | |
go func(){ | |
defer wg.Done() | |
sum() | |
}() | |
} | |
wg.Wait() | |
} |
$ go build -o test | |
$ time GOMAXPROCS=1 ./test | |
real 0m7.465s | |
user 0m7.446s | |
sys 0m0.016s | |
$ time GOMAXPROCS=2 ./test | |
real 0m3.819s # 执行时间。(wall clock) | |
user 0m7.569s # 多核累加。(CPU clock) | |
sys 0m0.014s |
并发数是否越大越好?未必!这涉及线程上下文切换等诸多因素,通常以默认设置即可。
除环境变量,也可在代码中直接调用 runtime.GOMAXPROCS
。
不要频繁调用 GOMAXPROCS
,它会导致 STW,影响性能。
除并发控制外,还可以通过 GOGC
、 GOMEMLIMIT
调整垃圾回收频率和内存阈值。
# 调度
除运行时自动调度外,某些时候需要手动控制任务执行。
挂起
暂时挂起任务,释放线程去执行其他任务。
当前任务被放回任务队列,等待下次被某个线程重新获取后继续执行。
也就是说,一个任务不一定由同一线程完成。实际上,除了主动协作调度外,还要考虑运行时抢占调度等因素。长时间运行的任务会被暂停,让其他等待任务有机会执行,以确保公平。
package main | |
import ( | |
"sync" | |
"runtime" | |
) | |
func main() { | |
// 限制并发任务数。 | |
runtime.GOMAXPROCS(1) | |
var wg sync.WaitGroup | |
wg.Add(2) | |
a, b := make(chan struct{}), make(chan struct{}) | |
go func() { | |
defer wg.Done() | |
<- a | |
for i := 0; i < 5; i++ { | |
println("a", i) | |
} | |
}() | |
go func() { | |
defer wg.Done() | |
<- b | |
for i := 0; i < 5; i++ { | |
println("b", i) | |
if i == 2 { runtime.Gosched() } | |
} | |
}() | |
// 安排执行次序。 | |
close(b) | |
close(a) | |
wg.Wait() | |
} | |
/* --- output ----------- | |
b 0 | |
b 1 | |
b 2 <---- PAUSE | |
a 0 | |
a 1 | |
a 2 | |
a 3 | |
a 4 | |
b 3 <---- CONT | |
b 4 | |
*/ |
发令
暂停一批任务,直到某个信号发出。
func main() { | |
var wg sync.WaitGroup | |
r := make(chan struct{}) | |
for i := 0; i < 10; i++ { | |
wg.Add(1) | |
go func(id int){ | |
defer wg.Done() | |
<- r // 阻塞,等待信号。 | |
println(id) | |
}(i) | |
} | |
close(r) | |
wg.Wait() | |
} |
也可反向使用 sync.WaitGroup
,让多个 goroutine Wait
,然后 main Done
。
类似实现,还有信号量,可以控制启动任务数量。
次序
多个任务按特定次序执行。
func main() { | |
const CNT = 5 | |
var wg sync.WaitGroup | |
wg.Add(CNT) | |
var chans [CNT]chan struct{} | |
for i := 0; i < CNT; i++ { | |
chans[i] = make(chan struct{}) | |
go func(id int){ | |
defer wg.Done() | |
<- chans[id] | |
println(id) | |
}(i) | |
} | |
// 次序(延时,给调度器时间处理) | |
for _, x := range []int{4, 0, 1, 3, 2} { | |
close(chans[x]) | |
time.Sleep(time.Millisecond * 10) | |
} | |
wg.Wait() | |
} |
# 存储
默认情形下,任务和线程并非绑定关系,所以不能用 TLS 之类的本地存储。
以参数方式传入外部容器,但要避免和其他并发任务竞争(data race)。
runtime.LockOSThread
可锁定调用线程,常用于 syscall
和 CGO
。
func main() { | |
var gls [2]struct { | |
id int | |
ret int | |
} | |
var wg sync.WaitGroup | |
wg.Add(len(gls)) | |
for i := 0; i < len(gls); i++ { | |
go func(id int) { | |
defer wg.Done() | |
gls[id].id = id | |
gls[id].ret = (id + 1) * 100 | |
}(i) | |
} | |
wg.Wait() | |
fmt.Printf("%+v\n", gls) | |
} | |
// [{id:0 ret:100} {id:1 ret:200}] |
# 通道
鼓励使用 CSP 通道,以通信来代替内存共享,实现并发安全。
通道(channel)行为类似消息队列。不限收发人数,不可重复消费。
Don't communicate by sharing memory, share memory by communicating.
CSP: Communicating Sequential Process.
# 同步
没有数据缓冲区,须收发双方到场直接交换数据。
阻塞,直到有另一方准备妥当或通道关闭。
可通过 cap == 0 判断为无缓冲通道。
func main() { | |
quit := make(chan struct{}) | |
data := make(chan int) | |
go func() { | |
data <- 11 | |
}() | |
go func() { | |
defer close(quit) | |
println(<- data) | |
println(<- data) | |
}() | |
data <- 22 | |
<- quit | |
} |
# 异步
通道自带固定大小缓冲区。有数据或有空位时,不会阻塞。
用 cap、len 获取缓冲区大小和当前缓冲数。
func main() { | |
quit := make(chan struct{}) | |
data := make(chan int, 3) | |
data <- 11 | |
data <- 22 | |
data <- 33 | |
println(cap(data), len(data)) // 3 3 | |
go func() { | |
defer close(quit) | |
println(<- data) | |
println(<- data) | |
println(<- data) | |
println(<- data) // block | |
}() | |
data <- 44 | |
<- quit | |
} |
缓冲区大小仅是内部属性,不属于类型组成部分。
通道变量本身就是指针,可判断是否为同一对象或 nil。
func main() { | |
var a, b chan int = make(chan int, 3), make(chan int) | |
var c chan bool | |
println(a == b) // false | |
println(c == nil) // true | |
println(a, unsafe.Sizeof(a)) // 0xc..., 8 | |
} |
# 关闭
对于 closed
或 nil
通道,规则如下:
无论收发, nil
通道都会阻塞。
不能关闭 nil
通道。
重复关闭通道,引发 panic
!
向已关闭通道发送数据,引发 panic
!
从已关闭通道接收数据,返回缓冲数据或零值。
提示, nil
通道是指没有 make
的变量。
鉴于通道关闭后,所有基于此的阻塞都被解除,可用作通知。
没有判断通道是否已被关闭的直接方法,只能透过收发模式获知。
func main() { | |
c := make(chan int) | |
close(c) | |
// close(c) ~ panic: close of closed channel | |
// 不会阻塞,返回零值。 | |
println(<- c) // 0 | |
println(<- c) // 0 | |
} |
func main() { | |
c := make(chan int, 2) | |
c <- 1 | |
close(c) | |
// 不会阻塞,返回零值。 | |
println(<- c) // 1 | |
println(<- c) // 0 | |
println(<- c) // 0 | |
} |
为避免重复关闭,可包装 close
函数。
也可以类似方式封装 send
、 recv
操作。
func closechan[T any](c chan T) { | |
defer func(){ | |
recover() | |
}() | |
close(c) | |
} | |
func main() { | |
c := make(chan int, 2) | |
closechan(c) | |
closechan(c) | |
} |
保留关闭状态。注意,为并发安全,关闭和获取关闭状态应保持同步。
可使用 sync.RWMutex
、 sync.Once
优化设计。
type Queue[T any] struct { | |
sync.Mutex | |
ch chan T | |
cap int | |
closed bool | |
} | |
func NewQueue[T any](cap int) *Queue[T] { | |
return &Queue[T]{ | |
ch: make(chan T, cap), | |
} | |
} | |
func (q *Queue[T]) Close() { | |
q.Lock() | |
defer q.Unlock() | |
if !q.closed { | |
close(q.ch) | |
q.closed = true | |
} | |
} | |
func (q *Queue[T]) IsClosed() bool { | |
q.Lock() | |
defer q.Unlock() | |
return q.closed | |
} | |
func main() { | |
var wg sync.WaitGroup | |
q := NewQueue[int](3) | |
for i := 0; i < 10; i++ { | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
defer q.Close() | |
println(q.IsClosed()) | |
}() | |
} | |
wg.Wait() | |
} |
利用 nil
通道阻止退出。
func main() { | |
<-(chan struct{})(nil) // select{} | |
} |
# 收发
正常接收操作是 ok-idiom
或 range
模式。
ok == false
:通道被关闭。for ... range
:循环,直至通道关闭。
func main() { | |
var wg sync.WaitGroup | |
wg.Add(2) | |
c := make(chan int) | |
go func() { | |
defer wg.Done() | |
for { | |
x, ok := <-c | |
if !ok { return } | |
println(x) | |
} | |
}() | |
go func() { | |
defer wg.Done() | |
defer close(c) | |
c <- 1 | |
c <- 2 | |
c <- 3 | |
}() | |
wg.Wait() | |
} |
func main() { | |
var wg sync.WaitGroup | |
wg.Add(2) | |
c := make(chan int) | |
go func() { | |
defer wg.Done() | |
for x := range c{ | |
println(x) | |
} | |
}() | |
go func() { | |
defer wg.Done() | |
defer close(c) | |
c <- 1 | |
c <- 2 | |
c <- 3 | |
}() | |
wg.Wait() | |
} |
及时关闭通道,否则可能导致死锁。fatal error: all goroutines are asleep - deadlock!
# 单向
通道默认双向,不区分发送和接收端。可限制方向获得更严谨的操作逻辑。
func main() { | |
var wg sync.WaitGroup | |
wg.Add(2) | |
c := make(chan int) | |
var send chan<- int = c | |
var recv <-chan int = c | |
// recv | |
go func() { | |
defer wg.Done() | |
for x := range recv { | |
println(x) | |
} | |
}() | |
// send | |
go func() { | |
defer wg.Done() | |
defer close(c) | |
for i := 0; i < 3; i++ { | |
send <- i | |
} | |
}() | |
wg.Wait() | |
} |
不能在单向通道上做逆向操作。
func main() { | |
c := make(chan int, 2) | |
var send chan<- int = c | |
var recv <-chan int = c | |
// <-send // ~ cannot receive from send-only channel | |
// recv <- 1 // ~ cannot send to receive-only channel | |
// close(recv) // ~ cannot close receive-only channel | |
} |
无法将单向通道转换回去。
func main() { | |
var a, b chan int | |
a = make(chan int, 2) | |
var recv <-chan int = a | |
var send chan<- int = a | |
// b = (chan int)(recv) // ~ cannot convert recv (<-chan int) to type chan int | |
// b = (chan int)(send) // ~ cannot convert send (chan<- int) to type chan int | |
} |
尽管可用 make
创建单向通道,但没有任何意义,因为通道必须同时有收发行为。
func main() { | |
q := make(<-chan struct{}) | |
go func(){ | |
close(q) // ~ cannot close receive-only channel | |
}() | |
<- q | |
} |
选择
用 select
语句处理多个通道,随机选择可用通道做收发操作。
将失效通道置为 nil
(阻塞,不可用),用作结束判断。
func main() { | |
var wg sync.WaitGroup | |
wg.Add(2) | |
cha, chb := make(chan int), make(chan int) | |
// recv | |
go func() { | |
defer wg.Done() | |
for { | |
x := 0 | |
ok := false | |
// random | |
select { | |
case x, ok = <-cha: if !ok { cha = nil } | |
case x, ok = <-chb: if !ok { chb = nil } | |
} | |
if (cha == nil) && (chb == nil) { return } | |
println(x) | |
} | |
}() | |
// send | |
go func() { | |
defer wg.Done() | |
defer close(cha) | |
defer close(chb) | |
for i := 0; i < 10; i++ { | |
// random | |
select { | |
case cha <- i: | |
case chb <- i * 10: | |
} | |
} | |
}() | |
wg.Wait() | |
} |
即便是同一通道,也会随机选择 case 执行。
func main() { | |
var wg sync.WaitGroup | |
wg.Add(2) | |
cha := make(chan int) | |
// recv | |
go func() { | |
defer wg.Done() | |
for { | |
x := 0 | |
ok := false | |
// random | |
select { | |
case x, ok = <-cha: println("c1", x) | |
case x, ok = <-cha: println("c2", x) | |
} | |
if !ok { return } | |
} | |
}() | |
// send | |
go func() { | |
defer wg.Done() | |
defer close(cha) | |
for i := 0; i < 10; i++ { | |
// random | |
select { | |
case cha <- i: | |
case cha <- i * 10: | |
} | |
} | |
}() | |
wg.Wait() | |
} |
所有通道都不可用,则执行 default 分支,避免阻塞。
func main() { | |
exit := make(chan struct{}) | |
c := make(chan int) | |
go func() { | |
defer close(exit) | |
for { | |
select { | |
case x, ok := <-c: | |
if !ok { return } | |
println(x) | |
default: | |
} | |
fmt.Println("wait...") | |
time.Sleep(time.Second) | |
} | |
}() | |
time.Sleep(time.Second * 3) | |
c <- 100 | |
close(c) | |
<-exit | |
} |
空 select 语句,一直阻塞或死锁。
select{} |
缺省
利用 default
实现判断逻辑。
func main() { | |
done := make(chan struct{}) | |
// 多槽构成的数据区。 | |
data := []chan int{ | |
make(chan int, 3), | |
} | |
go func() { | |
defer close(done) | |
for i := 0; i < 10; i++ { | |
// 向最后一个槽添加数据。 | |
// 添加失败(已满),则创建新槽。 | |
select { | |
case data[len(data)-1] <- i: | |
default: | |
data = append(data, make(chan int, 3)) | |
data[len(data)-1] <- i | |
} | |
} | |
}() | |
<-done | |
for _, c := range data { | |
close(c) | |
for x := range c { | |
println(x) | |
} | |
} | |
} |
反射
如果运行期才能确定通道数量,可利用反射( reflect
)实现。
func main() { | |
exit := make(chan struct{}) | |
// 运行时动态创建。 | |
chans := make([]chan int, 0) | |
chans = append(chans, make(chan int)) | |
chans = append(chans, make(chan int)) | |
go func() { | |
defer close(exit) | |
// 反射构建 select 操作。 | |
cases := make([]reflect.SelectCase, len(chans)) | |
for i, c := range chans { | |
cases[i] = reflect.SelectCase { | |
Dir: reflect.SelectRecv, | |
Chan: reflect.ValueOf(c), | |
} | |
} | |
for { | |
index, value, ok := reflect.Select(cases) | |
// 检查并退出。 | |
if !ok { | |
chans[index] = nil | |
n := 0 | |
for _, c := range chans { | |
if c == nil { n++ } | |
if n == len(chans) { return } | |
} | |
continue | |
} | |
println(index, value.Int(), ok) | |
} | |
}() | |
chans[1] <- 101 | |
chans[0] <- 100 | |
for _, c := range chans { | |
close(c) | |
} | |
<- exit | |
} |
# 使用
通常以工厂方法将 goroutine 和 channel 绑定。
func newRecv[T any](cap int) (data chan T, done chan struct{}) { | |
data = make(chan T, cap) | |
done = make(chan struct{}) | |
go func() { | |
defer close(done) | |
for v := range data { | |
println(v) | |
} | |
}() | |
return | |
} | |
func main() { | |
data, done := newRecv[int](3) | |
for i := 0; i < 10; i++ { | |
data <- i | |
} | |
close(data) | |
<- done | |
} |
# 超时
如果 channel 阻塞且没有关闭,那么可能导致 goroutine 泄漏(leak)。
解决办法是用 select
default
,或 time.After
设置超时。
func main() { | |
quit := make(chan struct{}) | |
c := make(chan int) | |
go func() { | |
defer close(quit) | |
select { | |
case x, ok := <- c: println(x, ok) | |
case <- time.After(time.Second): return | |
} | |
}() | |
<- quit | |
} |
# 信号量
用通道实现信号量(semaphore),在同一时刻仅指定数量的 goroutine 参与工作。
type Sema struct { | |
c chan struct{} | |
} | |
func NewSema(n int) *Sema { | |
return &Sema{ | |
c: make(chan struct{}, n), | |
} | |
} | |
func (m *Sema) Acquire() { | |
m.c <- struct{}{} | |
} | |
func (m *Sema) Release() { | |
<- m.c | |
} | |
func main() { | |
var wg sync.WaitGroup | |
// runtime: 4 | |
// sema: 2 | |
runtime.GOMAXPROCS(4) | |
sem := NewSema(2) | |
for i := 0; i < 5; i++ { | |
wg.Add(1) | |
go func(id int) { | |
defer wg.Done() | |
sem.Acquire() | |
defer sem.Release() | |
for n := 0; n < 3; n++ { | |
time.Sleep(time.Second * 2) | |
fmt.Println(id, time.Now()) | |
} | |
}(i) | |
} | |
wg.Wait() | |
} |
# 对象池
鉴于通道本身就是一个并发安全的队列,可用作 ID generator、Pool 用途。
type Pool[T any] chan T | |
func NewPool[T any](cap int) Pool[T] { | |
return make(chan T, cap) | |
} | |
func (p Pool[T]) Get() (v T, ok bool) { | |
select { | |
case v = <-p: ok = true | |
default: | |
} | |
return | |
} | |
func (p Pool[T]) Put(v T) bool { | |
select { | |
case p <- v: return true | |
default: | |
} | |
return false | |
} | |
// ----------------------------- | |
func main() { | |
p := NewPool[int](2) | |
println(p.Put(1)) | |
println(p.Put(2)) | |
println(p.Put(3)) | |
for { | |
v, ok := p.Get() | |
if !ok { break } | |
println(v) | |
} | |
} |
# 退出
捕获 INT
、 TERM
信号,顺便实现一个简易的 atexit
和 函数。
import ( | |
"os" | |
"os/signal" | |
"sync" | |
"syscall" | |
"time" | |
) | |
var exs = &struct { | |
sync.RWMutex | |
funcs []func() | |
signals chan os.Signal | |
}{} | |
func atexit(f func()) { | |
exs.Lock() | |
defer exs.Unlock() | |
exs.funcs = append(exs.funcs, f) | |
} | |
func wait(code int) { | |
// 信号注册。 | |
if exs.signals == nil { | |
exs.signals = make(chan os.Signal) | |
signal.Notify(exs.signals, syscall.SIGINT, syscall.SIGTERM) | |
} | |
// 独立函数,确保 atexit 函数能按 FILO 顺序执行。 | |
// 不受 os.Exit 影响。 | |
func() { | |
exs.RLock() | |
for _, f := range exs.funcs { | |
defer f() | |
} | |
exs.RUnlock() | |
<-exs.signals | |
}() | |
// 终止进程。 | |
os.Exit(code) | |
} | |
func main() { | |
atexit(func() { | |
time.Sleep(time.Second * 3) | |
println("1 ..."); | |
}) | |
atexit(func() { | |
println("2 ...") | |
}) | |
println("Press CTRL + C to exit.") | |
wait(1) | |
} |
# 队列
通道本就是队列,需要关心的是如何优雅(gracefully)地关闭通道。
func main() { | |
max := int64(100) // 最大发送计数。 | |
m := 3 // 接收者数量。 | |
n := 3 // 发送者数量。 | |
var wg sync.WaitGroup | |
wg.Add(m + n) | |
data := make(chan int) // 数据通道。 | |
done := make(chan struct{}) // 结束通知。 | |
// m recv | |
for i := 0; i < m; i++ { | |
go func() { | |
defer wg.Done() | |
for { | |
select { | |
case <- done: return | |
case v := <- data: println(v) | |
} | |
} | |
}() | |
} | |
// n send | |
for i := 0; i < n; i++ { | |
go func(id int) { | |
defer wg.Done() | |
defer func(){ recover() }() | |
for { | |
select { | |
case <- done: return | |
case data <- id: | |
if atomic.AddInt64(&max, -1) <= 0 { | |
close(done) | |
return | |
} | |
default: | |
} | |
} | |
}(i) | |
} | |
wg.Wait() | |
} |
# 性能优化
将发往通道的数据打包,减少传输次数,可有效提升性能。
通道内部实现有锁和数据复制操作,单次发送更多数据(批处理),可改善性能。
const ( | |
MAX = 500000 // 数据统计上限。 | |
BLOCK = 500 // 数据块大小。 | |
CAP = 100 // 缓冲区大小。 | |
) | |
//go:noinline | |
func normal() { | |
done := make(chan struct{}) | |
c := make(chan int, CAP) | |
go func() { | |
defer close(done) | |
count := 0 | |
for x := range c { | |
count += x | |
} | |
}() | |
for i := 0; i < MAX; i++ { | |
c <- i | |
} | |
close(c) | |
<-done | |
} | |
//go:noinline | |
func block() { | |
done := make(chan struct{}) | |
c := make(chan [BLOCK]int, CAP) | |
go func() { | |
defer close(done) | |
count := 0 | |
for a := range c { | |
for _, x := range a { | |
count += x | |
} | |
} | |
}() | |
for i := 0; i < MAX; i += BLOCK { | |
// 使用数组对数据打包。 | |
var b [BLOCK]int | |
for n := 0; n < BLOCK; n++ { | |
b[n] = i + n | |
if i + n == MAX - 1 { | |
break | |
} | |
} | |
c <- b | |
} | |
close(c) | |
<-done | |
} |
func BenchmarkNormal(b *testing.B) { | |
for i := 0; i < b.N; i++ { | |
normal() | |
} | |
} | |
func BenchmarkBlock(b *testing.B) { | |
for i := 0; i < b.N; i++ { | |
block() | |
} | |
} |
cpu: 12th Gen Intel(R) Core(TM) i7-12700 | |
BenchmarkNormal-20 55 21640556 ns/op 1095 B/op 3 allocs/op | |
BenchmarkBlock-20 1826 650704 ns/op 401544 B/op 3 allocs/op |
# 泄露
如果通道一直处于阻塞状态,那么会导致 goroutine 无法结束和回收,形成资源泄露。
func test() chan byte { | |
c := make(chan byte) | |
go func() { | |
buf := make([]byte, 0, 10<<20) // 10MB | |
for { | |
d, ok := <- c | |
if !ok { return } | |
buf = append(buf, d) | |
} | |
}() | |
return c | |
} | |
func main() { | |
for i := 0; i < 5; i++ { | |
test() | |
} | |
for { | |
time.Sleep(time.Second) | |
runtime.GC() | |
} | |
} |
# 因 goroutine 无法结束,其持有的 buf 内存无法回收。 | |
$ GODEBUG=gctrace=1 ./test | |
gc 1 @0.002s 20%: ... 50->50->50 MB, 50 MB goal, ... | |
gc 2 @1.002s 0%: ... 50->50->50 MB, 100 MB goal, ... (forced) | |
gc 3 @2.009s 0%: ... 50->50->50 MB, 100 MB goal, ... (forced) | |
... | |
gc 7 @6.029s 0%: ... 50->50->50 MB, 100 MB goal, ... (forced) |
# 可以观察到这些 goroutine 的状态。 | |
$ GODEBUG="schedtrace=1000,scheddetail=1" ./test | |
SCHED 3008ms: gomaxprocs=2 idleprocs=2 threads=4 ... | |
G1: status=4(sleep) | |
G2: status=4(force gc (idle)) | |
G3: status=4(GC sweep wait) | |
G4: status=4(GC scavenge wait) | |
G5: status=4(chan receive) m=-1 lockedm=-1 | |
G6: status=4(chan receive) m=-1 lockedm=-1 | |
G7: status=4(chan receive) m=-1 lockedm=-1 | |
G8: status=4(chan receive) m=-1 lockedm=-1 | |
G9: status=4(chan receive) m=-1 lockedm=-1 | |
G10: status=4(GC worker (idle)) | |
G11: status=4(GC worker (idle)) |
通过添加 time.After
之类手段,让通道有解除阻塞的机会。
另外,不当使用 time.Tick
,也会引发泄漏。
# 同步
通道并非用来取代锁,各有不同使用场景。
通道解决高级别逻辑层次并发架构,锁则用来保护低级别局部代码安全。
临界条件:多线程同时读写共享资源(临界资源)。
临界区:读写临界资源的代码片段。
互斥锁:同一时刻,只有一个线程能进入临界区。
读写锁:写独占(其他读写均被阻塞),读共享。
信号量:允许指定数量线程进入临界区。
自旋锁:失败后,以循环积极尝试。(无上下文切换,小粒度)
悲观锁:操作前独占锁定。
乐观锁:假定无竞争,后置检查。(Lock Free, CAS)
标准库 sync
提供了多种锁,另有原子操作等。
Mutex
:互斥锁。RWMutex
:读写锁。
WaitGroup
:等待一组任务结束。Cond
:单播或广播唤醒其他任务。Once
:确保只调用一次(函数)。
Map
:并发安全字典,(少写多读,数据不重叠)Pool
:对象池。(缓存对象可被回收)
# 竞争检测
测试阶段,以 -race
编译,注入竞争检查(data race detection)指令。
有较大性能损失,避免在基准测试和发布版本中使用。
有不确定性,不能保证百分百测出。
单元测试有效完整,定期执行竞争检查。
func main() { | |
var wg sync.WaitGroup | |
wg.Add(2) | |
x := 0 | |
go func() { | |
defer wg.Done() | |
x++ | |
}() | |
go func() { | |
defer wg.Done() | |
println(x) | |
}() | |
wg.Wait() | |
} |
$ go build -race && ./test | |
================== | |
WARNING: DATA RACE | |
Read at 0x00c0000160d8 by goroutine 7: | |
main.main.func2() | |
/root/go/test/main.go:20 +0x74 | |
Previous write at 0x00c0000160d8 by goroutine 6: | |
main.main.func1() | |
/root/go/test/main.go:15 +0x86 | |
Goroutine 7 (running) created at: | |
main.main() | |
/root/go/test/main.go:18 +0x1d6 | |
Goroutine 6 (finished) created at: | |
main.main() | |
/root/go/test/main.go:13 +0x12e | |
================== | |
1 | |
Found 1 data race(s) |
# 互斥锁
文档中标明 “must not be copied”,应避免复制导致锁机制失效。
type data struct { | |
sync.Mutex | |
} | |
// go vet: passes lock by value | |
func (d data) test(s string) { | |
d.Lock() | |
defer d.Unlock() | |
} |
控制在最小范围内,及早释放。
// 错误用法 | |
func { | |
m.Lock() | |
defer m.Unlock | |
url := cache["key"] | |
get(url) // 该操作并不需要保护,延长锁占用。 | |
} | |
// 正确用法 | |
func { | |
m.Lock() | |
url := cache["key"] | |
m.Unlock() | |
get(url) | |
} |
不支持递归锁。
func main() { | |
var m sync.Mutex | |
m.Lock() | |
{ | |
// m.Lock() // ~ fatal error: all goroutines are asleep - deadlock! | |
// m.Unlock() | |
} | |
m.Unlock() | |
} |
# 读写锁
某些场合以读写锁替代互斥锁,可提升性能。
func main() { | |
var wg sync.WaitGroup | |
var rw sync.RWMutex | |
x := 0 | |
// 1 写 | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for i := 0; i < 5; i++ { | |
rw.Lock() | |
time.Sleep(time.Second) // 模拟长时间操作! | |
now := time.Now().Format("15:04:05") | |
x++ | |
fmt.Printf("[W] %d, %v\n", x, now) | |
rw.Unlock() | |
} | |
}() | |
//n 读 | |
for i := 0; i < 5; i++ { | |
wg.Add(1) | |
go func(id int) { | |
defer wg.Done() | |
for n := 0; n < 5; n++ { | |
rw.RLock() | |
time.Sleep(time.Second) | |
now := time.Now().Format("15:04:05") | |
fmt.Printf(" [R%d] %d, %v\n", id, x, now) | |
rw.RUnlock() | |
} | |
}(i) | |
} | |
wg.Wait() | |
} | |
/* | |
[W] 1, 11:23:17 // 独占 | |
[R4] 1, 11:23:18 // 并发 | |
[R3] 1, 11:23:18 | |
[R1] 1, 11:23:18 | |
[R0] 1, 11:23:18 | |
[R2] 1, 11:23:18 | |
[W] 2, 11:23:19 | |
[R4] 2, 11:23:20 | |
[R3] 2, 11:23:20 | |
[R1] 2, 11:23:20 | |
[R0] 2, 11:23:20 | |
[R2] 2, 11:23:20 | |
*/ |
# 条件变量
内部以计数器和队列作为单播( signal
)和广播( broadcast
)依据。
引入外部锁作为临界资源保护,可与其他逻辑同步。
func main() { | |
var wg sync.WaitGroup | |
cond := sync.NewCond(&sync.Mutex{}) | |
data := make([]int, 0) | |
// 1 写 | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for i := 0; i < 5; i++ { | |
// 保护临界资源。 | |
cond.L.Lock() | |
data = append(data, i + 100) | |
cond.L.Unlock() | |
// 唤醒一个。 | |
cond.Signal() | |
} | |
// 唤醒所有(剩余)。 | |
// cond.Broadcast() | |
}() | |
//n 读 | |
for i := 0; i < 5; i++ { | |
wg.Add(1) | |
go func(id int) { | |
defer wg.Done() | |
// 锁定临界资源。 | |
cond.L.Lock() | |
// 循环检查是否符合后续操作条件。 | |
// 如条件不符,则继续等待。 | |
for len(data) == 0 { | |
cond.Wait() | |
} | |
x := data[0] | |
data = data[1:] | |
cond.L.Unlock() | |
println(id, ":", x) | |
}(i) | |
} | |
wg.Wait() | |
} |
为什么 Wait 之前必须加锁?除锁定临界资源外,还与其内部设计有关。
// cond.go | |
func (c *Cond) Wait() { | |
c.checker.check() | |
t := runtime_notifyListAdd(&c.notify) | |
c.L.Unlock() | |
runtime_notifyListWait(&c.notify, t) | |
c.L.Lock() | |
} |
# 单次执行
确保仅执行一次,无论后续是同一函数或不同函数都不行。
func main() { | |
var once sync.Once | |
f1 := func() { println("1") } | |
f2 := func() { println("2") } | |
once.Do(f1) | |
// 以下目标函数不会执行。 | |
once.Do(f1) | |
once.Do(f2) | |
once.Do(f2) | |
} | |
// 1 |
以内部状态(done)记录第一次执行,与具体什么函数无关。
func main() { | |
var once sync.Once | |
once.Do(func() { println("1") }) | |
once.Do(func() { println("1") }) | |
once.Do(func() { println("2") }) | |
once.Do(func() { println("2") }) | |
} | |
// 1 |