第十四章:协程与通信 
  
     
   
并发、并行和协程 
  
     
   
不要使用全局变量或者共享内存,它们会给你的代码在并发运算的时候带来危险。 
协程工作在相同的地址空间中,所以共享内存的方式一定是同步的。Go 使用 channels 来同步协程。
协程是通过使用关键字 go 调用(或执行)一个函数或者方法来实现的(也可以是匿名或者 lambda 函数)。这样会在当前的计算过程中开始一个同时进行的函数,在相同的地址空间中并且分配了独立的栈。
一个例子:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
  
package  main 
 import  ( 
	"fmt" 
 	"time" 
 ) 
 func  main (){ 
	fmt . Println ( "begin main" ) 
 	go  LongWait () 
 	go  ShortWait () 
 	fmt . Println ( "main sleep" ) 
 	time . Sleep ( 120 * time . Millisecond ) 
 	fmt . Println ( "main sleep end" ) 
 } 
 func  LongWait (){ 
	fmt . Println ( "begin long wait" ) 
 	time . Sleep ( 100 * time . Millisecond ) 
 	fmt . Println ( "end long wait" ) 
 } 
func  ShortWait (){ 
	fmt . Println ( "begin short wait" ) 
 	time . Sleep ( 10 * time . Millisecond ) 
 	fmt . Println ( "end short wait" ) 
 }  
 
Go 协程意味着并行(或者可以以并行的方式部署),协程一般来说不是这样的 
Go 协程通过通道来通信;协程通过让出和恢复操作来通信 
 
协程间的通信 
  
     
   
通常使用这样的格式来声明通道:var identifier chan datatype
通道也是引用类型,所以我们使用 make() 函数来给它分配内存。
所有的类型都可以用于通道。
通信操作符<- 
  
     
   
流向通道(发送):ch <- xxx 
流出通道(接收):xxx := <-ch 
 
<- ch 可以单独调用获取通道的(下一个)值,当前值会被丢弃。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
  
package  main 
 import  ( 
	"fmt" 
 	"time" 
 ) 
 func  SendData ( ch  chan  string ){ 
	ch  <-  "feng1" 
 	ch  <-  "feng2" 
 	ch  <-  "feng3" 
 } 
 func  ReceiveData ( ch  chan  string ){ 
	for   { 
 		data  :=  <- ch 
 		fmt . Println ( data ) 
 	} 
 } 
 func  main (){ 
	var  ch1  chan  string  =  make ( chan  string ) 
 	go  SendData ( ch1 ) 
 	go  ReceiveData ( ch1 ) 
 	time . Sleep ( 1e9 ) 
 
 }  
 
通道阻塞 
  
     
   
默认情况下,通信是同步且无缓冲的:在有接收者接收数据之前,发送不会结束。
通道的发送 / 接收操作在对方准备好之前是阻塞的。
一个很好的例子:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
  
func  main (){ 
	c  :=  make ( chan  int ) 
 	go  func ()  { 
 		time . Sleep ( 5  *  1e9 ) //5秒 
 		x  :=  <- c 
 		fmt . Println ( "received" ,  x ) 
 	}() 
 	fmt . Println ( "sending" ,  10 ) 
 	c  <-  10 
 	fmt . Println ( "sent" ,  10 ) 
 }  
 
首先打印出sending,然后协程里面等待了5s,c <- 10这里因为没有接收者,所以处于阻塞状态。等到协程中5s等待结束,接收了数据后,c <- 10不再阻塞,协程里面也继续运行。
书中的练习14.2:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
  
package  main 
 import  ( 
    "fmt" 
 ) 
 func  f1 ( in  chan  int )  { 
    fmt . Println ( <- in ) 
 } 
 func  main ()  { 
    out  :=  make ( chan  int ) 
     out  <-  2 
     go  f1 ( out ) 
 }  
 
之所以会死锁,时因为首先out <- 2,因为没有接收者或者阻塞了,导致了死锁。
而这样就不会了:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
  
