Golang协程通信

第十四章:协程与通信

并发、并行和协程

不要使用全局变量或者共享内存,它们会给你的代码在并发运算的时候带来危险。

协程工作在相同的地址空间中,所以共享内存的方式一定是同步的。Go 使用 channels 来同步协程。

协程是通过使用关键字 go 调用(或执行)一个函数或者方法来实现的(也可以是匿名或者 lambda 函数)。这样会在当前的计算过程中开始一个同时进行的函数,在相同的地址空间中并且分配了独立的栈。

一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
	"fmt"
	"time"
)

func main(){
	fmt.Println("begin main")
	go LongWait()
	go ShortWait()
	fmt.Println("main sleep")
	time.Sleep(120*time.Millisecond)
	fmt.Println("main sleep end")
}

func LongWait(){
	fmt.Println("begin long wait")
	time.Sleep(100*time.Millisecond)
	fmt.Println("end long wait")
}
func ShortWait(){
	fmt.Println("begin short wait")
	time.Sleep(10*time.Millisecond)
	fmt.Println("end short wait")
}
  • Go 协程意味着并行(或者可以以并行的方式部署),协程一般来说不是这样的
  • Go 协程通过通道来通信;协程通过让出和恢复操作来通信

协程间的通信

通常使用这样的格式来声明通道:var identifier chan datatype

通道也是引用类型,所以我们使用 make() 函数来给它分配内存。

所有的类型都可以用于通道。

通信操作符<-

  1. 流向通道(发送):ch <- xxx
  2. 流出通道(接收):xxx := <-ch

<- ch 可以单独调用获取通道的(下一个)值,当前值会被丢弃。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
	"fmt"
	"time"
)

func SendData(ch chan string){
	ch <- "feng1"
	ch <- "feng2"
	ch <- "feng3"
}

func ReceiveData(ch chan string){
	for  {
		data := <-ch
		fmt.Println(data)
	}
}

func main(){
	var ch1 chan string = make(chan string)
	go SendData(ch1)
	go ReceiveData(ch1)
	time.Sleep(1e9)

}

通道阻塞

默认情况下,通信是同步且无缓冲的:在有接收者接收数据之前,发送不会结束。

通道的发送 / 接收操作在对方准备好之前是阻塞的。

一个很好的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func main(){
	c := make(chan int)
	go func() {
		time.Sleep(5 * 1e9)//5秒
		x := <-c
		fmt.Println("received", x)
	}()
	fmt.Println("sending", 10)
	c <- 10
	fmt.Println("sent", 10)
}

首先打印出sending,然后协程里面等待了5s,c <- 10这里因为没有接收者,所以处于阻塞状态。等到协程中5s等待结束,接收了数据后,c <- 10不再阻塞,协程里面也继续运行。

书中的练习14.2:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package main

import (
    "fmt"
)

func f1(in chan int) {
    fmt.Println(<-in)
}

func main() {
    out := make(chan int)
    out <- 2
    go f1(out)
}

之所以会死锁,时因为首先out <- 2,因为没有接收者或者阻塞了,导致了死锁。

而这样就不会了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package main

import (
	"fmt"
	"time"
)

func f1(in chan int) {
	fmt.Println(<-in)
}

func main() {
	out := make(chan int)
	f1(out)
	out <- 2
	time.Sleep(1e9)
}

带缓冲的通道

make传第二个参数可以设置通道的缓冲容量:

1
	ch := make(chan string,5)

ch :=make(chan type, value)

  • value == 0 -> synchronous, unbuffered (阻塞)

  • value > 0 -> asynchronous, buffered(非阻塞)取决于 value 元素

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
package main

import "fmt"

func main() {
	ch := make(chan string,5)
	ch <- "1"
	ch <- "2"
	ch <- "3"
	ch <- "4"
	ch <- "5"
	//ch <- "6"
	fmt.Println(<-ch)
}

不会报错,因为有5个缓冲,但如果把6的注释去掉就会报fatal了。

信号量模式

使用通道让 main 程序等待协程完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import "fmt"

func DoSomething(v int ) int{
	return v+v
}

type Empty interface {}
var (
	empty Empty
	data []int = make([]int,5)
	res []int = make([]int,5)
	ch chan Empty = make(chan Empty)
)

func main() {
	for i:=0;i<5;i++{
		data[i] = i+1
	}
	for k,v := range data{
		go func(k,v int) {
			fmt.Println("进入协程",k)
			res[k] = DoSomething(v)
			ch <- empty
		}(k,v)
	}

	for i:=0;i<5;i++ {
		<-ch
	}
	fmt.Println(res)
	//等待协程运行完毕后再让main退出
}

