1.使用无缓存Channel进行goroutine通信 在前面的关于Channel的一些认识当中,我们了解基于无缓存Channels的发送和接收操作将导致两个goroutine做一次同步操作,故无缓存Channels有时候也被称为同步Channels,那么我们就可以使用无缓存的Channel进行简单的goroutine通信了,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package mainimport ( "fmt" "time" ) func main () { ch := make (chan int ) go func () { fmt.Println("三秒之后开始启动!" ) time.Sleep(3 * time.Second) ch <- 1 }() <- ch close (ch) fmt.Println("收到通知!" ) }
当然对于如何使用stuct{}
空结构体进行同步Channels的操作我是一直耿耿于怀,将上面的代码改一改,加深一下认识:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package mainimport ( "fmt" "time" ) func main () { ch := make (chan struct {}) go func () { fmt.Println("三秒之后开始启动!" ) time.Sleep(3 * time.Second) ch <- struct {}{} }() <- ch close (ch) fmt.Println("收到通知!" ) }
2.结合Select多路复用 一言以蔽之:Golang 中的 select 提供了对多个 channel 的统一管理 。这个就是最简洁的答案,如果我们想要在程序中对多个Channel的管理,我们可以选择使用select,select语句的一般形式的代码如下:
1 2 3 4 5 6 7 8 9 10 select { case <-ch1: case x := <-ch2: case ch3 <- y: default : }
select和switch语句稍微有点相似,有case也有最后的default默认分支来设置当其它的操作都不能够马上被处理时程序需要执行哪些逻辑。每一个case代表一个通信操作(在某个channel上进行发送或者接收)并且会包含一些语句组成的一个语句块。
select会等待case中有能够执行的case时去执行 。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select{},会永远地等待下去 。
2.1 监听一个或者多个Channel 关于select结合channel还是有几种情况出现的,比如我们的select可以监听一个或者多个channel ,只要有一个channel做好准备进行数据发送,则select则会马上进行处理,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package mainimport ( "fmt" "time" ) func test1 (ch chan string ) { time.Sleep(time.Second * 3 ) ch <- "test1" } func test2 (ch chan string ) { time.Sleep(time.Second * 2 ) ch <- "test2" } func main () { chan1 := make (chan string ) chan2 := make (chan string ) go test1(output1) go test2(output2) select { case s1 := <-chan1: fmt.Println("chan1:" , s1) case s2 := <-chan2: fmt.Println("chan2:" , s2) } }
2.2 多个Case随机处理 那肯定有些情况我们不知道那么些个channel哪个先准备好,哪个后准备好,在这种个case同时就绪时的情况下,select会随机处理 ,即随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。比如下面的代码中,有时候会打印 ch1:1
,有时候则打印 ch2:"hello"
,这是一个随机处理的过程!代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package mainimport ( "fmt" ) func main () { ch1 := make (chan int , 1 ) ch2 := make (chan string , 1 ) go func () { ch1 <- 1 }() go func () { ch2 <- "hello" }() select { case value := <-ch1: fmt.Println("ch1:" , value) case value := <-ch2: fmt.Println("ch2:" , value) } }
2.3 充分利用Default 我们还可以利用default 这个巧妙的设定来进行一些判断,比如判断channel是否已经写满,下面的代码会在ch channel中有值时,从其中接收值;无值时什么都不做。这是一个非阻塞的接收操作;反复地做这样的操作叫做“轮询channel”。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package mainimport ( "fmt" "time" ) func main () { ch := make (chan string , 10 ) go write(ch) for s := range ch { fmt.Println("res:" , s) time.Sleep(time.Second) } } func write (ch chan string ) { for { select { case ch <- "hello" : fmt.Println("write hello" ) default : fmt.Println("channel full" ) } time.Sleep(time.Millisecond * 500 ) } }
2.4 超时控制 还有其他很多很简单的一些实现,比如可以使用select+channel
做超时控制,在很多操作情况下都需要超时控制,利用 select 实现超时控制,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package mainimport ( "fmt" "time" ) func main () { ch := make (chan string ) go func () { time.Sleep(time.Second * 2 ) ch <- "result" }() select { case res := <-ch: fmt.Println(res) case <-time.After(time.Second * 1 ): fmt.Println("timeout" ) } }
上面的代码中,select
语句会阻塞等待最先返回数据的channel
,当先接收到time.After
的通道数据时,select
则会停止阻塞并执行该case
的代码。此时就已经实现了对业务代码的超时处理。
3.并发协程的安全退出 有时候我们需要通知goroutine停止它正在干的事情,比如一个正在执行计算的web服务,然而它的客户端已经断开了和服务端的连接。Golang 没有提供这么一个goroutine中终止另一个goroutine的方法,为啥不直接提供一个goroutine直接终止另外一个goroutine的方法呢?因为这样会导致goroutine之间的共享变量落在未定义的状态上 。
通常我们可以使用select
和default
分支可以很容易实现一个Goroutine的退出控制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package mainimport ( "fmt" "time" ) func worker (cancel chan bool ) { for { select { case <-cancel: return default : fmt.Println("hello" ) } } } func main () { cancel := make (chan bool ) go worker(cancel) time.Sleep(3 * time.Second) cancel <- true }
假如我们不是使用cancel <- true
这种传值的方式,而是想通过close()
方法关闭一个通道进而关闭相对于的协程呢?这就要使用到ok-idiom
了!
关于使用ok-idiom
我们可以结合select进行 ,往往我们需要结合for-select
这种结构来实现,因为select提供了多路复用的能力,所以for-select
可以让函数具有持续多路处理多个Channel的能力 。
需要注意的是在使用ok-idiom过程中进行退出的时候,select没有感知channel的关闭 ,这引出了2个问题:
继续在关闭的通道上读,会读到通道传输数据类型的零值,如果是指针类型,读到nil,继续处理还会产生nil。
继续在关闭的通道上写,将会panic。
问题2可以这样解决,通道只由发送方关闭,接收方不可关闭 ,即某个写通道只由使用该select的协程关闭,select中就不存在继续在关闭的通道上写数据的问题。关于这点,官方close()
的注释中也明确讲了:
1 2 3 4 5 6 7 8 9 func close (c chan <- Type)
问题1可以使用,ok
来检测通道的关闭,使用情况有2种。
使用ok-idiom
结合for-select
结构第一种情况:如果某个通道关闭后,需要退出协程,直接return即可 。例代码中,该协程需要从in通道读数据,还需要定时打印已经处理的数量,有2件事要做,所以不能使用for-range,需要使用for-select,当in关闭时,ok=false
,我们直接返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package mainimport ( "fmt" "time" ) func worker (cancel chan bool ) { for { select { case _, ok := <-cancel: if !ok { fmt.Println("I am out!" ) return } default : fmt.Println("hello" ) } } } func main () { cancel := make (chan bool ) go worker(cancel) time.Sleep(3 * time.Second) close (cancel) }
使用ok-idiom
结合for-select
结构第二种情况:如果某个通道关闭了,不再处理该通道,而是继续处理其他case,退出是等待所有的通道关闭。
我们需要使用select的一个特征:select不会在nil的通道上进行等待 。这种情况,把只读通道设置为nil即可解决。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 package mainimport ( "fmt" "time" ) func worker (ch1, ch2 chan int ,) { for { select { case _, ok := <-ch1: if !ok { fmt.Println("ch1 is over!" ) ch1 = nil } case _, ok := <-ch2: if !ok { fmt.Println("ch2 is over!" ) ch2 = nil } default : time.Sleep(1 * time.Second) fmt.Println("I am doing other work!" ) } if ch1 == nil && ch2 == nil { fmt.Println("worker is done!" ) return } } } func main () { ch1 := make (chan int ) ch2 := make (chan int ) go worker(ch1, ch2) time.Sleep(2 * time.Second) close (ch1) time.Sleep(5 * time.Second) close (ch2) time.Sleep(2 * time.Second) }
打印结果如下:
但问题是如果我们想要退出两个或者任意多个Goroutine怎么办呢? 这里大概有两种办法:
一种是向所有的goroutine发送同样数量的信号给对应的同步Channel来进行退出提示(上面的示例就是一个例子了!) 。但是这样并不是保险的,想想如果在发出发出信号的时候有些goroutine自动退出了,那么是不是Channel中的事件数比需要关闭的goroutine还多 ?这样一来,我们的发送就直接被阻塞了!除了发送到Channel的事件数目过多的情况,过少的情况也可能出现 ,比如待关闭的goroutine又生成了其他的goroutine,那样一来就会产生有些需要关闭的goroutine却没有收到退出的消息
最重要的一点在于Go的并发十分强大,我们很难知道某一个时刻具体运行着的goroutine数目,所以采用这种方法精确的去关闭多个goroutine是很困难的。
管道的发送操作和接收操作是一一对应的,如果要停止多个Goroutine那么可能需要创建同样数量的管道,这个代价太大了!
而另外一种则是通过Channel进行消息广播,使用一个专门的通道,发送退出的信号 ,我们看看如何进行一步步改进。
首先我们可以通过不向Channel发送值而是使用close
关闭一个Channel,从而实现广播的效果! 为什么不使用发送值而是使用close呢?因为当一个goroutine从一个channel中接收到一个值的时候,他会消费掉这个值,这样其它的goroutine就没法看到这条信息了。
比如说,我们启动了10个worker时,只要main()
执行关闭cancel通道,每一个worker都会都到信号,进而关闭。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package mainimport ( "fmt" "time" ) func worker (cancel chan bool ) { for { select { case <-cancel: return default : fmt.Println("hello" ) } } } func main () { cancel := make (chan bool ) for i := 0 ; i < 10 ; i++ { go worker(cancel) } time.Sleep(time.Second) close (cancel) }
这里存在的问题就是:当每个Goroutine收到退出指令退出时一般会进行一定的清理工作,但是退出的清理工作并不能保证被完成,因为main
线程并没有等待各个工作Goroutine退出工作完成的机制 。我们可以结合sync.WaitGroup
来改进:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package mainimport ( "fmt" "sync" "time" ) func worker (wg *sync.WaitGroup, cannel chan bool ) { defer wg.Done() for { select { default : fmt.Println("hello" ) case <-cannel: return } } } func main () { cancel := make (chan bool ) var wg sync.WaitGroup for i := 0 ; i < 10 ; i++ { wg.Add(1 ) go worker(&wg, cancel) } time.Sleep(time.Second) close (cancel) wg.Wait() }
现在每个工作者并发体的创建、运行、暂停和退出都是在main
函数的安全控制之下了。
小结一下处理并发协程的安全退出的几种方法:
使用ok-idiom
处理一个或者多个goroutine的关闭,但是多个goroutine的关闭并不推荐使用这种方式进行。
通过Channel进行消息广播,使用一个专门的Channel,通过close()
发送退出的信号 。
在第二点的基础上结合sync.WaitGroup
来改进,完善为main
线程等待各个工作Goroutine退出工作完成的机制
4.使用管道(Pipeline)优雅的从Channel循环取值 当通过Channel发送有限的数据时,我们可以通过close()
函数关闭Channel来告知从该Channel接收值的goroutine停止等待。当Channel被关闭时,再继续往该Channel发送值则会引发panic,如果从该Channel里接收的值一直都是类型零值。那如何判断一个通道是否被关闭了呢? 在前面的关于Channel的认识中我们了解到,可以使用ok-idiom 进行判断,接收操作有一个变体形式 :它多接收一个结果,多接收的第二个结果是一个布尔值ok,ture表示成功从channels接收到值,false表示channels已经被关闭并且里面没有值可接收。
在下面的代码中,第一个goroutine是一个计数器,用于生成0、1、2、……形式的整数序列,然后通过channel将该整数序列发送给第二个goroutine;第二个goroutine是一个求平方的程序,对收到的每个整数求平方,然后将平方后的结果通过第二个channel发送给第三个goroutine;第三个goroutine是一个打印程序,打印收到的每个整数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package mainimport ( "fmt" ) func main () { ch1 := make (chan int ) ch2 := make (chan int ) go func () { for i := 0 ; i < 100 ; i++ { ch1 <- i } close (ch1) }() go func () { for { i, ok := <-ch1 if !ok { break } ch2 <- i * i } close (ch2) }() for i := range ch2 { fmt.Println(i) } }
从上面的例子中我们看到有两种方式在接收值的时候判断通道是否被关闭:
一种是使用ok-idiom
另外一种就是使用for range
了,而我们通常使用的是for range
的方式。
为什么for range能够起到作用呢?因为range channel 可以直接取到 channel 中的值。当我们使用 range 来操作 channel 的时候,它依次从channel接收数据,当channel被关闭并且没有值可接收时跳出循环 。这应该和for range
的语法糖相关,后续了解到for range
的语法糖的时候,再返回来解决详细的解决这个疑惑!
5.实现生产者消费者模型 生产者消费者模型是很常见的了,在操作系统看见的次数可不少,这是并发编程中最常见的例子了,该模式主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。简单地说,就是生产者生产一些数据,然后放到成果队列中,同时消费者从成果队列中来取这些数据。这样就让生产消费变成了异步的两个过程。当成果队列中没有数据时,消费者就进入饥饿的等待中;而当成果队列中数据已满时,生产者则面临因产品挤压导致CPU被剥夺的下岗问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package mainimport ( "fmt" "time" ) func Producer (factor int , out chan <- int ) { for i := 0 ; i <= 100 ; i++ { fmt.Println("生产了:" , i*factor) out <- i*factor } } func Consumer (in <-chan int ) { for v := range in { fmt.Println("消费了:" ,v) } } func main () { ch := make (chan int , 64 ) go Producer(2 , ch) go Consumer(ch) time.Sleep(2 * time.Second) }
还可以进行改进,我们让main
函数保存阻塞状态不退出,只有当用户输入Ctrl-C
时才真正退出程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package mainimport ( "fmt" "os" "os/signal" "syscall" ) func Producer (factor int , out chan <- int ) { for i := 0 ; i <= 100 ; i++ { fmt.Println("生产了:" , i*factor) out <- i*factor } } func Consumer (in <-chan int ) { for v := range in { fmt.Println("消费了:" ,v) } } func main () { ch := make (chan int , 64 ) go Producer(2 , ch) go Consumer(ch) sig := make (chan os.Signal, 1 ) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) fmt.Printf("quit (%v)\n" , <-sig) }
6.实现循环队列 7.控制并发数 8.发布订阅模型 参考文章