Go语言入门基础:协程
第21章 协程
目录
- 21.1 启动Go协程
- 协程的概念与特点
- 启动协程的方法及示例
- 协程执行顺序的控制
- 21.2 通道
- 通道类型及表示方式
- 21.2.1 实例化通道
- 21.2.2 数据缓冲
- 21.2.3 单向通道
- 21.2.4 通道与select语句
- 21.3 互斥锁
- 协程并发访问的逻辑问题
- 互斥锁的使用示例
- 21.4 WaitGroup类型
- WaitGroup类型的原理
- WaitGroup类型的使用示例
21.1 启动Go协程
Go语言的异步操作引入了协程(原单词为goroutines
)的概念,启动新的协程后会异步执行指定的函数。协程类似于线程,但它并非与线程一一对应,而是由Go运行时内部进行调度,有可能一个线程会运行多个协程,也有可能一个协程在不同线程间切换,这一切都是Go运行时自动分配和控制的。
Go协程没有名称,也没有ID值,程序代码无法获取协程的唯一标识。
应用程序在运行时至少会启动一个协程——执行main
函数的协程,此协程可以称为主协程,当该协程执行完毕后,整个程序就会退出。
在代码中开启新协程的方法很简单,只要在调用函数或方法时加上“go
”关键字即可。例如:
func test1() {fmt.Print("春")
}func test2() {fmt.Print("夏")
}func test3() {fmt.Print("秋")
}func test4() {fmt.Print("冬")
}func main() {// 启动四个新的协程go test1()go test2()go test3()go test4()
}
但是,运行上面代码后,屏幕上可能什么内容都没有输出。这是因为新开启的四个协程与主协程(共五个协程)都是独立执行的,四个新协程还没来得及打印消息,main
函数就退出了,进而导致整个程序的退出,所以屏幕上看不到任何输出。
在main
函数的最后增加一行对time.Sleep
函数的调用,让主协程暂停1秒钟。
func main() {// 启动四个新的协程go test1()go test2()go test3()go test4()// 暂停一下time.Sleep(time.Second)
}
由于主协程的执行时间被延长了,使得新启动的四个协程能够完成执行,再次运行上述程序,就能看到以下输出了:
春秋冬夏
此时,读者会发现,“春”“夏”“秋”“冬”四个字符的输出是无序的。不妨再执行三次上述程序,看看输出结果。
春冬秋夏 // 第一次
秋夏冬春 // 第二次
夏春秋冬 // 第三次
这是因为协程之间不仅是相互独立的,而且代码运行的顺序是随机的。如果需要控制它们的顺序,可以使用通道(channel
)。将上述程序代码进行以下修改:
var (A = make(chan int)B = make(chan int)C = make(chan int)D = make(chan int)
)func test1() {fmt.Print("春")A <- 1
}func test2() {<-Afmt.Print("夏")B <- 1
}func test3() {<-Bfmt.Print("秋")C <- 1
}func test4() {<-Cfmt.Print("冬")D <- 1
}func main() {// 启动四个新的协程go test1()go test2()go test3()go test4()<-D
}
变量A、B、C、D均为通道类型,支持传递int
类型的值。四个通道都是无缓冲的,通道的输入与输出必须同时进行,也就是说,如果有int
值写入通道,就得要求有其他对象同时将该值读出。
启动test1
函数时,test2
函数要等待通道A中的有输入的值才能读出,于是,test2
被阻塞(不能往后执行),test1
函数输出“春”之后,将整数值1写入通道A,此时test2
中顺利读到了通道A的值,就可以往下执行。
test2
、test3
、test4
函数的原理也一样,可以用下图来表示此过程。运行代码,这一次能得到预期的结果了。
春夏秋冬
21.2 通道
在Go的异步编程中,通道类型(channel
,类型名称为chan
)既可以用于协程之间的数据通信,也可以用于协程之间的同步。
通道类型有以下几种表示方式:
chan T
// 双向通道,既可以发送数据,也可以接收数据chan <- T
// 只能向通道发送数据<- chan T
// 只能从通道接收数据
其中,T
是通道中可存放的数据类型。例如:
chan int
上述格式表示双向通道,通道可以存放int
类型的值。
通道数据的输入输出是通过“<-
”运算符(称作“接收运算符”)来完成的。“<-
”位于通道变量之前表示从通道中接收数据;“<-
”位于通道变量后面表示向通道发送数据。
var ch = ……
ch <- 5 // 向通道发送数据
var n = <-ch // 从通道接收数据
注意上述代码中,<-ch
表达式仅表示从通道ch
中读出数据,若要将读出的值赋值给变量n
,则必须使用赋值运算符(=
)。
21.2.1 实例化通道
通道对象的实例是通过make
函数创建的,此函数可以创建切片、映射、通道类型的实例。
下面的代码创建一个可存储string
类型数据的通道实例。
var c = make(chan string)
也可以这样:
var c = make(chan string, 0)
make
函数的第二个参数(size
)表示通道对象的缓冲值,忽略此参数或者设置为0表示所创建的通道实例不使用缓冲。
下面的语句向通道发送数据。
c <- "hello"
然后可以从通道接收数据。
<-c
当不再使用通道实例时,可以调用close
函数将其关闭。
close(c)
通道常用于不同协程之间的通信,在同一个协程中使用通道意义不大。
21.2.2 数据缓冲
无缓冲的通道要求发送与接收操作同时进行——向通道发送数据的同时必须有另一个协程在接收。
请看下面的例子。
func main() {// 创建通道实例var mych = make(chan uint)// 启动新的协程go func() {fmt.Println("开始执行新协程")// 向通道发送数据mych <- 350fmt.Println("新协程执行完毕")}()// 暂停一下time.Sleep(time.Second)fmt.Println("主协程即将退出")
}
代码运行后输出的内容如下:
开始执行新协程
主协程即将退出
main
协程等待了1秒钟后退出,从输出结果可以看出,新启动的协程并没有完全被执行。程序在向通道mych
发送数据后就被阻塞,无法继续执行,这是因为整数350发送到通道后没有被及时被读取所致,解决方法是在main
函数中接收通道中的数据。修改后的代码如下:
func main() {// 创建通道实例var mych = make(chan uint)// 启动新的协程go func() {……}()// 从通道中接收数据<-mych……
}
由于新创建的协程与主协程是异步执行的,使得通道mych
的发送与接收行为可以同时完成,程序不会被阻塞,最终两个协程都顺利执行。
带缓冲的通道的读与写可以不同时进行,举个例子说明。
func main() {// 创建通道实例var mych = make(chan string, 1)go func() {fmt.Println("开始执行新的协程")// 向通道发送数据mych <- "Hello"fmt.Println("新协程执行完毕")}()// 暂停一下time.Sleep(time.Second * 2)fmt.Println("主协程即将退出")
}
这一次,虽然在主协程上没有接收通道中的数据,但程序可以正常完成执行,输出结果如下:
开始执行新的协程
新协程执行完毕
主协程即将退出
这是因为此次创建的通道实例是带缓冲的,缓冲的元素个数为1。所以,当新的协程代码向通道发送了一次数据后,数据会缓存在通道中,不要求立即被取出,代码就不会被阻塞——哪怕main
函数中未接收通道的数据也不会阻塞。
不过,若是通道中缓存的数据量已满,再次向通道发送数据就会被阻塞,直到数据被接收为止。就像下面这样:
func main() {// 创建通道实例var mych = make(chan string, 1)go func() {fmt.Println("开始执行新的协程")// 向通道发送数据mych <- "Hello"// 再发送一次就会阻塞mych <- "World"fmt.Println("新协程执行完毕")}()……
}
如果将make
函数的调用修改为:
var mych = make(chan string,2)
那么此时的缓存容量为2,发送两次数据不会被阻塞,当第三次向通道发送数据时就会阻塞。
21.2.3 单向通道
请看下面的代码。
var ch1 = make(<-chan bool)
var ch2 = make(chan<- bool)
ch1
为单向通道实例,只能从通道接收数据,不能向通道发送数据;ch2
也是单向通道实例,只能向通道发送数据,不能接收数据。
直接在代码中使用单向通道没有意义,因为数据无法完成输入和输出。不过,要是用于代码封装,作为数据进出的间接通道,单向通道就很合适。
下面的示例演示了单向通道的使用。
步骤1:定义demo
包,公开C
变量和Start
函数。
package demoimport "time"// 此变量仅在包内访问
var innerch = make(chan int, 1)
21.2.3 单向通道
var ch1 = make(<-chan bool)
var ch2 = make(chan<- bool)
ch1
为单向通道实例,只能从通道接收数据,不能向通道发送数据;ch2
也是单向通道实例,只能向通道发送数据,不能接收数据。
直接在代码中使用单向通道没有意义,因为数据无法完成输入和输出。不过,要是用于代码封装,作为数据进出的间接通道,单向通道就很合适。
下面的示例演示了单向通道的使用。
步骤1: 定义demo
包,公开C
变量和Start
函数。
package demoimport "time"// 此变量仅在包内访问
var innerch = make(chan int, 1)
// 此变量对外公开
var C <-chan int = innerch
// 此函数对外公开
func Start() {time.Sleep(time.Second * 2)innerch <- 10000
}
innerch
是双向通道,但只有demo
包内部才能访问。对外公开变量C
,类型为单向通道。经过封装后,外部代码只能访问C
来接收数据,不能发送数据,这样可以防止demo
包内部的数据被意外修改。
外部代码在使用demo
包时,先调用Start
函数,2秒钟后向通道(innerch
)发送整数值10000,此时外部代码只能通过变量C
来接收通道中的数据。
步骤2: 在主包中导入demo
包。
import (……"./demo"
)
步骤3: 在main
函数中先调用Start
函数,接着通过变量C
接收数据。
func main() {fmt.Println("等待结果……")demo.Start()var x = <-demo.Cfmt.Println("结果出来了")fmt.Printf("结果: %d\n", x)
}
步骤4: 运行示例程序,结果如下:
等待结果……
结果出来了
结果:10000
注意通道类型在单向与双向之间的转换规则,双向通道类型可以转换为单向通道类型。例如:
var ch1 chan float32 = make(chan float32, 0)
var ch2 <-chan float32 = ch1
var ch3 chan<- float32 = ch1
ch1
是双向通道类型,它既可以赋值给只接收数据的单向通道类型的变量,也可以赋值给只发送数据的单向通道类型的变量。
但是,只接收数据或者只发送数据的通道类型不能转换为双向通道类型。所以,下面代码会发生错误。
var ch4 <-chan int
var ch5 chan int = ch4
ch4
是单向通道类型的变量,ch5
为双向通道类型的变量,ch4
赋值给ch5
会发生错误。
21.2.4 通道与select
语句
select
语句跟switch
语句类似,都包含case
或default
子句。select
语句与通道一起使用,case
子句必须提供发送数据或者接收数据的操作。其格式如下:
select {
case ch <- n:……
case x <- ch :……
case <- ch :……
default:……
}
下面的示例演示了运用select
语句来生成范围为[1, 5]
的随机整数。
// 创建通道实例
var mych = make(chan int)
// 在新的协程上运行
go func() {select {case mych <- 1:case mych <- 2:case mych <- 3:case mych <- 4:case mych <- 5:}
}()
// 接收通道中的数据
n := <-mych
fmt.Printf("随机整数: %d\n", n)
上述代码中,首先调用make
函数创建一个无缓冲的通道实例,接着启动一个新的协程,在新协程的匿名函数中,使用select
语句和五个case
子句,子句中分别向通道发送数值1、2、3、4、5。五个case
子句只有一个会被执行,这个被执行的case
子句是由运行时随机选择的,而在main
协程中,<-mych
表达式接收的是被随机发送的值,于是便实现了产生随机整数的功能。
此示例的运行结果如下:
// 第一次运行
随机整数:2
// 第二次运行
随机整数:1
// 第三次运行
随机整数:5
结合通道和select
语句,也可以实现操作超时的功能。下面的示例实现一个简单的口算考试程序。程序运行后,随机生成两个100以内的整数值,然后要求用户口算出它们的和。用户需要在5秒钟内输入答题。
步骤1: 创建两个通道实例。
var (// 标志口算完毕chFinish = make(chan bool)// 标志已超时chTimeout = make(chan bool)
)
当用户输入口算结果,完成答题后,会发送数据到chFinish
通道;如果用户超出规定时间仍未完成答题,就会发送数据到chTimeout
通道。
步骤2: 定义一个常量,设定值为5,表示答题最长时间为5秒。
const maxSecond = 5
步骤3: 启动新的协程,处理生成口算题目以及用户答题逻辑。
go func() {rand.Seed(time.Now().UnixNano())// 产生100以内的随机整数a := rand.Intn(100)b := rand.Intn(100)var input int // 用户输入的计算结果r := a + b // 正确的计算结果fmt.Printf("题目: %d + %d =?\n", a, b)fmt.Print("请输入结果:")fmt.Scanln(&input)// 生成结果var cr bool = input == rchFinish <- cr
}()
步骤4: 再启动一个新的协程,负责计时,一旦超时,就会向chTimeout
通道发送数据。
go func() {time.Sleep(time.Second * maxSecond)chTimeout <- true
}()
步骤5: 在main
协程中,使用select
语句,并让各case
子句分别从前面创建的两个通道接收数据,最终给出考试结果。
select {
case res := <-chFinish:if res {fmt.Print("\n恭喜你,答对了\n")} else {fmt.Print("\n噢,答错了\n")}
case <-chTimeout:fmt.Print("\n很遗憾,时间到了\n")
}
步骤6: 当通道不再使用时,可将其关闭。
close(chFinish)
close(chTimeout)
如果用户能在规定的时间作答,就验证其输入的答案是否正确(由chFinish
通道中的值标识);如果用户超时未作答,则给出提示。
下面是三次运行该示例的结果。
// 正确作答
题目:9 + 33 =?
请输入结果:42
恭喜你,答对了// 答案不正确
题目:86 + 49 =?
请输入结果:112
噢,答错了// 超时未作答
题目:98 + 15 =?
请输入结果:
很遗憾,时间到了
21.3 互斥锁
当多个Go协程同时访问某一段代码时,会出现逻辑混乱的现象。举个例子,定义一个throw
函数,假设用于模拟抛球机工作。当小球的总数为0时,停止抛球。
func throw() {for {if Total < 1 {// 无球可抛时退出break}// 等待一下,抛球需要一定的时间time.Sleep(time.Millisecond * 300)Total --fmt.Printf("剩余 %d 个球\n", Total)}
}
在main
函数中启动四个新协程,表示四台抛球机在抛球,小球总数为20。
func main() {Total = 20for i := 0; i < 4; i++ {go throw()}// 暂停一下,等待其他协程完成time.Sleep(time.Second * 8)
}
然而,运行后会发现存在逻辑错误——剩余的小球总数会变为负数。
剩余18个球
剩余19个球
剩余17个球
剩余16个球
剩余15个球
剩余14个球
剩余13个球
剩余12个球
剩余11个球
剩余10个球
剩余9个球
剩余8个球
剩余7个球
剩余6个球
剩余5个球
剩余4个球
剩余3个球
剩余2个球
剩余1个球
剩余0个球
剩余-1个球
剩余-2个球
这是因为四个协程是相互独立的,它们同时执行throw
函数,当协程A判断还有剩余的球后,即将抛出一个球。正在此时协程B却把球抛出去了,而A根本不知道,于是它继续执行。也就是说,A并没有抛球,却把Total
减掉1。四个协程一起运行,这种情况会不断地发生,最终导致状态不统一,引发逻辑错误,就会出现剩余的小球总数为负数的结果。
要解决此问题,需要加一把 “锁” ,把抛一次球的整个过程锁定,只允许一个协程进行操作,其他协程“原地待命”。当这个协程抛完一次球,解除锁定,然后其他协程再去抛球。
接下来对上述例子进行修改,在throw
函数中加上互斥锁(sync
包公开的Mutex
类型)。
var locker = new(sync.Mutex)func throw() {for {// 此处开始上锁locker.Lock()if Total < 1 {// 无球可抛时退出break}// 等待一下,抛球需要一定的时间time.Sleep(time.Millisecond * 300)Total --fmt.Printf("剩余 %d 个球\n", Total)// 完成后要解锁locker.Unlock()}
}
互斥锁的锁定范围应覆盖从对Total
变量进行判断到将Total
变量减去1这个过程,在此过程中,始终只允许一个协程访问代码,防止Total
变量被意外更改。
Lock
方法与Unlock
方法的调用必须成对出现,即锁定资源后,要记得将其解锁,否则其他协程将永远无法访问资源。
经过修改后,就能得到正确的结果。
剩余19个球
剩余18个球
剩余17个球
剩余16个球
剩余15个球
剩余14个球
剩余13个球
剩余12个球
剩余11个球
剩余10个球
剩余9个球
剩余8个球
剩余7个球
剩余6个球
剩余5个球
剩余4个球
剩余3个球
剩余2个球
剩余1个球
剩余0个球
21.4 WaitGroup类型
sync.WaitGroup
类型内部维护一个计数器,某个Go协程调用Wait
方法后会被阻塞,直到WaitGroup
对象的计数器变为0。
调用Add
方法可以增加计数器的值,调用Done
方法会使计数器的值减1。实际上,Done
方法内部也调用了Add
方法,传递的参数值为-1。下面是Done
方法的源代码。
func (wg *WaitGroup) Done() {wg.Add(-1)
}
所以,调用Add
方法并向参数传递负值,也可以减少计数器的值。
在本章前面的各节中,有多个示例代码都会在main
函数结束之前调用time.Sleep
函数来让主协程暂停,用以等待其他协程执行完毕。就像下面这样:
func main() {……// 暂停一下,等待其他协程完成time.Sleep(time.Second * 8)
}
使用本节所介绍的WaitGroup
类型就不需要用Sleep
函数来暂停了,只要在主协程上调用其Wait
方法,主协程就会阻塞并且等到计数器为0时才会继续运行。
下面代码演示执行三个新的协程,计数器增加3,每个协程在执行完成时调用Done
方法让计数器减1。主协程上调用Wait
方法后会一直处于等待状态,直到三个协程都顺利完成。
func main() {var wg sync.WaitGroup// 增加计数器wg.Add(3)for i := 1; i <= 3; i++ {go func(n int) {// 执行完成时将计数器减1defer wg.Done()fmt.Printf("开始执行第 %d 个协程\n", n)time.Sleep(time.Second * 2)fmt.Printf("第 %d 个协程执行完毕\n", n)}(i)}// 等待上述各协程执行完成wg.Wait()fmt.Println("所有协程已完成")
}
运行结果如下:
开始执行第3个协程
开始执行第1个协程
开始执行第2个协程
第1个协程执行完毕
第2个协程执行完毕
第3个协程执行完毕
所有协程已完成
【思考】
- 如何创建双向通道?
- 在函数调用时加上 go 关键字有何作用?
- WaitGroup 是如何增加和减少任务数量的?
如何创建双向通道?
在Go语言里,双向通道能够进行数据的发送与接收操作。创建双向通道可以借助make
函数来实现,具体的语法格式如下:
channel := make(chan Type) // 无缓冲通道
channel := make(chan Type, capacity) // 有缓冲通道
这里的Type
指的是通道能够存放的数据类型,capacity
代表通道的缓冲容量。若不指定capacity
,创建的就是无缓冲通道;若指定了capacity
,创建的则是有缓冲通道。
以下是创建无缓冲和有缓冲双向通道的示例代码:
package mainimport "fmt"func main() {// 创建无缓冲的双向通道,可存放int类型数据unbufferedChan := make(chan int)// 创建有缓冲的双向通道,可存放int类型数据,缓冲容量为2bufferedChan := make(chan int, 2)fmt.Printf("无缓冲通道类型: %T\n", unbufferedChan)fmt.Printf("有缓冲通道类型: %T\n", bufferedChan)
}
在上述代码中,unbufferedChan
是无缓冲的双向通道,bufferedChan
是有缓冲的双向通道,二者都可以存放int
类型的数据。
在函数调用时加上go
关键字有何作用?
在函数调用时加上go
关键字,能够启动一个新的Go协程来异步执行该函数。协程是一种轻量级的线程,由Go运行时系统进行调度。借助go
关键字启动的协程会和主协程(也就是执行main
函数的协程)并行执行。
下面是一个简单的示例代码:
package mainimport ("fmt""time"
)func printNumbers() {for i := 1; i <= 5; i++ {fmt.Println(i)time.Sleep(time.Second)}
}func main() {// 启动一个新的协程来执行printNumbers函数go printNumbers()// 主协程继续执行fmt.Println("主协程继续执行...")time.Sleep(6 * time.Second)fmt.Println("主协程结束")
}
在上述代码中,go printNumbers()
启动了一个新的协程来执行printNumbers
函数,主协程不会等待该协程执行完毕,而是继续执行后续的代码。
WaitGroup是如何增加和减少任务数量的?
sync.WaitGroup
属于Go语言标准库中的一个类型,其作用是等待一组协程执行完毕。它内部维护着一个计数器,该计数器的数值表示尚未完成的协程数量。
Add(delta int)
:此方法用于增加计数器的值,delta
代表要增加的数量。通常在启动协程之前调用该方法,以告知WaitGroup
有多少个协程需要等待。Done()
:该方法用于减少计数器的值,实际上它调用了Add(-1)
。一般在协程执行完毕时调用此方法。Wait()
:调用此方法会使当前协程阻塞,直到计数器的值变为0,也就是所有协程都执行完毕。
以下是一个使用WaitGroup
的示例代码:
package mainimport ("fmt""sync""time"
)func worker(id int, wg *sync.WaitGroup) {defer wg.Done() // 协程执行完毕,计数器减1fmt.Printf("Worker %d starting\n", id)time.Sleep(time.Second)fmt.Printf("Worker %d done\n", id)
}func main() {var wg sync.WaitGroup// 启动3个协程,计数器加3wg.Add(3)for i := 1; i <= 3; i++ {go worker(i, &wg)}// 等待所有协程执行完毕wg.Wait()fmt.Println("All workers done")
}
在上述代码中,wg.Add(3)
把计数器的值设为3,代表有3个协程需要等待。每个协程在执行完毕时会调用wg.Done()
让计数器减1。wg.Wait()
会使主协程阻塞,直到计数器的值变为0,也就是所有协程都执行完毕。