# 并发

并发编程相关概念。

并发(concurrency):逻辑上具备同时处理多个任务的能力。

并行(parallesim):物理上在同一时刻执行多个并发任务。

需要程序以并发模型设计。执行时依据环境(单核或多核处理器)不同,有不同运行方式和效率。多核处理器真正同时执行多个任务,而单核只能以间隔切换方式运行。所以说,并发是并行的必要条件,并行是并发的理想状态。并行需要多进程(process)或多线程(thread)支持,而并发可在单线程上以协程(coroutine)实现。

协程通常是指在单线程上,通过协作式切换执行多个任务的并发设计。比如,将 IO 等待时间,用来执行其他任务。且单线程无竟态条件,可减少或避免使用锁。某些时候,这些用户空间,较少上下文切换的协程比多线程有更高的执行效率。

# 任务

简单将 goroutine 归为协程( coroutine )并不合适。它类似多线程和协程的综合体,最大限度发挥多核处理能力,提升执行效率。

按需扩张的极小初始栈,支持海量任务。
高效无锁内存分配和复用,提升并发性能。
调度器平衡任务队列,充分利用多处理器。
线程自动休眠和唤醒,减少内核开销。
基于信号( signal )实现抢占式任务调度。

关键字 go 将目标函数和参数 打包(非执行)成并发任务单元,放入待运行队列。
无法确定执行时间、执行次序,以及执行线程,由调度器负责处理。

func main() {
    // 打包并发任务(函数 + 参数)。
    // 并非立即执行。
	go println("abc")
	go func(x int) { println(x) }(123)
    // 上述并发任务会被其他饥饿线程取走。
    // 执行时间未知,下面这行可能先输出。
	println("main")
	time.Sleep(time.Second)
}

参数立即计算并复制。

var c int
func inc() int {
	c++
	return c
}
func main() {
	a := 100
    // 立即计算出参数 (100, 1),并复制。
	go func(x, y int) {
		time.Sleep(time.Second)
		println("go:", x, y)     // 100, 1
	}(a, inc())            
	a += 100                   
	println("main:", a, inc())   // 200, 2
	time.Sleep(time.Second * 3)
}
// main: 200 2
//   go: 100 1

# 结束

所有用户代码都以 goroutine 执行,包括 main.main 入口函数。
进程结束,不会等待其他正在执行或尚未执行的任务。

func main() {
	go func() {
		defer println("g done.")
		time.Sleep(time.Second)
	}()
	defer println("main done.")
}
// main done.

# 等待

等待任务结束,可以做得比 time.Sleep 更优雅一些。

channel : 信号通知。
WaitGroup :等待多个任务结束。
Context :上下文通知。
Mutex :锁阻塞。

如果只是一次性通知行为,可使用空结构。只要关闭通道,等待(阻塞)即可解除。

func main() {
	q := make(chan struct{})
	go func() {
		defer close(q)
        println("done.")
	}()
	<- q
}

添加计数( WaitGroup.Add )应在创建任务和等待之前,否则可能导致等待提前解除。
可以有多处等待,实现群体性通知。

func main() {
	var wg sync.WaitGroup
    wg.Add(10)
    
	for i := 0; i < 10; i++ {
		go func(id int) {
			defer wg.Done()
			println(id, "done.")
		}(i)
	}
    
	wg.Wait()
}

上下文的实现和通道基本一致。

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		defer cancel()
		println("done.")
	}()
	<- ctx.Done()
}

利用锁实现 “同步”,在其他语言很常见,但 Go 更倾向于以通信代替。

func main() {
	var lock sync.Mutex
	lock.Lock()
	go func() {
		defer lock.Unlock()
		println("done.")
	}()
	lock.Lock()
	lock.Unlock()
	println("exit")
}

# 终止

主动结束任务,有以下几种方式。

调用 runtime.Goexit 终止任务。
调用 os.Exit 结束进程。

在任务调用堆栈(call stack)的任何位置调用 runtime.Goexit 都能立即终止任务。
结束前,延迟调用( defer )被执行。其他任务不受影响。

