1.048596

并发编程Go实现及其上下文控制

相较于其他语言,Go 在并发编程方面有得天独厚的优势,在语言层面上支持了一种协程即用户态线程来实现并发编程。轻量级的协程带来了更快的上下文切换,以及不用涉及内核态转换的优势。能够高效的进行并发编程的开发,并且拥有还不错的性能。

了解轻量级协程

有些语言直接操作的是内核态的线程,由操作系统的内核来调度,所以相对比较“笨重”,通常消耗 2MB 或者更多的栈内存,而且在切换时会陷入到操作系统内核态。而 Go 语言中内置的协程调度,它创建的线程是用户态的,通过 Go 应用自身调度而非操作系统内核的调度,直接实现了应用层面的多协程切换。通常一个 goroutine 只占 2KB 的栈内存,而且上下文切换也更加迅速。理论上可以轻松维护 10 万级别的协程,但协程过多也有其副作用,下文中也会详细说明。

调度模型

首先 GMP 模型是 Go 语言的多线程模型,主要是由以下三部分组成。其实这也是通过多年的演进,早期的 Go 语言并不是如此,本文基于目前绝大多数使用的 1.16~1.19 版本进行说明。

  • Goroutine

代表了 Go 语言里的线程,包含了线程的函数和堆栈信息。

  • Processor

代表对处理器的逻辑抽象,协程需要它才能执行,具有一定上下文和缓存。

  • Machine

对内核线程的封装,是真正运行线程的对象,负责协程实际运行。

抢占式调度

目前 Go 的调度规则是一种基于协作的抢占式调度。首先编译器会在调用函数前插入 runtime.morestack 代码,并且在 GC 时会检查运行超过 10ms 的协程标记为 StackPreempt,发生函数调用时会执行 runtime.newstack 的函数抢占目前标记为 StackPreempt 的协程来实现抢占。

后续又添加了一种基于信号的抢占式调度来解决触发抢占时机局限于函数调用的问题。在垃圾回收时触发并发出 SIGURG 信号,由程序注册的信号函数 runtim.doSigPreempt 处理并让当前函数设置为休眠状态让出线程,来实现其他函数抢占线程得以被执行。

多协程编程

并发和并行

多线程离不开并发和并行的概念。首先强调的是并发不等于并行,前者是某一时段同时执行多个任务,后者是某一时刻同时执行多个任务,并发是并行的超集。

并发

单核 CPU,逻辑上同时执行

并发是指两种或两种以上的行为在系统中同时存在,至于这两个行为是否在某一时刻同时执行,在并发的概念中并不考虑。绝大多数时间都是核心切换线程太快让人感觉上是同时执行,但其实某一时刻 CPU 核心还是只能够处理一个指令,也就是说只能处理一个任务。

并行

多核 CPU,物理机上的同时执行

这种常常是多 CPU 核心时实现的效果,一般多处理器会将多个任务交给多个核心同时处理。由于每个单独的 CPU 核心只能够串行的执行指令,多个核心才能实现真正的在同一时刻并发的处理多个任务。

协程的创建

这里的使用线程主要指的是 goroutine 而不是系统的内核线程,从应用层面上调度的协程更快。启动 Go 线程的时候需要使用关键字 go 加上需要协程中执行的函数,这个函数会被异步执行,同时它的返回值会被忽略。下面是最普通的一种写法,会隐藏 panic 掉整个进程的风险,之后会介绍一种更加安全的封装。

// An anonymous function method is used to start the thread.
go func() {
	for i := 0; i < 10; i++ {
		fmt.Println("Hello World!")
	}
}()

在协程函数结束之前可以用 recover() 捕获异常,下面是一种比较基础的封装,如果需要更多的函数参数可以利用反射 interface 来支持通用的函数参数列表,在很多开源库中也不乏这样的实现。

func SafeGo(do func())  {
	go func() {
		defer func() {
			// catch panic in do() avoid crash
			if err := recover(); err != nil {
				fmt.Println(fmt.Sprintf("panic %s\n", err))
				fmt.Println(fmt.Sprint(string(debug.Stack())))
			}
		}()
		do()
	}()
}

协程调度函数

