前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【并发编程】如果用 Channel 解决并发问题?

【并发编程】如果用 Channel 解决并发问题?

作者头像
了凡银河系
发布2022-08-22 13:49:01
4500
发布2022-08-22 13:49:01
举报
文章被收录于专栏:了凡的专栏

前言

什么是Channel?

在Go语言基础中应该就学过Channel,那个时候应该都认为只是一个基础类型,是一个管道一样类似的东西,方便快速读写操作,但是Channel在并发中扮演什么角色呢?Channel是Go语言内建的first-class类型,也是Go语言与众不同的特性之一。Go语言的Channel设计非常精巧而且简单,在其他语言也有类似Go风格的Channel库,但是并不像Go语言一样把Channel内置到了语言规范中。

Channel的发展

Channel这种Go编程语言中的特有的树结构,还是要从CSP模型学习,看看CSP模型对Go创始人设计Channel类型的影响。

CSP是Communicating Sequential Process 的简称,中文为通信顺序进程,或者叫交换信息的循序进程,是用来描述并发系统中进行交互的一种模式。

CSP最早出现于计算机科学家发布Quciksort算法的作者所发表的论文中。最初,论文中提出的CSP版本在本质上不是一种进程演算,而是一种并发编程,但之后经过了很多改进,最终发展并精炼出CSP的理论。CSP允许使用进程组件来描述系统,它们独立运行,并且只通过消息传递的方式通信

Go的创始人之一Rob Pike也曾经说过:每一个计算机程序员都应该读一下Tony Hoare1978年的关于CSP的论文,后来将CSP理论真正应用于语言本身,通过引入Channel这个新的类型,来实现CSP的思想。

Channel类型是Go语言内置的类型,你无需引入某个包,就能使用。虽然Go也提供了传统的并发原语,但是它们都是通过库的方式提供的,你必须要引入sync包或者atomic包才能使用它们,而Channel就不一样了,它是内置类型,使用起来很方面。

Channel和Go的另一个独特的特性goroutine一起为并发编程提供了优雅的、便利的、与传统并发控制不同的方案,并演化出很多 并发模式。

Channel的应用场景

执行业务处理的goroutine不要通过共享内存的方式通信,而是要通过Channel通信的方式分享数据

从Channel的历史和设计哲学上,可以知道,Channel类型和基本并发原语是有竞争关系的,它应用于并发场景,涉及到goroutine之间的通讯,可以提供并发的保护,等等。

作者在这里把Channel的应用场景分为五种类型,很值得思考。

  1. 数据交流:当作并发的buffer(缓冲区)或者queue,解决生产者-消费者问题。多个goroutine可以并发当作生产者(Producer)和消费者(Consumer)。
  2. 数据传递:一个goroutine将数据交给另一个goroutine,相当于把数据的拥有权(引用)托付出去。
  3. 信号通知:一个goroutine可以将信号(closing、closed、data ready等)传递给另一个或者另一组goroutine。
  4. 任务编排:可以让一组goroutine按照一定的顺序并发或者串行的执行,这就是编排的功能。
  5. :利用Channel也可以实现互斥锁的机制。

Channel基本用法

可以往Channel中发送数据,也可以从Channel中接收数据,所以,Channel类型(为了方便,Channel也叫做chan)分为只能接收、只能发送、既可以接收又可以发送三种类型。

代码语言:javascript
复制
ChannelType = ("chan" | "chan" "<-" | "<-" "chan") ElementType

相应地,Channel的正确语法如下:

代码语言:javascript
复制
chan string           // 可以发送接收string
chan<- struct{}       // 只能发送struct{}
<-chan int            // 只能从chan接收int

把既能接收又能发送的chan叫做双向的chan,把只能发送和只能接收的chan叫做单向的chan。其中,“<-”表示单向的chan,一个简便的记法:箭头总是指向左边的,元素类型总是在右边。如果箭头指向chan,就表示可以往chan中塞数据;如果箭头远离chan,就表示chan会往外吐数据。

chan中的元素是任意的类型,所以也可能是chan类型,例如:

代码语言:javascript
复制
chan<- chan int
chan<- <-chan int
<-chan <-chan int
chan (<-chan int)

如何判断箭头符号属于哪个chan?“<-”有个规则,总是尽量和左边的chan结合,因此可以理解为:

代码语言:javascript
复制
chan<- chan int   // 《- 和第一个chan结合
chan<- <-chan int // 第一个<-和最左边的chan结合,第二个<-和左边第二个chan结合
<-chan <-chan int // 第一个<-和最左边的chan结合,第二个<-和左边的第二个chan结合
chan (<-chan int) // 因为括号的原因,<-和括号内第一个chan结合

通过make,可以初始化一个chan,未初始化的chan的零值是nil。你可以设置它的容量,比如下面的chan的容量是9527,把这样的chan叫做buffered chan;如果没有设置,它的容量是0,把这样的chan叫做unbuffered chan(无缓冲 chan)。

代码语言:javascript
复制
meke(chan int, 9527)