func main() {
	q := make(chan struct{})
	go func() {
		defer close(q)
		defer println("done.")
		a()
		b()
		c()
	}()
	<- q
}
func a() { println("a") }
func b() { println("b"); runtime.Goexit() }
func c() { println("c") }
// a
// b
// done.

main goroutine 里调用 Goexit ,它会等待其他任务结束,然后崩溃进程。

func main() {
	q := make(chan struct{})
	go func() {
		defer close(q)
		defer println("done.")
		time.Sleep(time.Second)
	}()
	runtime.Goexit()
	<- q
}
// done.
// fatal error: no goroutines (main called runtime.Goexit) - deadlock!

os.Exit 可在任意位置结束进程。不等待其他任务,也不执行延迟调用。

func main() {
	go func() {
		defer println("g done.")
		time.Sleep(time.Second)
	}()
	defer println("main done.")
	os.Exit(-1)
}

# 限制

运行时可能会创建很多线程,但任何时候都只有几个参与并发任务执行,其他处于休眠状态。
默认与逻辑处理器(logic core)数量相等,或使用 GOMAXPROCS 修改。

package main
import (
	"sync"
	"math"
)
//go:noinline
func sum() (n int) {
	for i := 0; i < math.MaxUint32; i++ {
		n += i
	}
	return
}
func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		
		go func(){
			defer wg.Done()
			sum()
		}()
	}
	wg.Wait()
}
$ go build -o test
$ time GOMAXPROCS=1 ./test
real	0m7.465s    
user	0m7.446s    
sys	  0m0.016s
$ time GOMAXPROCS=2 ./test
real	0m3.819s   # 执行时间。(wall clock)
user	0m7.569s   # 多核累加。(CPU clock)
sys	  0m0.014s

并发数是否越大越好?未必!这涉及线程上下文切换等诸多因素,通常以默认设置即可。
除环境变量,也可在代码中直接调用 runtime.GOMAXPROCS

不要频繁调用 GOMAXPROCS ,它会导致 STW,影响性能。

除并发控制外,还可以通过 GOGCGOMEMLIMIT 调整垃圾回收频率和内存阈值。

# 调度

除运行时自动调度外,某些时候需要手动控制任务执行。

挂起

暂时挂起任务,释放线程去执行其他任务。
当前任务被放回任务队列,等待下次被某个线程重新获取后继续执行。

也就是说,一个任务不一定由同一线程完成。实际上,除了主动协作调度外,还要考虑运行时抢占调度等因素。长时间运行的任务会被暂停,让其他等待任务有机会执行,以确保公平。

package main
import (
	"sync"
	"runtime"
)
func main() {
    // 限制并发任务数。
	runtime.GOMAXPROCS(1)
	var wg sync.WaitGroup
	wg.Add(2)
	a, b := make(chan struct{}), make(chan struct{})
	go func() {
		defer wg.Done()
		<- a
		for i := 0; i < 5; i++ {
			println("a", i)
		}
	}()
	go func() {
		defer wg.Done()
		<- b
		for i := 0; i < 5; i++ {
			println("b", i)
			if i == 2 { runtime.Gosched() }
		}
	}()
    // 安排执行次序。
    close(b)
    close(a)
    
	wg.Wait()
}
/* --- output -----------
b 0
b 1
b 2  <---- PAUSE
a 0
a 1
a 2
a 3
a 4
b 3  <---- CONT
b 4
*/

发令

暂停一批任务,直到某个信号发出。

func main() {
	var wg sync.WaitGroup
	r := make(chan struct{})
    
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int){
			defer wg.Done()
			<- r            // 阻塞,等待信号。
			println(id)
		}(i)
	}
    
    close(r)
	wg.Wait()
}

也可反向使用 sync.WaitGroup ,让多个 goroutine Wait ,然后 main Done
类似实现,还有信号量,可以控制启动任务数量。

次序

多个任务按特定次序执行。

func main() {
	const CNT = 5
    var wg sync.WaitGroup
	wg.Add(CNT)
	var chans [CNT]chan struct{}
    
	for i := 0; i < CNT; i++ {
		chans[i] = make(chan struct{})
		go func(id int){
			defer wg.Done()
			<- chans[id]
			println(id)
		}(i)
	}
	// 次序(延时,给调度器时间处理)
	for _, x := range []int{4, 0, 1, 3, 2} {
		close(chans[x])
		time.Sleep(time.Millisecond * 10)
	}
	wg.Wait()
}