package  main 
 import  ( 
	"fmt" 
 	"time" 
 ) 
 func  f1 ( in  chan  int )  { 
	fmt . Println ( <- in ) 
 } 
 func  main ()  { 
	out  :=  make ( chan  int ) 
 	f1 ( out ) 
 	out  <-  2 
 	time . Sleep ( 1e9 ) 
 }  
 
带缓冲的通道 
  
     
   
给make传第二个参数可以设置通道的缓冲容量:
1
  
	ch  :=  make ( chan  string , 5 )   
 
ch :=make(chan type, value):
value == 0 -> synchronous, unbuffered (阻塞)
 
value > 0 -> asynchronous, buffered(非阻塞)取决于 value 元素
 
 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
  
package  main 
 import  "fmt" 
 func  main ()  { 
	ch  :=  make ( chan  string , 5 ) 
 	ch  <-  "1" 
 	ch  <-  "2" 
 	ch  <-  "3" 
 	ch  <-  "4" 
 	ch  <-  "5" 
 	//ch <- "6" 
 	fmt . Println ( <- ch ) 
 }  
 
不会报错,因为有5个缓冲,但如果把6的注释去掉就会报fatal了。
信号量模式 
  
     
   
使用通道让 main 程序等待协程完成。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
  
package  main 
 import  "fmt" 
 func  DoSomething ( v  int  )  int { 
	return  v + v 
 } 
 type  Empty  interface  {} 
var  ( 
	empty  Empty 
 	data  [] int  =  make ([] int , 5 ) 
 	res  [] int  =  make ([] int , 5 ) 
 	ch  chan  Empty  =  make ( chan  Empty ) 
 ) 
 func  main ()  { 
	for  i := 0 ; i < 5 ; i ++ { 
 		data [ i ]  =  i + 1 
 	} 
 	for  k , v  :=  range  data { 
 		go  func ( k , v  int )  { 
 			fmt . Println ( "进入协程" , k ) 
 			res [ k ]  =  DoSomething ( v ) 
 			ch  <-  empty 
 		}( k , v ) 
 	} 
 
 	for  i := 0 ; i < 5 ; i ++  { 
 		<- ch 
 	} 
 	fmt . Println ( res ) 
 	//等待协程运行完毕后再让main退出 
 }  
 
用带缓冲通道实现一个信号量 
  
     
   
这不就是PV操作嘛。。。离谱了。
互斥锁和PV操作:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
  
/* mutexes */ 
func  ( s  semaphore )  Lock ()  { 
    s . P ( 1 ) 
 } 
 func  ( s  semaphore )  Unlock (){ 
    s . V ( 1 ) 
 } 
 /* signal-wait */ 
func  ( s  semaphore )  Wait ( n  int )  { 
    s . P ( n ) 
 } 
 func  ( s  semaphore )  Signal ()  { 
    s . V ( 1 ) 
 }  
 
通道工厂模式 
  
     
   
不将通道作为参数传递给协程,而用函数来生成一个通道并返回(工厂角色);函数内有个匿名函数被协程调用。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
  
package  main 
 import  "fmt" 
 func  Sum ( x , y  int )  ( ch  chan  int )  { 
	ch  =  make ( chan  int ) 
 	go  func ( x , y  int )  { 
 		ch  <-  x + y 
 	}( x , y ) 
 	return 
 } 
 func  main ()  { 
	ch  :=  Sum ( 5 , 12 ) 
 	fmt . Println ( <- ch ) 
 }  
 
给通道使用for循环 
  
     
   
1
 2
 3
  
for  v  :=  range  ch  { 
    ... . 
 }  
 
这样可以不断地从通道中读取直至通道关闭。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
  
package  main 
 import  ( 
	"fmt" 
 	"time" 
 ) 
 func  main ()  { 
	ch  :=  make ( chan  int ) 
 	for  i := 0 ; i < 10 ; i ++ { 
 		go  func ( i  int  )  { 
 			ch  <-  i 
 		}( i ) 
 	} 
 	go  func ()  { 
 		for  v  :=  range  ch { 
 			fmt . Println ( v ) 
 		} 
 	}() 
 	time . Sleep ( 1e9 ) 
 }  
 