如果chan中还有数据,那么,从这个chan接收数据的时候就不会阻塞,如果chan还未满(“满”指达到其容量),给它发送数据也不会阻塞,否则就会阻塞。unbuffered chan只有读写都准备好之后才不会阻塞,这也是很多使用unbuffered chan时常见的Bug。

注意:nil是chan的零值,是一种特殊的chan,对值是nil的chan的发送接收调用者总是会阻塞

1. 发送数据

往chan中发送一个数据使用“ch<-”,发送数据是一条语句:

代码语言:javascript
复制
ch <- 2000

这里的ch是chan int类型或者是chan <- int。

2. 接收数据

从chan中接收一条数据使用“<-chan”,接收数据也是一条语句:

代码语言:javascript
复制
x := <-ch // 把接收的一条数据赋值给变量x
foo(<-ch) // 把接收的一个的数据作为参数传给函数
<-ch // 丢弃接收的一条数据

这里的ch类型是chan T或者 <-chan T。

接收数据时,还可以返回两个值。第一个值是返回的chan中的元素,很多人不太熟悉的是第二个值。第二个值是bool类型,代表是否成功地从chan中读取到一个值,如果第二个参数是false,chan已经被close而且chan中没有缓存的数据,这个时候,第一个值是零值。所以,如果从chan读取到一个零值,可能是sender真正发达的零值,也可能是closed的并且没有缓存元素产生的零值。

3. 其它操作

Go内建的函数close、cap、len都可以操作chan类型:chose会把chan关闭掉,cap返回chan的容量,len返回chan中缓存的还未取走的元素数量。

send和recv都可以作为select语句的case clause,如下面的例子:

代码语言:javascript
复制
func main() {
    var ch = make(chan int, 10)
    for i := 0; i < 10; i++ {
        select {
            case ch <- i:
            case v := <-ch:
                fmt.Println(v)
        }
    }
}

chan还可以应用于for-range语句中,比如:

代码语言:javascript
复制
for v := range ch {
    fmt.Println(v)
}

或者是忽略读取的值,只是清空chan:

代码语言:javascript
复制
for range ch {
}

Channel的实现原理

chan的数据结构、初始化方法以及三个重要的操作方法都是什么?

chan数据结构

chan类型的数据结构如下图所示,它的数据类型是

源码结构体:

代码语言:javascript
复制
type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

每个字段都有什么意义?

  • qcount: 代表chan中已经接收但还没被取走的元素的个数。内建函数len可以返回这个字段的值。
  • dataqsiz:队列的大小。chan使用一个循环队列来存放元素,循环队列很适合这种生产者-消费者的场景
  • buf:存放元素的循环队列的buffer。
  • elemtype和elemsize:chan中元素的类型和size。因为chan一旦声明,它的元素类型是固定的,即普通类型或者指定类型,所以元素大小也是固定的。
  • sendx:处理发送数据的指针在buf中的位置。一旦接收了新的数据,指针就会加上elemsize,移向下一个位置。buf的总大小是elemsize的整数倍,而且buf是一个循环列表。
  • recvx:处理接收请求时的指针在buf中的位置。一旦取出数据,此指针会移动到下一个位置。
  • recvq:chan是多生产者多消费者的模式。如果消费者因为没有数据可读而被阻塞了,就会被加入到recvq队列中。
  • sendq:如果生产者因为buf满了而阻塞,会被加入到sendq队列中。

推荐文章:https://learnku.com/articles/61513

初始化

只关注makechan就好,因为makechan64只是做了size检查,底层还是调用makechan实现的。makechan的目标就是生成hchan对象。

看一下makechan的主要逻辑。主要的逻辑我都加上了注释,它会根据chan的容量的大小和元素的类型不同,初始化不同的存储空间:

代码语言:javascript
复制
func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    // 略去检查代码
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    
    var c *hchan
    switch {
    case mem == 0:
        // chan的size或者元素的size是0,不必创建buf
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // 元素不是指针,分配一块连续的内存给hchan数据结构和buf
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        // hchan数据结构后面紧接着就是buf
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 元素包含指针,那么单独分配buf
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    // 元素大小、类型、容量都记录下来
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)
    
    return c
}

最终,针对不同的容量和元素类型,这段代码分配了不同的对象来初始化hchan对象的字段,返回hchan对象。

send

Go在编译发送数据给chan的时候,会把send语句转换成chansend1函数,chansend1函数会调用chansend,分析学习它的逻辑:

代码语言:javascript
复制
func chansend1(c *hchan, elem unsafe.Pointer) {
   chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   // 第一部分
   if c == nil {
      if !block {
         return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }
   ......
}

最开始,第一部分是进行判断:如果chan是nil的话,就把调用者goroutine park(阻塞休眠),调用者就永远被阻塞住了 ,所以,第11行是不可能执行到的代码。

代码语言:javascript
复制
// 第二部分,如果chan没有被close,并且chan满了,直接返回
if !block && c.closed == 0 && full(c) {
    return false
}

第二部分的逻辑是当你往一个已经满了的chan实例发送数据时,并且想不阻塞当前调用,那么这里的逻辑是直接返回。chansend1方法在调用chansend的时候阻塞参数,所以不会执行到第二部分的分支里。

代码语言:javascript
复制
// 第三部分,chan已经被close的情景
lock(&c.lock) // 开始加锁
if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
}