# 存储

默认情形下,任务和线程并非绑定关系,所以不能用 TLS 之类的本地存储。
以参数方式传入外部容器,但要避免和其他并发任务竞争(data race)。

runtime.LockOSThread 可锁定调用线程,常用于 syscallCGO

func main() {
	var gls [2]struct {
		id  int
		ret int
	}
	var wg sync.WaitGroup
	wg.Add(len(gls))
	for i := 0; i < len(gls); i++ {
		go func(id int) {
			defer wg.Done()
			gls[id].id = id
			gls[id].ret = (id + 1) * 100
		}(i)
	}
	wg.Wait()
	fmt.Printf("%+v\n", gls)
}
// [{id:0 ret:100} {id:1 ret:200}]

# 通道

鼓励使用 CSP 通道,以通信来代替内存共享,实现并发安全。
通道(channel)行为类似消息队列。不限收发人数,不可重复消费。

Don't communicate by sharing memory, share memory by communicating.
CSP: Communicating Sequential Process.

# 同步

没有数据缓冲区,须收发双方到场直接交换数据。

阻塞,直到有另一方准备妥当或通道关闭。
可通过 cap == 0 判断为无缓冲通道。

func main() {
	quit := make(chan struct{})
	data := make(chan int)
	go func() {
		data <- 11
	}()
	go func() {
		defer close(quit)
		
		println(<- data)
		println(<- data)
	}()
	data <- 22
	<- quit
}

# 异步

通道自带固定大小缓冲区。有数据或有空位时,不会阻塞。

用 cap、len 获取缓冲区大小和当前缓冲数。

func main() {
	quit := make(chan struct{})
	data := make(chan int, 3)
	data <- 11
	data <- 22
	data <- 33
    
    println(cap(data), len(data))  // 3 3
	go func() {
		defer close(quit)
		println(<- data)
		println(<- data)
		println(<- data)
		println(<- data)  // block
	}()
	data <- 44
	<- quit
}

缓冲区大小仅是内部属性,不属于类型组成部分。
通道变量本身就是指针,可判断是否为同一对象或 nil。

func main() {
	var a, b chan int = make(chan int, 3), make(chan int)
	var c chan bool
    
	println(a == b)               // false
	println(c == nil)             // true
	println(a, unsafe.Sizeof(a))  // 0xc..., 8
}

# 关闭

对于 closednil 通道,规则如下:

无论收发, nil 通道都会阻塞。
不能关闭 nil 通道。

重复关闭通道,引发 panic
向已关闭通道发送数据,引发 panic
从已关闭通道接收数据,返回缓冲数据或零值。

提示, nil 通道是指没有 make 的变量。
鉴于通道关闭后,所有基于此的阻塞都被解除,可用作通知。

没有判断通道是否已被关闭的直接方法,只能透过收发模式获知。

func main() {
	c := make(chan int)
	close(c)
	// close(c) ~ panic: close of closed channel
    
    // 不会阻塞,返回零值。
	println(<- c) // 0
	println(<- c) // 0
}
func main() {
	c := make(chan int, 2)
	c <- 1
	close(c)	
    // 不会阻塞,返回零值。
	println(<- c)  // 1
	println(<- c)  // 0
	println(<- c)  // 0
}

为避免重复关闭,可包装 close 函数。
也可以类似方式封装 sendrecv 操作。

func closechan[T any](c chan T) {
	defer func(){
		recover()
	}()
	close(c)
}
func main() {
	c := make(chan int, 2)
	closechan(c)
	closechan(c)
}

保留关闭状态。注意,为并发安全,关闭和获取关闭状态应保持同步。
可使用 sync.RWMutexsync.Once 优化设计。