这里会有一些相应的协程调度函数,但 Go 语言官方并不建议使用它们。理论上如果是如此严苛的追求性能,应该使用的一定不是 Go 语言这种带有运行时环境且依赖 GC 的语言,所以一笔带过这些违背了语言初衷的函数。相比之下 Go 更像是一种能够快速开发且具有一定性能保证的程序语言,绝非极致性能的追求者。

  • Goexit()

可以直接结束当前调用协程的运行,但不会抛出 panic 异常。

  • Gosched()

暂停当前线程的执行,使其他线程先行运算。

  • GOMAXPROCS()

这个比较常用而且重要,可以设置同时执行协程的逻辑 CPU 数量,默认是和硬件的线程数保持一致。

// It can get the current number of logical processors.
now := GOMAXPROCS(-1)

协程间通信

有很多线程间通讯的方式,最为常用的就是通过共享内存。在 Go 语言中并不推荐这种方式,因为会带来锁的使用以及一些并发安全问题。更偏向于使用 channel 作为 Golang 中协程通信的手段,因为它是原生并发安全的,而且能够减少代码层面上的依赖。在这之前首先要先了解一下通道的底层实现,channel 底层采用的是一个加锁的循环队列,并发执行时 goroutine 之间会争抢锁的使用权。结构体中我们可以看出有读写的协程等待队列,会根据情况安排获取全局锁的协程。

type hchan struct {
	qcount   uint
	dataqsiz uint
	buf      unsafe.Pointer
	// points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type
	sendx    uint
	recvx    uint
	recvq    waitq
	sendq    waitq
	// Lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	lock mutex
}

使用 Channel

使用 make 函数来给通道分配内存空间,通道的主要分为两大类,一种是无缓冲,一种是有缓冲。缓冲区是为了一定程度上防止阻塞而设立的,但如果缓冲区已经装满,那么新的写操作也还是会被阻塞,直到缓冲区域重新有位置的时候才进行写入。

  • 通道的缓冲

通道的缓冲对于线程之间的通信很重要,有缓冲的通道主要用于传输数据,无缓冲的通道主要用于通知。在程序的底层行为上会有一定的不同,这里一位大佬的文章 https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/ 讲解更加细致。

// The channel of work task class has buffer.
jobs := make(chan int, 10)
// Notification channel has no buffer.
done := make(chan bool)

作为函数参数

如果作为函数参数就要看下通道的类型了,通道有的只能读,有的只能写,有的能读能写。这些在函数参数的书写的时候都是可以定义的,使用时会有一定的参数类型校验。

func Work(jobs <-chan *job done chan<- bool) {}
// Read or write according to the direction of the arrow.

多路复用

实际使用中一个协程可能会同时监听多个通道传来的值,这时候需要用到 select 语句从里面选择一个操作执行,来协调多个通道的消费。语义上类似 Unix 系统中提供的 IO 多路复用函数。类似于 switch 的语法,从 case 的通道中取值,这里不是按值判断,而是选取一个有内容的通道并从中取值。这也体现出了语言中的扇入的理念,把多个通道合并操作。

case data := <-ch1:
  // code...
case data := <-ch2:
  // code...
default:
  // code...
  1. 查找是否已经就绪的 Channel 并从中取值
  2. 将当前 goroutine 加入到这些 Channel 的等待队列
  3. Channel 唤醒当前协程并使用 select 取值

通知退出机制

关闭被 select 监听的其中一个通道会被立即感知,然后可以进行相应的操作,这就实现了通知退出机制。这样可以实现较简单的反馈,之后可以使用 context 封装的通知机制,但其底层依然是这个原理。当标识通道有元素的时候,select 会感知出来并执行相应的语句,也可以起到通知操作的作用。

var done chan struct{}
func random(done chan struct{}) chan int {
	ch := make(chan int, 10)
label:
	for {
		num, _ := rand.Int()
		select {
		case ch <- num:
		case <-done:
			// jump out of code snippet
			break label
		}
	}
	return ch
}
func main() {
	done := make(chan struct{}, 1)
	ch := random(done)
	// put the data to the channal
	done <- struct{}{}
	close(done)
}

通道使用踩坑

通道的 Panic

