参考

http://www.hangdaowangluo.com/archives/2526

前言

通道关闭原则: 在使用Go channel的时候,一个适用的原则是不要从接收端关闭channel,也不要在多个并发发送端中关闭channel。

  • 在不能更改channel状态的情况下,没有简单普遍的方式来检查channel是否已经关闭了
  • 关闭已经关闭的channel会导致panic,所以在closer(关闭者)不知道channel是否已经关闭的情况下去关闭channel是很危险的
  • 发送值到已经关闭的channel会导致panic,所以如果sender(发送者)在不知道channel是否已经关闭的情况下去向channel发送值是很危险的

换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receiver(s)(接收者们)已经没有值可以读了。维持这条原则将保证永远不会发生向一个已经关闭的channel发送值或者关闭一个已经关闭的channel。下面,我们将会称上面的原则为channel closing principle<通道关闭原则>

保持channel closing principle的优雅方案

针对各种场景,下面介绍不用使用panic/recover和sync包,纯粹是利用channel的解决方案。在下面的例子中,sync.WaitGroup只是用来让例子完整的。它的使用在实践中不一定一直都有用。

M个receivers,一个sender,sender通过关闭data channel说“不再发送”

这是最简单的场景了,就只是当sender不想再发送的时候让sender关闭data 来关闭channel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
"time"
"math/rand"
"sync"
"log"
)

func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

// ...
const MaxRandomNumber = 100000
const NumReceivers = 100

wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)

// ...
dataCh := make(chan int, 100)

// the sender
go func() {
for {
if value := rand.Intn(MaxRandomNumber); value == 0 {
// the only sender can close the channel safely.
close(dataCh)
return
} else {
dataCh <- value
}
}
}()

// receivers
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()

// receive values until dataCh is closed and
// the value buffer queue of dataCh is empty.
for value := range dataCh {
log.Println(value)
}
}()
}

wgReceivers.Wait()
}

一个receiver,N个sender,receiver通过关闭一个额外的signal channel说“请停止发送”

这种场景比上一个要复杂一点。我们不能让receiver关闭data channel,因为这么做将会打破channel closing principle。但是我们可以让receiver关闭一个额外的signal channel来通知sender停止发送值:

TIPS:此时,对于 data channel 的 关闭, 交给golang的GC处理就好了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
"time"
"math/rand"
"sync"
"log"
)

func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

// ...
const MaxRandomNumber = 100000
const NumSenders = 1000

wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)

// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the receiver of channel dataCh.
// Its reveivers are the senders of channel dataCh.

// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
value := rand.Intn(MaxRandomNumber)

select {
case <- stopCh:
// 伴随着goroutine的全部退出, dataCh将会因为缺少引用而被GC扫描回收。
return
case dataCh <- value:
}
}
}()
}

// the receiver
go func() {
defer wgReceivers.Done()

for value := range dataCh { // dataCh的接收方
if value == MaxRandomNumber-1 {
// the receiver of the dataCh channel is
// also the sender of the stopCh cahnnel.
// It is safe to close the stop channel here.
close(stopCh)
// 我们在退出当前goroutine之前, 打开了潘多拉魔盒(即通过stopCh的关闭, 触发所有对应的data channel的sender所在goroutine退出。
return
}

log.Println(value)
}
}()

// ...
wgReceivers.Wait()
}

正如注释说的,对于额外的signal channel来说,它的sender是data channel的receiver。这个额外的signal channel被它唯一的sender关闭,遵守了channel closing principle。

M个receiver,N个sender,它们当中任意一个通过通知一个moderator(仲裁者)关闭额外的signal channel来说“让我们结束游戏吧”

这是最复杂的场景了。我们不能让任意的receivers和senders关闭data channel,也不能让任何一个receivers通过关闭一个额外的signal channel来通知所有的senders和receivers退出游戏。这么做的话会打破channel closing principle。但是,我们可以引入一个moderator来关闭一个额外的signal channel。这个例子的一个技巧是怎么通知moderator去关闭额外的signal channel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main

import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)

func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

// ...
const MaxRandomNumber = 100000
const NumReceivers = 10
const NumSenders = 1000

wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)

// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown below.
// Its reveivers are all senders and receivers of dataCh.
toStop := make(chan string, 1)
// the channel toStop is used to notify the moderator
// to close the additional signal channel (stopCh).
// Its senders are any senders and receivers of dataCh.
// Its reveiver is the moderator goroutine shown below.

var stoppedBy string

// moderator
go func() {
stoppedBy = <- toStop // part of the trick used to notify the moderator
// to close the additional signal channel.
close(stopCh)
}()

// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
if value == 0 {
// here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}

// the first select here is to try to exit the
// goroutine as early as possible.
select {
case <- stopCh:
return
default:
}

select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}

// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()

for {
// same as senders, the first select here is to
// try to exit the goroutine as early as possible.
select {
case <- stopCh:
return
default:
}

select {
case <- stopCh:
return
case value := <-dataCh:
if value == MaxRandomNumber-1 {
// the same trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "receiver#" + id:
default:
}
return
}

log.Println(value)
}
}
}(strconv.Itoa(i))
}

// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}

在这个例子中,仍然遵守着channel closing principle。
请注意channel toStop的缓冲大小是1.这是为了避免当mederator goroutine 准备好之前第一个通知就已经发送了,导致丢失。

更多的场景?
很多的场景变体是基于上面三种的。举个例子,一个基于最复杂情况的变体可能要求receivers读取buffer channel中剩下所有的值。这应该很容易处理,所有这篇文章也就不提了。
尽管上面三种场景不能覆盖所有Go channel的使用场景,但它们是最基础的,实践中的大多数场景都可以分类到那三种中。

结论

这里没有一种场景要求你去打破channel closing principle。如果你遇到了这种场景,请思考一下你的设计并重写你的代码。用Go编程就像在创作艺术。