type Queue[T any] struct {
	sync.Mutex	
	ch     chan T
	cap    int
	closed bool
}
func NewQueue[T any](cap int) *Queue[T] {
	return &Queue[T]{
		ch: make(chan T, cap),
	}
}
func (q *Queue[T]) Close() {
	q.Lock()
	defer q.Unlock()
	if !q.closed {
		close(q.ch)
		q.closed = true
	}
}
func (q *Queue[T]) IsClosed() bool {
	q.Lock()
	defer q.Unlock()
	return q.closed
}
func main() {
	var wg sync.WaitGroup
	q := NewQueue[int](3)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer q.Close()
			println(q.IsClosed())
		}()
	}
	wg.Wait()
}

利用 nil 通道阻止退出。

func main() {
    <-(chan struct{})(nil)    // select{}
}

# 收发

正常接收操作是 ok-idiomrange 模式。

ok == false :通道被关闭。
for ... range :循环,直至通道关闭。

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	c := make(chan int)
    
	go func() {
		defer wg.Done()
		for {
			x, ok := <-c
			if !ok { return }
			println(x)
		}
	}()
    
    go func() {
    	defer wg.Done()
		defer close(c)
		c <- 1
		c <- 2
		c <- 3	    
	}()
	wg.Wait()
}
func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	c := make(chan int)
    
	go func() {
		defer wg.Done()
		for x := range c{
			println(x)
		}
	}()
    
    go func() {
    	defer wg.Done()
		defer close(c)
		c <- 1
		c <- 2
		c <- 3	    
	}()
	wg.Wait()
}

及时关闭通道,否则可能导致死锁。
fatal error: all goroutines are asleep - deadlock!

# 单向

通道默认双向,不区分发送和接收端。可限制方向获得更严谨的操作逻辑。

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	c := make(chan int)
	var send chan<- int = c
	var recv <-chan int = c
    // recv
	go func() {
		defer wg.Done()
		for x := range recv {
			println(x)
		}
	}()
    // send
	go func() {
		defer wg.Done()
		defer close(c)
		for i := 0; i < 3; i++ {
			send <- i
		}
	}()
	wg.Wait()
}

不能在单向通道上做逆向操作。

func main() {
	c := make(chan int, 2)
	
    var send chan<- int = c
	var recv <-chan int = c
    
	// <-send  // ~ cannot receive from send-only channel
	// recv <- 1 // ~ cannot send to receive-only channel
	// close(recv) // ~ cannot close receive-only channel
}

无法将单向通道转换回去。

func main() {
	var a, b chan int
    
	a = make(chan int, 2)
	var recv <-chan int = a
	var send chan<- int = a
    
	// b = (chan int)(recv) // ~ cannot convert recv (<-chan int) to type chan int
	// b = (chan int)(send) // ~ cannot convert send (chan<- int) to type chan int
}

尽管可用 make 创建单向通道,但没有任何意义,因为通道必须同时有收发行为。

func main() {
	q := make(<-chan struct{})
	go func(){
		close(q) // ~ cannot close receive-only channel
	}()
	<- q
}

选择

select 语句处理多个通道,随机选择可用通道做收发操作。
将失效通道置为 nil (阻塞,不可用),用作结束判断。

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
    
	cha, chb := make(chan int), make(chan int)
    
    // recv
	go func() { 
		defer wg.Done()
        
		for {
			x := 0
			ok := false
			// random
			select {
			case x, ok = <-cha: if !ok { cha = nil }
			case x, ok = <-chb: if !ok { chb = nil }
			}
            
			if (cha == nil) && (chb == nil) { return }
			println(x)
		}
	}()
    
    // send
	go func() { 
		defer wg.Done()
		defer close(cha)
		defer close(chb)
        
		for i := 0; i < 10; i++ {
            // random
			select { 
			case cha <- i:
			case chb <- i * 10:
			}
		}
	}()
    
	wg.Wait()
}

即便是同一通道,也会随机选择 case 执行。

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
    
	cha := make(chan int)
    
    // recv
	go func() { 
		defer wg.Done()
        
		for {
			x := 0
			ok := false
			// random
			select {
			case x, ok = <-cha: println("c1", x)
			case x, ok = <-cha: println("c2", x)
			}
            
			if !ok { return }
		}
	}()
    
    // send
	go func() { 
		defer wg.Done()
		defer close(cha)
        
		for i := 0; i < 10; i++ {
            // random
			select { 
			case cha <- i:
			case cha <- i * 10:
			}
		}
	}()
    
	wg.Wait()
}

