第十四章:协程与通信
并发、并行和协程
不要使用全局变量或者共享内存,它们会给你的代码在并发运算的时候带来危险。
协程工作在相同的地址空间中,所以共享内存的方式一定是同步的。Go 使用 channels
来同步协程。
协程是通过使用关键字 go
调用(或执行)一个函数或者方法来实现的(也可以是匿名或者 lambda 函数)。这样会在当前的计算过程中开始一个同时进行的函数,在相同的地址空间中并且分配了独立的栈。
一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main
import (
"fmt"
"time"
)
func main (){
fmt . Println ( "begin main" )
go LongWait ()
go ShortWait ()
fmt . Println ( "main sleep" )
time . Sleep ( 120 * time . Millisecond )
fmt . Println ( "main sleep end" )
}
func LongWait (){
fmt . Println ( "begin long wait" )
time . Sleep ( 100 * time . Millisecond )
fmt . Println ( "end long wait" )
}
func ShortWait (){
fmt . Println ( "begin short wait" )
time . Sleep ( 10 * time . Millisecond )
fmt . Println ( "end short wait" )
}
Go 协程意味着并行(或者可以以并行的方式部署),协程一般来说不是这样的
Go 协程通过通道来通信;协程通过让出和恢复操作来通信
协程间的通信
通常使用这样的格式来声明通道:var identifier chan datatype
通道也是引用类型,所以我们使用 make()
函数来给它分配内存。
所有的类型都可以用于通道。
通信操作符<-
流向通道(发送):ch <- xxx
流出通道(接收):xxx := <-ch
<- ch
可以单独调用获取通道的(下一个)值,当前值会被丢弃。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main
import (
"fmt"
"time"
)
func SendData ( ch chan string ){
ch <- "feng1"
ch <- "feng2"
ch <- "feng3"
}
func ReceiveData ( ch chan string ){
for {
data := <- ch
fmt . Println ( data )
}
}
func main (){
var ch1 chan string = make ( chan string )
go SendData ( ch1 )
go ReceiveData ( ch1 )
time . Sleep ( 1e9 )
}
通道阻塞
默认情况下,通信是同步且无缓冲的:在有接收者接收数据之前,发送不会结束。
通道的发送 / 接收操作在对方准备好之前是阻塞的。
一个很好的例子:
1
2
3
4
5
6
7
8
9
10
11
func main (){
c := make ( chan int )
go func () {
time . Sleep ( 5 * 1e9 ) //5秒
x := <- c
fmt . Println ( "received" , x )
}()
fmt . Println ( "sending" , 10 )
c <- 10
fmt . Println ( "sent" , 10 )
}
首先打印出sending
,然后协程里面等待了5s,c <- 10
这里因为没有接收者,所以处于阻塞状态。等到协程中5s等待结束,接收了数据后,c <- 10
不再阻塞,协程里面也继续运行。
书中的练习14.2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main
import (
"fmt"
)
func f1 ( in chan int ) {
fmt . Println ( <- in )
}
func main () {
out := make ( chan int )
out <- 2
go f1 ( out )
}
之所以会死锁,时因为首先out <- 2
,因为没有接收者或者阻塞了,导致了死锁。
而这样就不会了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main
import (
"fmt"
"time"
)
func f1 ( in chan int ) {
fmt . Println ( <- in )
}
func main () {
out := make ( chan int )
f1 ( out )
out <- 2
time . Sleep ( 1e9 )
}
带缓冲的通道
给make
传第二个参数可以设置通道的缓冲容量:
1
ch := make ( chan string , 5 )
ch :=make(chan type, value)
:
value == 0 -> synchronous, unbuffered (阻塞)
value > 0 -> asynchronous, buffered(非阻塞)取决于 value 元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main
import "fmt"
func main () {
ch := make ( chan string , 5 )
ch <- "1"
ch <- "2"
ch <- "3"
ch <- "4"
ch <- "5"
//ch <- "6"
fmt . Println ( <- ch )
}
不会报错,因为有5个缓冲,但如果把6的注释去掉就会报fatal了。
信号量模式
使用通道让 main
程序等待协程完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main
import "fmt"
func DoSomething ( v int ) int {
return v + v
}
type Empty interface {}
var (
empty Empty
data [] int = make ([] int , 5 )
res [] int = make ([] int , 5 )
ch chan Empty = make ( chan Empty )
)
func main () {
for i := 0 ; i < 5 ; i ++ {
data [ i ] = i + 1
}
for k , v := range data {
go func ( k , v int ) {
fmt . Println ( "进入协程" , k )
res [ k ] = DoSomething ( v )
ch <- empty
}( k , v )
}
for i := 0 ; i < 5 ; i ++ {
<- ch
}
fmt . Println ( res )
//等待协程运行完毕后再让main退出
}
用带缓冲通道实现一个信号量
这不就是PV操作嘛。。。离谱了。
互斥锁和PV操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* mutexes */
func ( s semaphore ) Lock () {
s . P ( 1 )
}
func ( s semaphore ) Unlock (){
s . V ( 1 )
}
/* signal-wait */
func ( s semaphore ) Wait ( n int ) {
s . P ( n )
}
func ( s semaphore ) Signal () {
s . V ( 1 )
}
通道工厂模式
不将通道作为参数传递给协程,而用函数来生成一个通道并返回(工厂角色);函数内有个匿名函数被协程调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main
import "fmt"
func Sum ( x , y int ) ( ch chan int ) {
ch = make ( chan int )
go func ( x , y int ) {
ch <- x + y
}( x , y )
return
}
func main () {
ch := Sum ( 5 , 12 )
fmt . Println ( <- ch )
}
给通道使用for循环
1
2
3
for v := range ch {
... .
}
这样可以不断地从通道中读取直至通道关闭。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main
import (
"fmt"
"time"
)
func main () {
ch := make ( chan int )
for i := 0 ; i < 10 ; i ++ {
go func ( i int ) {
ch <- i
}( i )
}
go func () {
for v := range ch {
fmt . Println ( v )
}
}()
time . Sleep ( 1e9 )
}
通道的方向
1
2
var send_only chan <- int // channel can only send data
var recv_only <- chan int // channel can only receive data
只接收的通道(<-chan T)无法关闭,因为关闭通道是发送者用来表示不再给通道发送值了。
协程的同步:关闭通道-测试阻塞的通道
检测通道是否被关闭:
1
2
3
4
5
v , ok := <- ch
if ! ok {
break
}
process ( v )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main
import (
"fmt"
)
func SendData ( ch chan int ){
ch <- 10
ch <- 1
ch <- 30
close ( ch )
}
func ReceiveData ( ch chan int ){
for {
v , ok := <- ch
if ! ok {
break
}
fmt . Println ( v )
}
}
func main (){
ch := make ( chan int )
go SendData ( ch )
ReceiveData ( ch )
}
上述代码还可以改写为下面这样:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main
import (
"fmt"
)
func SendData ( ch chan int ){
ch <- 10
ch <- 1
ch <- 30
close ( ch )
}
func ReceiveData ( ch chan int ){
for v := range ch {
fmt . Println ( v )
}
}
func main (){
ch := make ( chan int )
go SendData ( ch )
ReceiveData ( ch )
}
使用for-range语句,可以自动检测通道是否关闭。
使用select切换协程
从不同的并发执行的协程中获取值:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main
import (
"fmt"
"time"
)
func SendData ( ch chan int ){
ch <- 10
ch <- 1
ch <- 30
close ( ch )
}
func ReceiveData ( ch chan int ){
for v := range ch {
fmt . Println ( v )
}
}
func main (){
ch1 := make ( chan int )
ch2 := make ( chan int )
ch3 := make ( chan int )
go func () {
ch1 <- 10
}()
go func () {
ch2 <- 11
}()
go func () {
ch3 <- 12
}()
time . Sleep ( 1e9 )
select {
case u :=<- ch1 :
fmt . Println ( u )
case v :=<- ch2 :
fmt . Println ( v )
case j :=<- ch3 :
fmt . Println ( j )
default :
fmt . Println ( "no" )
}
}
select 做的就是:选择处理列出的多个通信情况中的一个。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main
import (
"fmt"
"os"
)
func tel ( ch chan int , quit chan bool ){
for i := 0 ; i < 15 ; i ++ {
ch <- i
}
quit <- true
}
func main (){
ch := make ( chan int )
quit := make ( chan bool )
go tel ( ch , quit )
for {
select {
case v :=<- ch :
fmt . Println ( v )
case <- quit :
os . Exit ( 0 )
}
}
}
新旧模型对比:任务和worker
使用锁的情景:
访问共享数据结构中的缓存信息
保存应用程序上下文和状态信息数据
使用通道的情景:
与异步操作的结果进行交互
分发任务
传递数据所有权