前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang中的channel解析与实战

Golang中的channel解析与实战

作者头像
素履coder
发布2022-10-05 16:00:35
5800
发布2022-10-05 16:00:35
举报
文章被收录于专栏:素履coder

1. 简介#

channel也叫通道,类似于一个队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。channel一般分为无缓存通道和有缓存通道,无缓存通道指缓存为0的channel,有缓存通道指缓存大于0的channel

如下是无缓存通道的示例:

代码语言:javascript
复制
func TestChannelNoBuffer(t *testing.T) {
	ch1 := make(chan string)	// 初始化一个缓存为0的通道
	go func() {
		val1 := <-ch1
		fmt.Println(val1)
	}()
	// 如果没有用goroutine去接收通道内的值,这一步将会阻塞
	// 所以goroutine需要写在阻塞这一步的前面
	ch1 <- "value"
    
    // Output
    // value
}

如下是有缓存通道的示例:

代码语言:javascript
复制
func TestWithBuffer(t *testing.T) {
	ch2 := make(chan string, 2)		// 初始化一个缓存为2的通道
	for i := 0; i < 2; i++ {
		ch2 <- "value " + strconv.Itoa(i)
	}
	for i := 0; i < 2; i++ {
		temp := <-ch2
		fmt.Println(temp)
	}
    
    // Output
    // value 1
    // value 2
}

2. channel实现细节#

先来看一下channel的结构体

代码语言:javascript
复制
type hchan struct {
	qcount   uint           // channel中元素个数
	dataqsiz uint           // channel中循环队列的大小
	buf      unsafe.Pointer // 指向datasiz元素的数组
	elemsize uint16
	closed   uint32
	elemtype *_type // 元素类型
	sendx    uint   // 发送操作的下标
	recvx    uint   // 接收操作的下标
	recvq    waitq  // 接收的时候,如果channel缓冲区为空,也没发送者,就把goroutine放到这个链表
	sendq    waitq  // 发送的时候,如果channel缓冲区满了,也没接收者,就把goroutine放到这个链表

	lock mutex
}

type waitq struct {	// 双向链表
	first *sudog
	last  *sudog
}

2.1 make初始化channel#

make用于初始化channel,如下是make的源码主体部分

代码语言:javascript
复制
func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	......
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	var c *hchan
	switch {
	case mem == 0:	// 当创建无缓冲channel时,只会为chan分配一段内存空间
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:	// 当传入的chan类型不是指针类型时,会为chan和buf一次性分配一块连续的内存空间
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:	// 否则会为chan和buf新创建一块内存空间
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

    // 统一更新如下字段
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	......
	return c
}

初始化channel一定要用make的方式,不然channel会处于nil的状态,如这种创建方式就会导致channel处于nil状态:var ch chan int

2.2 向chan发送数据 chan <- i#

下图是channel常见的异常总结,对于理解channel源码有一定帮助

在此之前先看该源码的大致逻辑可以更加轻松的理解

如下源码是向channel发送数据时调用的主体部分