所有通道都不可用,则执行 default 分支,避免阻塞。

func main() {
	exit := make(chan struct{})
	c := make(chan int)
    
	go func() {
		defer close(exit)
        
		for {
			select {
			case x, ok := <-c:
				if !ok { return }
				println(x)
			default: 
			}
            
			fmt.Println("wait...")
			time.Sleep(time.Second)
		}
	}()
    
	time.Sleep(time.Second * 3)
    
	c <- 100
	close(c)
    
	<-exit
}

空 select 语句,一直阻塞或死锁。

select{}

缺省

利用 default 实现判断逻辑。

func main() {
	done := make(chan struct{})
    
    // 多槽构成的数据区。
	data := []chan int{ 
		make(chan int, 3),
	}
	go func() {
		defer close(done)
        
		for i := 0; i < 10; i++ {
			// 向最后一个槽添加数据。
			// 添加失败(已满),则创建新槽。
			select {
			case data[len(data)-1] <- i:
			default:
				data = append(data, make(chan int, 3))
				data[len(data)-1] <- i
			}
		}
	}()
	<-done
	for _, c := range data {
		close(c)
		for x := range c {
			println(x)
		}
	}
}

反射

如果运行期才能确定通道数量,可利用反射( reflect )实现。

func main() {
	exit := make(chan struct{})
    // 运行时动态创建。
	chans := make([]chan int, 0)
	chans = append(chans, make(chan int))
	chans = append(chans, make(chan int))
	go func() {
		defer close(exit)
        // 反射构建 select 操作。
		cases := make([]reflect.SelectCase, len(chans))
		for i, c := range chans {
			cases[i] = reflect.SelectCase {
                Dir: reflect.SelectRecv,
                Chan: reflect.ValueOf(c),
            }
		}
		for {
			index, value, ok := reflect.Select(cases)
            
            // 检查并退出。
			if !ok {
				chans[index] = nil
				n := 0
				for _, c := range chans {
					if c == nil { n++ }
					if n == len(chans) { return }
				}
				continue
			}
			println(index, value.Int(), ok)
		}
	}()
	chans[1] <- 101
	chans[0] <- 100
	for _, c := range chans {
		close(c)
	}
	<- exit
}

# 使用

通常以工厂方法将 goroutine 和 channel 绑定。

func newRecv[T any](cap int) (data chan T, done chan struct{}) {
	data = make(chan T, cap)
	done = make(chan struct{})
	go func() {
		defer close(done)
		for v := range data {
			println(v)
		}
	}()
	return
}
func main() {
	data, done := newRecv[int](3)
	for i := 0; i < 10; i++ {
		data <- i
	}
	close(data)
	<- done
}

# 超时

如果 channel 阻塞且没有关闭,那么可能导致 goroutine 泄漏(leak)。
解决办法是用 select default ,或 time.After 设置超时。

func main() {
	quit := make(chan struct{})
	c := make(chan int)
	go func() {
		defer close(quit)
		select {
		case x, ok := <- c: println(x, ok)
		case <- time.After(time.Second): return
		}
	}()
	<- quit
}

# 信号量

用通道实现信号量(semaphore),在同一时刻仅指定数量的 goroutine 参与工作。

type Sema struct {
	c chan struct{}
}
func NewSema(n int) *Sema {
	return &Sema{
		c: make(chan struct{}, n),
	}
}
func (m *Sema) Acquire() {
	m.c <- struct{}{}
}
func (m *Sema) Release() {
	<- m.c
}
func main() {
	var wg sync.WaitGroup
    // runtime: 4
    //    sema: 2
	runtime.GOMAXPROCS(4)
	sem := NewSema(2)
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem.Acquire()
			defer sem.Release()
			for n := 0; n < 3; n++ {
				time.Sleep(time.Second * 2)
				fmt.Println(id, time.Now())
			}
		}(i)
	}
	wg.Wait()
}

# 对象池