通道的方向 
  
     
   
1
 2
  
var  send_only  chan <-  int          // channel can only send data 
var  recv_only  <- chan  int         // channel can only receive data  
 
只接收的通道(<-chan T)无法关闭,因为关闭通道是发送者用来表示不再给通道发送值了。
协程的同步:关闭通道-测试阻塞的通道 
  
     
   
检测通道是否被关闭:
1
 2
 3
 4
 5
  
v ,  ok  :=  <- ch 
if  ! ok  { 
  break 
 } 
process ( v )  
 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
  
package  main 
 import  ( 
	"fmt" 
 ) 
 func  SendData ( ch  chan  int ){ 
	ch <- 10 
 	ch <- 1 
 	ch <- 30 
 	close ( ch ) 
 } 
func  ReceiveData ( ch  chan  int ){ 
	for   { 
 		v , ok  :=  <- ch 
 		if  ! ok { 
 			break 
 		} 
 		fmt . Println ( v ) 
 	} 
 } 
 func  main (){ 
	ch  :=  make ( chan  int ) 
 	go  SendData ( ch ) 
 	ReceiveData ( ch ) 
 }  
 
上述代码还可以改写为下面这样:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
  
package  main 
 import  ( 
	"fmt" 
 ) 
 func  SendData ( ch  chan  int ){ 
	ch <- 10 
 	ch <- 1 
 	ch <- 30 
 	close ( ch ) 
 } 
func  ReceiveData ( ch  chan  int ){ 
	for  v  :=  range  ch { 
 		fmt . Println ( v ) 
 	} 
 } 
 func  main (){ 
	ch  :=  make ( chan  int ) 
 	go  SendData ( ch ) 
 	ReceiveData ( ch ) 
 }  
 
使用for-range语句,可以自动检测通道是否关闭。
使用select切换协程 
  
     
   
从不同的并发执行的协程中获取值:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
  
package  main 
 import  ( 
	"fmt" 
 	"time" 
 ) 
 func  SendData ( ch  chan  int ){ 
	ch <- 10 
 	ch <- 1 
 	ch <- 30 
 	close ( ch ) 
 } 
func  ReceiveData ( ch  chan  int ){ 
	for  v  :=  range  ch { 
 		fmt . Println ( v ) 
 	} 
 } 
 func  main (){ 
	ch1  :=  make ( chan  int ) 
 	ch2  :=  make ( chan  int ) 
 	ch3  :=  make ( chan  int ) 
 	go  func ()  { 
 		ch1  <-  10 
 	}() 
 	go  func ()  { 
 		ch2  <-  11 
 	}() 
 	go  func ()  { 
 		ch3  <-  12 
 	}() 
 	time . Sleep ( 1e9 ) 
 
 	select  { 
 	case  u :=<- ch1 : 
 		fmt . Println ( u ) 
 	case  v :=<- ch2 : 
 		fmt . Println ( v ) 
 	case  j :=<- ch3 : 
 		fmt . Println ( j ) 
 	default : 
 		fmt . Println ( "no" ) 
 	} 
 }  
 
select 做的就是:选择处理列出的多个通信情况中的一个。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
  
package  main 
 import  ( 
	"fmt" 
 	"os" 
 ) 
 func  tel ( ch  chan  int , quit  chan  bool ){ 
	for  i := 0 ; i < 15 ; i ++ { 
 			ch <- i 
 	} 
 	quit <- true 
 } 
func  main (){ 
	ch  :=  make ( chan  int ) 
 	quit  :=  make ( chan  bool ) 
 	go  tel ( ch , quit ) 
 	for   { 
 		select  { 
 		case  v :=<- ch : 
 			fmt . Println ( v ) 
 		case  <- quit : 
 			os . Exit ( 0 ) 
 		} 
 	} 
 
 }  
 
新旧模型对比:任务和worker 
  
     
   
使用锁的情景:
访问共享数据结构中的缓存信息 
保存应用程序上下文和状态信息数据 
 
 
使用通道的情景:
与异步操作的结果进行交互 
分发任务 
传递数据所有权