func main(){
ci:=make(chan int)
close(ci)
close(ci)
}
「编译不会提示错误,运行直接报错」panic: close of closed channel goroutine 1 [running]:
func main() {
ci := make(chan int)
close(ci)
ci <- 1
}
「编译不会提示错误,运行直接报错」panic: send on closed channel goroutine 1 [running]:
总结起来有两点:
有同学会说,既然不正确操作channel会引发上述问题,那我写程序的时候都不关闭channel,不就避免了上述问题嘛。「理论上可行,实际是不行的」。channel是占资源的,在计算机中当资源不在使用的时候,要尽早规划给操作系统,让其他程序可以使用。
那有没有检测channel是否关闭的函数呢?不好意思,golang 官方没有提供检测方法。我们自己实现一个可以了吧。
func IsChannelClosed(ch <-chan int) bool {
select {
case <-ch:
return true
default:
}
return false
}
func main() {
ci := make(chan int)
fmt.Println("channel是否已经关闭:", IsChannelClosed(ci))
}
输出:channel是否已经关闭: false
func main() {
ci := make(chan int)
close(ci)
fmt.Println("channel是否已经关闭:", IsChannelClosed(ci))
}
输出:channel是否已经关闭: true
我们重新在仔细思考一下,看看IsChannelClosed存在什么问题:
func main() {
ci := make(chan int, 1)
ci <- 1
fmt.Println("channel是否已经关闭:", IsChannelClosed(ci))
}
输出:channel是否已经关闭: true
func main() {
ci := make(chan int)
if !IsChannelClosed(ci){
time.Sleep(time.Duration(1000)) //模拟耗时操作
//这里发送的时候,ci可能已经被其他goroutine操作关闭
ci<-1
}
...
go func(){
close(ci)
}()
...
}
channel关闭原则:
❝don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders ❞
即不要在receiver(接收端)关闭,也不要在有多个sender(发送端)的时候关闭。
1中的IsChannelClosed并不能真正检查channel是否关闭,那有没有真正可判断channel是否关闭的方法,有三种方法,一是通过defer+recover机制来判断,另一种是采用sync.Once,保证channel只关闭一次,最后一种与sync.Once类似,不过是采用sync.Mutex加锁实现。
func SafeCloseChannel(ch chan int) (justClosed bool) {
defer func() {
if recover() != nil {
justClosed = false
}
}()
close(ch)
return true
}
SafeCloseChannel对于未关闭的ch, 执行close(ch)会正常关闭,对于已关闭的ch, 执行close(ch)会引发panic, 由defer捕获,recover返回非空,识别出ch是已经关闭。
type MyChannel struct{
C chan struct{}
once sync.Once
}
func NewMyChannel() *MyChannel{
return &MyChannel{C:make(chan struct{})}
}
func (mc *MyChannel) SafeClose(){
mc.once.Do(func(){
close(mc.C)
})
}
type MyChannel2 struct{
C chan struct{}
closed bool
mutex sync.Mutex
}
func NewMyChannel2() *MyChannel2{
return &MyChannel2{C:make(chan struct{})}
}
func (mc *MyChannel2) SafeClose(){
mc.mutex.Lock()
if !mc.closed{
close(mc.C)
mc.closed=true
}
mc.mutex.Unlock()
}
func (mc *MyChannel2) IsClosed() bool{
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.closed
}
2中关闭channel的方法虽然都是正确的,在生产环境是可用的,但并不是优雅的做法。下面介绍优雅关闭channel的方法,按照receiver(接受者)和sender(发送者)的数量关系,可以分成4种情况:
发送者:接收者=1:1
发送者:接收者=1:N
发送者:接收者=N:1
发送者:接收者=M:N
// 生产者:消费者=1:1
func test11() {
chanInt := make(chan int)
wg := sync.WaitGroup{}
wg.Add(2)
//生产者1个
go func(ci chan int) {
defer wg.Done()
for i := 0; i < 10; i++ {
chanInt <- i
}
//关闭channel
close(chanInt)
}(chanInt)
//消费者1个
go func(ci chan int) {
defer wg.Done()
for v := range ci {
fmt.Println(v)
}
}(chanInt)
wg.Wait()
}
// 生产者:消费者=1:N
func test1N() {
chanInt := make(chan int)
wg := sync.WaitGroup{}
wg.Add(3)
//生产者1个
go func(ci chan int) {
defer wg.Done()
for i := 0; i < 10; i++ {
ci <- i
}
//关闭channel
close(ci)
}(chanInt)
//消费者2个
go func(ci chan int) {
defer wg.Done()
for v := range ci {
fmt.Println("consumer 1, ", v)
}
}(chanInt)
go func(ci chan int) {
defer wg.Done()
for v := range ci {
fmt.Println("consumer 2, ", v)
}
}(chanInt)
wg.Wait()
}
// 生产者:消费者=N:1
func testN1() {
chanInt := make(chan int)
wg := sync.WaitGroup{}
wgProducer := sync.WaitGroup{}
wg.Add(4)
//生产者2个
wgProducer.Add(2)
//生产者1
go func(ci chan int) {
defer wg.Done()
defer wgProducer.Done()
for i := 0; i < 10; i++ {
ci <- i
}
}(chanInt)
//生产者2
go func(ci chan int) {
defer wg.Done()
defer wgProducer.Done()
for i := 10; i < 20; i++ {
ci <- i
}
}(chanInt)
//消费者1个
go func(ci chan int) {
defer wg.Done()
for v := range ci {
fmt.Println(v)
}
}(chanInt)
//关闭channel goroutine
go func(ci chan int) {
defer wg.Done()
wgProducer.Wait()
close(ci)
}(chanInt)
wg.Wait()
}
// 生产者:消费者=M:N
func testMN() {
chanInt := make(chan int)
wg := sync.WaitGroup{}
wgProducer := sync.WaitGroup{}
//生产者2个
wgProducer.Add(2)
wg.Add(5)
//生产者1
go func(ci chan int) {
defer wg.Done()
defer wgProducer.Done()
for i := 0; i < 10; i++ {
ci <- i
}
}(chanInt)
//生产者2
go func(ci chan int) {
defer wg.Done()
defer wgProducer.Done()
for i := 10; i < 20; i++ {
ci <- i
}
}(chanInt)
//消费者1
for i := 0; i < 2; i++ {
go func(ci chan int, id int) {
defer wg.Done()
for v := range ci {
fmt.Printf("receive from consumer %d, %d\n", id, v)
}
}(chanInt, i)
}
//消费者2
go func() {
defer wg.Done()
wgProducer.Wait()
close(chanInt)
}()
wg.Wait()
}
上面的发送者:接收者为N:1和发送者:接收者为M:N两种情况讨论的都是发送者最终会执行完循环,主动退出goroutine的情况。对于某些情况下,发送者的goroutine是死循环不会退出的情况,优雅关闭channel方法分析如下:
func TestN1NoExit() {
ci := make(chan int, 10)
exitCh := make(chan struct{})
const numSenders = 10
const MaxValue = 100
wg := sync.WaitGroup{}
//senders
for i := 0; i < numSenders; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
for {
ri := rand.Intn(MaxValue)
select {
case ci <- ri:
fmt.Printf("put %d to channel from sender %d\n", ri, index)
case <-exitCh:
fmt.Printf("sender %d exit\n", index)
return
}
}
}(i)
}
//receivers
wg.Add(1)
go func() {
defer wg.Done()
for v := range ci {
if v == MaxValue-1 {
close(exitCh)
fmt.Println("send exit signal to senders")
return
}
fmt.Println(v)
}
}()
wg.Wait()
}
可以看到上面的处理中,并没有执行close(ci)操作,有同学会有疑问,上面的ci最后关闭了吗?「是关闭了,借东风(gc)来关闭的」。当一个channel没有被引用的时候,它会被gc回收,那channel什么时候是没有被引用呢,即没有发送者goroutine和接收者goroutine与之关联的时候。上面的代码中,执行完close(exitCh)之后,接着者直接return了, 这时channel ci的接着者goroutine已退出,然后发送者在select的时候,可能会选择case ci <- ri(这个时候ci还未满),不管它被选择多少次,最终肯定会选择case <-exitCh(ci满的时候),当选择到case <-exitCh的时候,发送者goroutine也退出了,当所有的发送者goroutine都退出的时候,channel ci就处于未被引用的状态,它会gc回收,也就不用关闭了。所以上面做法很巧妙的借助goroutine退出达到关闭的目的,通过引入一个exitCh channel,从接收者来close(exitCh),因为这种情况下接收者为1,发送者为N。这种方法可以概括为在接收方close 引入的中间channel间接实现关闭真正channel。
func TestMNNoExit() {
ci := make(chan int, 10)
exitCh := make(chan struct{})
toStop := make(chan struct{}, 1)
const numSenders = 10
const numReceivers = 5
const MaxValue = 100
wg := sync.WaitGroup{}
//senders
for i := 0; i < numSenders; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
for {
ri := rand.Intn(MaxValue)
if ri == 0 {
select {
case toStop <- struct{}{}:
default:
}
return
}
select {
case ci <- ri:
fmt.Printf("put %d to channel from sender %d\n", ri, index)
case <-exitCh:
fmt.Printf("sender %d exit\n", index)
return
}
}
}(i)
}
//receivers
for i := 0; i < numReceivers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case v := <-ci:
if v == MaxValue-1 {
select {
case toStop <- struct{}{}:
fmt.Println("send exit signal to senders")
default:
}
return
}
fmt.Println(v)
case <-exitCh:
return
}
}
}()
}
wg.Add(1)
go func() {
defer wg.Done()
<-toStop
close(exitCh)
}()
wg.Wait()
}
对于M:N的场景,就不能在receivers中直接close(exitCh)中进行了,因为存在多个receiver,会有重复关闭的情况。那怎么处理呢?找一个代理角色(新开一个goroutine),在代理角色中执行close(exitCh), 这个时候代理角色只要一个,所以不存在重复关闭的exitCh的情况。那还有一个问题要解决,什么时候执行close(exitCh)呢?我们可以在接收者中或发送者中通知代理角色,接收者、发送者和代理角色本质都是goroutine,那通知方式就是goroutine之间的通信方式,在golang用channel就可以达到次目的,所以要新申请一个channel,来通知代理角色执行close(exitCh), 新申请的channel就是上面代码中的toStop,注意,这里的「toStop要申请为带缓冲区的」,读者可以想一想,如果申请成非缓冲区会有什么问题。不申请带缓冲区的,第一个发生的关闭channel请求可能会丢失。
Gracefully Close Channels[1]
[1]
How to Gracefully Close Channels: https://go101.org/article/channel-closing.html