鉴于通道本身就是一个并发安全的队列,可用作 ID generator、Pool 用途。

type Pool[T any] chan T
func NewPool[T any](cap int) Pool[T] {
	return make(chan T, cap)
}
func (p Pool[T]) Get() (v T, ok bool) {
	select {
	case v = <-p: ok = true
	default:
	}
	return
}
func (p Pool[T]) Put(v T) bool {
	select {
	case p <- v: return true
	default:
	}
	return false
}
// -----------------------------
func main() {
	p := NewPool[int](2)
	println(p.Put(1))
	println(p.Put(2))
	println(p.Put(3))
	for {
		v, ok := p.Get()
		if !ok { break }
		println(v)
	}
}

# 退出

捕获 INTTERM 信号,顺便实现一个简易的 atexit 和 函数。

import (
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)
var exs = &struct {
	sync.RWMutex
	funcs   []func()
	signals chan os.Signal
}{}
func atexit(f func()) {
	exs.Lock()
	defer exs.Unlock()
	exs.funcs = append(exs.funcs, f)
}
func wait(code int) {
    
    // 信号注册。
	if exs.signals == nil {
		exs.signals = make(chan os.Signal)
		signal.Notify(exs.signals, syscall.SIGINT, syscall.SIGTERM)
	}
    // 独立函数,确保 atexit 函数能按 FILO 顺序执行。
    // 不受 os.Exit 影响。
	func() {
		exs.RLock()
		for _, f := range exs.funcs {
			defer f() 
		}             
		exs.RUnlock()
		<-exs.signals
	}()
    // 终止进程。
	os.Exit(code)
}
func main() {
	atexit(func() { 
		time.Sleep(time.Second * 3) 
		println("1 ..."); 
	})
	atexit(func() { 
		println("2 ...") 
	})
	println("Press CTRL + C to exit.")
	wait(1)
}

# 队列

通道本就是队列,需要关心的是如何优雅(gracefully)地关闭通道。

func main() {
	max := int64(100)  // 最大发送计数。
	m   := 3           // 接收者数量。
	n   := 3           // 发送者数量。
	var wg sync.WaitGroup
	wg.Add(m + n)
	data := make(chan int)        // 数据通道。
	done := make(chan struct{})   // 结束通知。
    
	// m recv
    for i := 0; i < m; i++ {	
		go func() {
			defer wg.Done()
			for {
				select {
				case <- done: return
				case v := <- data: println(v)
				}
			}
		}()
	}
    // n send
    for i := 0; i < n; i++ {
		go func(id int) {
			defer wg.Done()
			defer func(){ recover() }()
			for {
				select {
				case <- done: return
				case data <- id:
					if atomic.AddInt64(&max, -1) <= 0 {
						close(done)
						return
					}
				default:
				}
			}
		}(i)
	}
    
    wg.Wait()
}

# 性能优化

将发往通道的数据打包,减少传输次数,可有效提升性能。
通道内部实现有锁和数据复制操作,单次发送更多数据(批处理),可改善性能。

const (
	MAX    = 500000 // 数据统计上限。
	BLOCK  = 500      // 数据块大小。
	CAP    = 100      // 缓冲区大小。
)
//go:noinline
func normal() {
	done := make(chan struct{})
	c := make(chan int, CAP)
	go func() {
		defer close(done)
		count := 0
		for x := range c {
			count += x
		}
	}()
	for i := 0; i < MAX; i++ {
		c <- i
	}
	close(c)
	<-done
}
//go:noinline
func block() {
	done := make(chan struct{})
	c := make(chan [BLOCK]int, CAP)
	go func() {
		defer close(done)
		count := 0
		for a := range c {
			for _, x := range a {
				count += x
			}
		}
	}()
	for i := 0; i < MAX; i += BLOCK {
        
		// 使用数组对数据打包。
		var b [BLOCK]int
		for n := 0; n < BLOCK; n++ {
			b[n] = i + n
			if i + n == MAX - 1 {
				break
			}
		}
        
		c <- b
	}
	close(c)
	<-done
}
func BenchmarkNormal(b *testing.B) {
	for i := 0; i < b.N; i++ {
		normal()
	}
}
func BenchmarkBlock(b *testing.B) {
	for i := 0; i < b.N; i++ {
		block()
	}
}
cpu: 12th Gen Intel(R) Core(TM) i7-12700
BenchmarkNormal-20            55          21640556 ns/op            1095 B/op          3 allocs/op
BenchmarkBlock-20           1826            650704 ns/op          401544 B/op          3 allocs/op