代码语言:javascript
复制
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {	// 如上图,如果向为nil的channel发送数据,则会阻塞
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
    
    ......
    
    lock(&c.lock)
	if c.closed != 0 {	// 向已经关闭的channel发送数据会panic,
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
    // 如果存在阻塞的接收者,则直接把值发送给它,绕过发送通道缓冲区
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
    
    // 如果前面没有发现阻塞的接收者,则判断channel的缓冲区是否满了,
    // 如果没有满,则把发送者放到发送通道缓冲区
    if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
    
    ......
    
    // 若上面的缓冲区也满了,则会丢给sendq并阻塞,等待接收者唤醒自己
    c.sendq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	KeepAlive(ep)	// 确保值不会被gc掉
    
    // someone woke us up
    // 被唤醒
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
    
    ......
}

2.3 从chan接收数据 i := <- chan#

从chan接收数据即把channel里面的值取出来,如下是源码部分,先来看一张逻辑图,便于理解

如下是源码部分

代码语言:javascript
复制
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ......
    if c == nil {	// 如果channel为nil,则会阻塞
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
    
    ......
    
    // 如果存在阻塞的发送者,则直接从这个发送者接收数据
    if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	
    // 如果接收通道缓冲区没有放满,则把这个接收者放到接收通道缓冲区
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
    
    ......
    
    // 如果缓冲区也放满了,则会丢到recvq并阻塞,等待有发送者把自己唤醒
    c.recvq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    
    // someone woke us up
    // 被唤醒
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
    
    ......
}

这里有一个点要注意一下,从已关闭的channel取数据是不会panic的,可以正常取值,如果取完了channel里面的值且channel已关闭,仍然取值的话,就会返回类型相应的默认值,如下例:

代码语言:javascript
复制
func TestChannelNoBuffer(t *testing.T) {
	ch1 := make(chan int)
	go func() {
		time.Sleep(time.Second)
		val1 := <-ch1
		fmt.Println(val1)
		val1 = <-ch1
		fmt.Println(val1)
		val1 = <-ch1
		fmt.Println(val1)
	}()
	// 如果没有用goroutine去接收通道内的值,这一步将会阻塞
	// 所以goroutine需要写在阻塞这一步的前面
	ch1 <- 10
	close(ch1)
    
    // Output
    // 10
    // 0
    // 0
}

如果从没有关闭的channel取值,当里面的值取完了仍然取值的话,是取不到值的,会造成另外两个goroutine泄漏,如下例:

代码语言:javascript
复制
func TestChannelNoBuffer(t *testing.T) {
	ch1 := make(chan int)
	go func() {
		time.Sleep(time.Second)
		val1 := <-ch1
		fmt.Println(val1)
	}()
    go func() {
		time.Sleep(time.Second)
		val1 := <-ch1
		fmt.Println(val1)
	}()
	go func() {
		time.Sleep(time.Second)
		val1 := <-ch1
		fmt.Println(val1)
	}()
	// 如果没有用goroutine去接收通道内的值,这一步将会阻塞
	// 所以goroutine需要写在阻塞这一步的前面
	ch1 <- 10
	//close(ch1)
    
    // Output
    // 10
}

2.4 close关闭channel#

如下是channel的关闭操作的源码

代码语言:javascript
复制
func closechan(c *hchan) {
	if c == nil {	// 关闭nil的channel会panic
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {	// 关闭已经关闭的channel会panic
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

    ......

    // 把所有接收队列里阻塞的等待者出队,并放入glist
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// 把所有发送队列里阻塞的等待者出队,并放入glist
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

    // 如果glist不为空,则为这些被阻塞的goroutine调用goready函数唤醒这些goroutine
    // 并重新对这些goroutine进行调度
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

3. 生产者-消费者模型#

如下是一个简单的生产者消费者模型

代码语言:javascript
复制
var wg sync.WaitGroup

func producer(data chan<- int) {
	for i := 0; i < 4; i++ {
		data <- i
	}
    // 这里记得要关闭channel,不然会发生阻塞,因为消费者的数量没有限制,
    // 当消费者从空的channel取值的时候会阻塞
	close(data)
}

func consumer(data <-chan int) {
	defer wg.Done()
	for {
		v, ok := <-data
		if !ok {
			break
		}
		fmt.Println("---:", v, "  ===:", ok)
	}
}

func main() {
	data := make(chan int)
	go producer(data)

	wg.Add(1)
	go consumer(data)
	wg.Wait()
}

// Output
// ---: 0   ===: true
// ---: 1   ===: true
// ---: 2   ===: true
// ---: 3   ===: true

4. channel模拟实现消息队列#

由于channel所具有的队列特性,因此可以尝试使用它来模拟实现一个消息队列。需要声明channel具有如下一些特性:

  • channel里面的每个数据只能被一个goroutine消费,拿走了就没有了
  • channel一般是按照先进先出的方式消费,无法随机消费

关于消息队列的介绍,可以参考我的这篇文章: Title 消息队列学习 | 基础

其实普通的channel使用就是消息队列的队列模型,一边发送,另一边接收,顺序是FIFO,队列模型是早期的消息队列使用的数据结构,本身比较简单,就不做模拟实现,现在来看看如何实现一些其他的模型

3.1 发布订阅模型#

发布订阅模型的基本逻辑如下图:

生产者往Broker里发送消息,然后供多个消费者订阅,此处没有深入到Broker的细节如Topic、Partition等,只是使用channel简单模拟消息队列中生产者和消费者的关系。此处模拟一个消费者发送消息,有多个消费者消费的场景

代码语言:javascript
复制
type Broker struct {
	consumers []*Consumer
}

type Consumer struct {
	ch chan string
}

func (b *Broker) produce(msg string) {
	// 轮询给消费者发送消息
	for _, v := range b.consumers {
		v.ch <- msg
	}
}

func (b *Broker) subscribe(consumer *Consumer) {
	b.consumers = append(b.consumers, consumer)
}

func TestMq1(t *testing.T) {
	// 初始化一个Broker节点
	b := &Broker{
		consumers: make([]*Consumer, 0, 4),
	}

	// 创建2个消费者
	consumer1 := &Consumer{
		ch: make(chan string, 1),
	}
	consumer2 := &Consumer{
		ch: make(chan string, 1),
	}

	// 这2个消费者订阅Broker
	b.subscribe(consumer1)
	b.subscribe(consumer2)

	// 生产者发送一个消息
	b.produce("一条消息")

	// 2个消费者拿到了刚才生产者发送的消息
	fmt.Println(<-consumer1.ch)
	fmt.Println(<-consumer2.ch)
	
	// Output
	// 一条消息
	// 一条消息
}

5. 参考链接#

极客时间go实战训练营

https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/channel.html

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#64-channel

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-09-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 简介#
  • 2. channel实现细节#
    • 2.1 make初始化channel#
      • 2.2 向chan发送数据 chan <- i#
        • 2.3 从chan接收数据 i := <- chan#
          • 2.4 close关闭channel#
          • 3. 生产者-消费者模型#
          • 4. channel模拟实现消息队列#
            • 3.1 发布订阅模型#
            • 5. 参考链接#
            相关产品与服务
            消息队列 CMQ 版
            消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档