向已经关闭的通道写数据和重复关闭通道都会 panic

通道的阻塞

向未初始化内存的通道读写会使当前线程永久阻塞

向缓冲区满的通道写存数据会阻塞线程,后续能够写入时继续执行

通道中没有数据读取则会阻塞线程,后续能够读取时继续执行

应该注意的是,阻塞很正常,只是一种等待,但是永久阻塞就会使线程报废。

通道的非阻塞

读取已经关闭的的通道会返回零值,不会阻塞

对未满的通道进行读写不会阻塞

并发编程范式

有很多固定的优秀并发编程范式可以去借鉴,掌握了范式就相当于站在了巨人的肩膀上。类似于设计模式,需要根据实际情况来使用,大多数情况不要使用过于复杂的架构,简单的就是最好用且最容易维护的,不要一时炫技导致后人维护时可读性太差。

生产者模型

生产者顾名思义是用来生成全局的事务的,生成之后通过通道给不同的 Worker 线程来处理这些问题。

func GeneratorIntA(done chan struct{}) chan int {
	ch := make(chan int, 10)
	go func() {
	label:
		for {
			// stop by listening for a signal
			select {
			case ch <- rand.Int():
			case <-done:
				break label
			}
		}
		close(ch)
	}()
	return ch
}
func GeneratorIntB(done chan struct{}) chan int {
	ch := make(chan int, 10)
	go func() {
	label:
		for {
			// stop by listening for a signal
			select {
			case ch <- rand.Int():
			case <-done:
				break label
			}
		}
		close(ch)
	}()
	return ch
}
func GeneratorInt(done chan struct{}) chan int {
	ch := make(chan int, 20)
	go func() {
	Label:
		for {
			select {
			// total generator multiplexing
			case ch <- <-GeneratorIntA(done):
			case ch <- <-GeneratorIntB(done):
			case <-done:
				break Label
			}
		}
		close(ch)
	}()
	return ch
}
func main() {
	done := make(chan struct{})
	ch := GeneratorInt(done)
	for i := 0; i < 10; i++ {
		fmt.Println(<-ch)
	}
	done <- struct{}{}
	fmt.Println("stop generator")
}

链式管道

如果一个函数的输入输出都是通过通道来实现的,那么很多这样的函数链式调用就可以形成一个调用链

func Chain(in chan int) chan int {
	out := make(chan int)
	go func() {
		for v := range in {
			out <- v + 1
		}
		close(out)
	}()
	return out
}
func main() {
	in := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			in <- i
		}
		close(in)
	}()
	// equivalent to a batch process
	out := Chain(Chain(Chain(in)))
	for v := range out {
		fmt.Println(v)
	}
}

一对一处理

当收到一个任务的时候就开启一个 goroutine 来处理这类业务,当少量并发的时候就很好用且思路简单。比如对于小流量网站的 Web 服务器来应答请求的时候,一个请求就可以开启一个线程。

// Server processing request function which use tcp connection.
func HandleFunc(conn *net.TCPConn) {
	defer conn.Close()
	for {
		buf := make([]byte, 256)
		cnt, err := conn.Read(buf)
		if err != nil {
			return
		}
	}
}
func main() {
	localAddr, err := net.ResolveTCPAddr("tcp", "192.168.124.4:8090")
	if err != nil {
		return
	}
	listener, err := net.ListenTCP("tcp", localAddr)
	if err != nil {
		return
	}
	for {
		conn, err := listener.AcceptTCP()
		if err != nil {
			return
		}
		// process connection request
		go HandleFunc(conn)
	}
}

工作协程池

协程并不是随意开启的,虽然 Go 中的线程是很轻量的协程,但是还是需要加以限制。否则会带来大量的协程上下文切换,反而降低了有效的运算时间。通常我们需要根据运行平台的情况开辟协程池,构成一个工作池来用固定数量的线程处理问题。下面的例子只是最简单的用例,之后还可以加上通知通道来控制协程池的结束。