# 泄露

如果通道一直处于阻塞状态,那么会导致 goroutine 无法结束和回收,形成资源泄露。

func test() chan byte {
	c := make(chan byte)
	go func() {
		buf := make([]byte, 0, 10<<20) // 10MB
		for {
			d, ok := <- c
			if !ok { return }
			buf = append(buf, d)
		}
	}()
	return c
}
func main() {
	for i := 0; i < 5; i++ {
		test()
	}
	for {
		time.Sleep(time.Second)
		runtime.GC()
	}
}
# 因 goroutine 无法结束,其持有的 buf 内存无法回收。
$ GODEBUG=gctrace=1 ./test
gc 1 @0.002s 20%: ... 50->50->50 MB,  50 MB goal, ...
gc 2 @1.002s  0%: ... 50->50->50 MB, 100 MB goal, ... (forced)
gc 3 @2.009s  0%: ... 50->50->50 MB, 100 MB goal, ... (forced)
...
gc 7 @6.029s  0%: ... 50->50->50 MB, 100 MB goal, ... (forced)
# 可以观察到这些 goroutine 的状态。
$ GODEBUG="schedtrace=1000,scheddetail=1" ./test
SCHED 3008ms: gomaxprocs=2 idleprocs=2 threads=4 ...
  
  G1: status=4(sleep)
  G2: status=4(force gc (idle))
  G3: status=4(GC sweep wait)
  G4: status=4(GC scavenge wait)
  
  G5: status=4(chan receive) m=-1 lockedm=-1
  G6: status=4(chan receive) m=-1 lockedm=-1
  G7: status=4(chan receive) m=-1 lockedm=-1
  G8: status=4(chan receive) m=-1 lockedm=-1
  G9: status=4(chan receive) m=-1 lockedm=-1
  
  G10: status=4(GC worker (idle))
  G11: status=4(GC worker (idle))

通过添加 time.After 之类手段,让通道有解除阻塞的机会。
另外,不当使用 time.Tick ,也会引发泄漏。

# 同步

通道并非用来取代锁,各有不同使用场景。
通道解决高级别逻辑层次并发架构,锁则用来保护低级别局部代码安全。

临界条件:多线程同时读写共享资源(临界资源)。
临界区:读写临界资源的代码片段。

互斥锁:同一时刻,只有一个线程能进入临界区。
读写锁:写独占(其他读写均被阻塞),读共享。
信号量:允许指定数量线程进入临界区。
自旋锁:失败后,以循环积极尝试。(无上下文切换,小粒度)

悲观锁:操作前独占锁定。
乐观锁:假定无竞争,后置检查。(Lock Free, CAS)

标准库 sync 提供了多种锁,另有原子操作等。

Mutex :互斥锁。
RWMutex :读写锁。

WaitGroup :等待一组任务结束。
Cond :单播或广播唤醒其他任务。
Once :确保只调用一次(函数)。

Map :并发安全字典,(少写多读,数据不重叠)
Pool :对象池。(缓存对象可被回收)

# 竞争检测

测试阶段,以 -race 编译,注入竞争检查(data race detection)指令。

有较大性能损失,避免在基准测试和发布版本中使用。
有不确定性,不能保证百分百测出。
单元测试有效完整,定期执行竞争检查。

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	x := 0
	go func() {
		defer wg.Done()
		x++
	}()
	go func() {
		defer wg.Done()
		println(x)
	}()
	wg.Wait()
}
$ go build -race && ./test
==================
WARNING: DATA RACE
Read at 0x00c0000160d8 by goroutine 7:
  main.main.func2()
      /root/go/test/main.go:20 +0x74
Previous write at 0x00c0000160d8 by goroutine 6:
  main.main.func1()
      /root/go/test/main.go:15 +0x86