用带缓冲通道实现一个信号量

这不就是PV操作嘛。。。离谱了。

互斥锁和PV操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
/* mutexes */
func (s semaphore) Lock() {
    s.P(1)
}

func (s semaphore) Unlock(){
    s.V(1)
}

/* signal-wait */
func (s semaphore) Wait(n int) {
    s.P(n)
}

func (s semaphore) Signal() {
    s.V(1)
}

通道工厂模式

不将通道作为参数传递给协程,而用函数来生成一个通道并返回(工厂角色);函数内有个匿名函数被协程调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package main

import "fmt"

func Sum(x,y int) (ch chan int) {
	ch = make(chan int)
	go func(x,y int) {
		ch <- x+y
	}(x,y)
	return
}

func main() {
	ch := Sum(5,12)
	fmt.Println(<-ch)
}

给通道使用for循环

1
2
3
for v := range ch {
    ....
}

这样可以不断地从通道中读取直至通道关闭。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int)
	for i:=0;i<10;i++{
		go func(i int ) {
			ch <- i
		}(i)
	}
	go func() {
		for v := range ch{
			fmt.Println(v)
		}
	}()
	time.Sleep(1e9)
}

通道的方向

1
2
var send_only chan<- int         // channel can only send data
var recv_only <-chan int        // channel can only receive data

只接收的通道(<-chan T)无法关闭,因为关闭通道是发送者用来表示不再给通道发送值了。

协程的同步:关闭通道-测试阻塞的通道

检测通道是否被关闭:

1
2
3
4
5
v, ok := <-ch
if !ok {
  break
}
process(v)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
	"fmt"
)

func SendData(ch chan int){
	ch<-10
	ch<-1
	ch<-30
	close(ch)
}
func ReceiveData(ch chan int){
	for  {
		v,ok := <-ch
		if !ok{
			break
		}
		fmt.Println(v)
	}
}

func main(){
	ch := make(chan int)
	go SendData(ch)
	ReceiveData(ch)
}

上述代码还可以改写为下面这样:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
	"fmt"
)

func SendData(ch chan int){
	ch<-10
	ch<-1
	ch<-30
	close(ch)
}
func ReceiveData(ch chan int){
	for v := range ch{
		fmt.Println(v)
	}
}

func main(){
	ch := make(chan int)
	go SendData(ch)
	ReceiveData(ch)
}

使用for-range语句,可以自动检测通道是否关闭。

使用select切换协程

从不同的并发执行的协程中获取值:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
	"fmt"
	"time"
)

func SendData(ch chan int){
	ch<-10
	ch<-1
	ch<-30
	close(ch)
}
func ReceiveData(ch chan int){
	for v := range ch{
		fmt.Println(v)
	}
}

func main(){
	ch1 := make(chan int)
	ch2 := make(chan int)
	ch3 := make(chan int)
	go func() {
		ch1 <- 10
	}()
	go func() {
		ch2 <- 11
	}()
	go func() {
		ch3 <- 12
	}()
	time.Sleep(1e9)

	select {
	case u:=<-ch1:
		fmt.Println(u)
	case v:=<-ch2:
		fmt.Println(v)
	case j:=<-ch3:
		fmt.Println(j)
	default:
		fmt.Println("no")
	}
}

select 做的就是:选择处理列出的多个通信情况中的一个。

  • 如果都阻塞了,会等待直到其中一个可以处理

  • 如果多个可以处理,随机选择一个

  • 如果没有通道操作可以处理并且写了 default 语句,它就会执行:default 永远是可运行的(这就是准备好了,可以执行)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
	"fmt"
	"os"
)

func tel(ch chan int,quit chan bool){
	for i:=0;i<15;i++{
			ch<-i
	}
	quit<-true
}
func main(){
	ch := make(chan int)
	quit := make(chan bool)
	go tel(ch,quit)
	for  {
		select {
		case v:=<-ch:
			fmt.Println(v)
		case <-quit:
			os.Exit(0)
		}
	}

}

新旧模型对比:任务和worker

  • 使用锁的情景:
    • 访问共享数据结构中的缓存信息
    • 保存应用程序上下文和状态信息数据
  • 使用通道的情景:
    • 与异步操作的结果进行交互
    • 分发任务
    • 传递数据所有权
0%