type Task struct {
	ID      int
	randnum int
}
type Result struct {
	task   Task
	result int
}
var tasks = make(chan Task, 10)
var results = make(chan Result, 10)
func Process(num int) int {
	sum := 0
	for num != 0 {
		digit := num % 10
		sum += digit
		num /= 10
	}
	return sum
}
func Worker(wg *WaitGroup) {
	defer wg.Done()
	// Constantly read new task processing from tasks channel.
	for task := range tasks {
		result := Result{task, Process(task.randnum)}
		results <- result
	}
}
func CreateWorkerPool(number int) {
	var wg sync.WaitGroup
	// Construct multiple workers to handle tasks.
	for i := 0; i < number; i++ {
		wg.Add(1)
		go worker(&wg)
	}
	wg.Wait()
	close(results)
}

Future 模式

当一个任务的多个子任务没有依赖关系,那么开启直接子任务的时候就可以利用并发来完成,而不是链式或者串行的完成这些任务。比如在用户注册后,我们需要发送邮件和短信通知。

type Query struct {
	sql    chan string
	result chan string
}
func ExecQuery(q Query) {
	go func() {
		// get input
		sql := <-q.sql
		q.result <- "result from " + sql
	}()
}
func main() {
	q := Query{make(chan string, 1), make(chan string, 1)}
	go ExecQuery(q)
	// Send parameters to the channel.
	q.sql <- "slect * from table"
	// Get results from the channel.
	fmt.Println(<-q.result)
}

Context 上下文

对于 Go 语言来说,各个线程是平行的关系,每一个线程都是同样等级不存在父子关系,所以不同功能层的协程结束的时候无法结束相关的其他派生协程,这时候需要生成一个 context 来作为异步函数的一个参数传入,之后就可以通过这个上下文参数来控制线程。

通知协程退出

这个包的设计目的之一是包装退出通知机制,处于一个 goroutine 调用树上每一个子结点通知皆可生效。当我们在上级协程调用 cancel() 时,可以在派生协程中监听到 ctx.Done() 通道中的消息以此来通知派生协程退出。

func Child(ctx context.Context) {
	for {
		select {
		// The notification exit mechanism is encapsulated.
		case <-ctx.Done():
			return
		default:
			// code...
		}
	}
}
func main() {
	// build a context
	ctx, cancel := context.WithCancel(context.Background())
	go Child(ctx)
	time.Sleep(10 * time.Second)
	cancel()
}

超时通知退出

设定超时时间后,子协程在超时之后就会被通知退出。需要注意的是,还有一种 Deadline() 的方式是传入特定时间来计算超时的,普通的超时是从线程开始运行的时候计时。这种方式通常运用在一些 RPC 或者 HTTP 框架中,防止某次请求的处理函数执行时间过长导致堆积。

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	// Although this context has a time-limited exit, it can also actively call
	// cancel() to exit.
	go func(ctx context.Context) {
		select {
		case <-ctx.Done():
			fmt.Println("Time out")
			return
		default:
			// code...
		}
	}(ctx)
	cancel()
}

上下文传值

这就是 context 包的存在意义的第二个方面了,即数据可以通过上下文传递给该树上所有的协程。对于这种上下文的意义不在于退出,所以没有生成主动调用的 cancel() 函数对象。

func Something(ctx context.Context) {
	v, ok := ctx.Value("key").(string)
	if ok {
		fmt.Println("%v", v)
	}
}
func main() {
	ctx := context.WithValue(context.Background(), "key", "value")
	go Something(ctx)
}

这里的传递是使用键值对的方式进行传递,这两个分别都是空接口 interface{},所以理论上可以传入任何类型的值。但是需要知道的是,这原意并不是提供了一种传递参数的方法。这种传值被用在很多日志或者监控系统的链路追踪上。比如可以为一条调用链的 context.Context 做一个标记值,在这条链路中的日志统一打印标记,在日志中搜索这个标记就可以实现对调用链路的追踪。

type Context interface {
	// The empty interface channel notification exits.
	Done() <-chan struct{}
	Err() error
	// The deadline for the execution of the task.
	Deadline() (deadline time.Time, ok bool)
	// According to key to get the value stored in context.
	Value(key interface{}) interface{}
}

取消信号沿着树的叶子方向进行,每到一个结点都会像 BFS 一样关闭所有的子结点

如果父节点超时取消了,那么子节点也会被取消,而不是不受控制。即使还不到超时取消的条件