Goroutine 7 (running) created at:
  main.main()
      /root/go/test/main.go:18 +0x1d6
Goroutine 6 (finished) created at:
  main.main()
      /root/go/test/main.go:13 +0x12e
      
==================
1
Found 1 data race(s)

# 互斥锁

文档中标明 “must not be copied”,应避免复制导致锁机制失效。

type data struct {
	sync.Mutex
}
// go vet: passes lock by value
func (d data) test(s string) { 
	d.Lock()
	defer d.Unlock()
}

控制在最小范围内,及早释放。

// 错误用法
func {
	m.Lock()
    defer m.Unlock
    
	url := cache["key"]
	get(url)            // 该操作并不需要保护,延长锁占用。
}
// 正确用法
func {
	m.Lock()
	url := cache["key"]
	m.Unlock()
    
	get(url)
}

不支持递归锁。

func main() {
	var m sync.Mutex
	m.Lock()
    
	{
		// m.Lock() // ~ fatal error: all goroutines are asleep - deadlock!
		// m.Unlock()
	}
    
	m.Unlock()
}

# 读写锁

某些场合以读写锁替代互斥锁,可提升性能。

func main() {
	var wg sync.WaitGroup
	var rw sync.RWMutex
	x := 0
	// 1 写
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 5; i++ {
			rw.Lock()
			time.Sleep(time.Second) // 模拟长时间操作!
			now := time.Now().Format("15:04:05")
			x++
			fmt.Printf("[W] %d, %v\n", x, now)
			rw.Unlock()
		}
	}()
	//n 读
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for n := 0; n < 5; n++ {
				rw.RLock()
				time.Sleep(time.Second)
				now := time.Now().Format("15:04:05")
				fmt.Printf("    [R%d] %d, %v\n", id, x, now)
				rw.RUnlock()
			}
		}(i)
	}
	wg.Wait()
}
/*
[W] 1, 11:23:17         // 独占
    [R4] 1, 11:23:18    // 并发
    [R3] 1, 11:23:18
    [R1] 1, 11:23:18
    [R0] 1, 11:23:18
    [R2] 1, 11:23:18
[W] 2, 11:23:19
    [R4] 2, 11:23:20
    [R3] 2, 11:23:20
    [R1] 2, 11:23:20
    [R0] 2, 11:23:20
    [R2] 2, 11:23:20
*/

# 条件变量

内部以计数器和队列作为单播( signal )和广播( broadcast )依据。
引入外部锁作为临界资源保护,可与其他逻辑同步。

func main() {
	var wg sync.WaitGroup
	cond := sync.NewCond(&sync.Mutex{})
	data := make([]int, 0)
	// 1 写
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 5; i++ {
            
            // 保护临界资源。
			cond.L.Lock()
			data = append(data, i + 100)
			cond.L.Unlock()
            
            // 唤醒一个。
			cond.Signal()
		}
        // 唤醒所有(剩余)。
		// cond.Broadcast()
	}()
	//n 读
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
            
            // 锁定临界资源。
			cond.L.Lock()
            // 循环检查是否符合后续操作条件。
            // 如条件不符,则继续等待。
			for len(data) == 0 {
				cond.Wait()
			}
			x := data[0]
			data = data[1:]
			cond.L.Unlock()
			println(id, ":", x)
		}(i)
	}
	wg.Wait()
}

为什么 Wait 之前必须加锁?除锁定临界资源外,还与其内部设计有关。

// cond.go
func (c *Cond) Wait() {
	c.checker.check()
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

# 单次执行

确保仅执行一次,无论后续是同一函数或不同函数都不行。

func main() {
	var once sync.Once
	f1 := func() { println("1") }
	f2 := func() { println("2") }
	once.Do(f1)
    
    // 以下目标函数不会执行。
	once.Do(f1) 
	once.Do(f2)
	once.Do(f2)
}
// 1

以内部状态(done)记录第一次执行,与具体什么函数无关。

func main() {
	var once sync.Once
	once.Do(func() { println("1") })
	once.Do(func() { println("1") })
	once.Do(func() { println("2") })
	once.Do(func() { println("2") })
}
// 1
更新于
-->