第三部分显示的是,如果chan已经被close了,再往里面发送数据的话会panic。

代码语言:javascript
复制
// 第四部分,从接收队列中出队一个等待的receiver
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

第四部分,如果等待队列中有等待的receiver,那么这段代码就把它从队列中弹出,然后直接把数据交给它(通过 memmove(dst,src,t.size)),而不需要放入到buf中,速度可以更快一些。

代码语言:javascript
复制
// 第五部分,buf还么满
if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    if raceenabled {
        raceacquire(qp)
        racerelease(qp)
    }
    typedmemmove(c.elemtype,qp,ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
}

第五部分说明当前没有receiver, 需要把数据放入到buf中,放入之后,就成功返回了。

代码语言:javascript
复制
// 第六部分,buf满。
// chansend1不会进入if块里,因为chansend1的block=true
if !block {
    unlock(&c.lock)
    return false
}
......

第六部分是处理buf满的情况。如果buf满了,发送者的goroutine就会加入到发送者的等待队列中,直到被唤醒。这个时候,数据或者被取走了,或者chan被close了。

recv

再处理从chan中接收数据时,Go会把代码换成chanrecv1函数,如果要返回值,会转换成chanrecv2,chanrecv1函数和chanrecv2会调用chanrecv。源码分段为:

代码语言:javascript
复制
func chanrecv1(c *hchan, elem unsafe.Pointer){
    chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 第一部分,chan为nil
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
}

chanrecv1和chanrecv2传入的block参数的值是true,都是阻塞方式,所以分析chanrecv的实现的时候,不考虑block=false的情况。

第一部分是chan为nil的情况。和send一样,从nil chan中接收(读取、获取)数据时,调用者会被永远阻塞。

代码语言:javascript
复制
// 第二部分,block=false且c为空
if !block && empty(c) {
    ......
}

第二部分你可以直接忽略,因为不是我们这次要分析的场景。

代码语言:javascript
复制
// 加锁,返回时释放锁
lock(&c.lock)
// 第三部分,c已经被close,且chan为空empty
if c.closed != 0 && c.qcount == 0 {
    unlock(&c.lock)
    if ep != nil {
        typedmemclr(c.elemtype, ep)
    }
    return true, false
}

第三部分是chan已经被close的情况。如果chan已经被close了,并且队列中没有缓存的元素,那么返回true、false。

代码语言:javascript
复制
// 第四部分,如果sendq队列中等待发送的sender
if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true,true
}

第四部分是处理sendq队列中有等待者的情况。这个时候,如果buf中有数据,优先从buf中读取数据,否则直接从等待队列中弹出一个sender,把它的数据赋值给这个receiver。

代码语言:javascript
复制
// 第五部分,没有等待的sender,buf中有数据
if c.qcount > 0 {
    qp := chanbuf(c, c.recvx)
    if ep != nil {
        typedmemmove(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx ++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount --
        unlock(&c.lock)
        return true, true
    }
    
    if !block {
        unlock(&c.lock)
        return false, false
    }
    // 第六部分,buf中没有元素,阻塞
}

第五部分是处理没有等待的sendr的情况。这个是和chansend共用一把大锁,所以不会并发的问题。如果buf有元素,就取出一个元素给receiver。

第六部分是处理buf中没有元素的情况。如果没有元素,那么当前的receiver就会被阻塞,直到它从sender中接收了数据,或者是chan被close,才返回。

close

通过close函数,可以把chan关闭,编译器会替换closechan方法的调用。

看代码,如果chan为nil,close会panic;如果chan已经closed,再次close也会panic。否则的话,如果chan不为nil,chan也没有closed,就把等待队列中的sender(writer)和receiver(reader)从队列中全部移除并唤醒。

代码语言:javascript
复制
func closechan(c *hchan) {
    if c == nil  { // chan为nil,panic
        panic(plainError("close of channel"))
    }
    
    lock(&c.lock)
    if c.closed != 0 { // chan已经closed,panic
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    
    c.closed = 1
    
    var glist gList
    
    // 释放所有的reader
    for {
        sg := c.recvq.dequeue()
        ......
        gp := sg.g
        ......
        glist.push(gp)
    }
    
    // 释放所有的writer (它们会panic)
    for {
        sg := c.sendq.dequeue()
        ......
        gp := sg.g
        ......
        glist.push(gp)
    }
    unlock(&c.lock)
    
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

总结

chan的值和状态有多种情况,而不同的操作(send、recv、close)可能得到不同的结果,这是使用chan类型时经常让人困惑的地方。

注意:关注panic的情况,另外还要掌握哪些会block的场景,都是导致死锁或goroutine泄漏的罪魁祸首

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-02-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 了凡银河系 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • 什么是Channel?
      • Channel的发展
      • Channel的应用场景
      • Channel基本用法
        • 1. 发送数据
          • 2. 接收数据
            • 3. 其它操作
            • Channel的实现原理
              • chan数据结构
                • 初始化
                  • send
                    • close
                    • 总结
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档