前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka 并发消费单个 partition

Kafka 并发消费单个 partition

作者头像
Yuyy
发布2023-05-01 09:38:27
9630
发布2023-05-01 09:38:27
举报
文章被收录于专栏:yuyy.info技术专栏

背景

kafka可以通过多个partition实现并发,但是针对单个partition,必须顺序提交。假如消息发送顺序为1,2,3,如果先提交3,会导致1,2被提交。所以不能并发执行后立即提交。

解决思路

记录接收消息的顺序到listA,然后并发执行,执行成功的消息,记录到setB。起个goroutine定时提交,顺序遍历listA,依次判断该消息是否在setB里,如果不在,就把前面遍历的那部分提交,然后等待下一次定时执行。

实现

代码语言:javascript
复制
type ConsumerGroupRepo struct {
    reader          *kafka.Reader
    fetchMsgHeader  *msgNode
    fetchMsgTail    *msgNode
    commitMsgHeader *msgNode
    commitMsgTail   *msgNode
    msgChan         chan *kafka.Message
    queueLock       sync.Mutex
}

type msgNode struct {
    msg  *kafka.Message
    next *msgNode
}

func NewConsumerGroupRepo(brokers []string, groupID string, topics []string) *ConsumerGroupRepo {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     brokers,
        GroupID:     groupID,
        GroupTopics: topics,
        MinBytes:    10e3, //10KB
        MaxBytes:    10e6, //10MB
    })
    return &ConsumerGroupRepo{
        reader:          reader,
        fetchMsgHeader:  nil,
        fetchMsgTail:    nil,
        commitMsgHeader: nil,
        commitMsgTail:   nil,
        msgChan:         make(chan *kafka.Message),
    }
}

func (c *ConsumerGroupRepo) Consume(ctx context.Context) error {
    defer close(c.msgChan)
    for {
        select {
        case <-ctx.Done():
            return errors.New("kafka consume stop, context cancel error")
        default:
            m, err := c.reader.FetchMessage(ctx)
            if err != nil {
                return err
            }
            c.queueLock.Lock()
            if c.fetchMsgHeader == nil {
                c.fetchMsgHeader = &msgNode{msg: &m, next: nil}
                c.fetchMsgTail = c.fetchMsgHeader
            } else {
                c.fetchMsgTail.next = &msgNode{msg: &m, next: nil}
                c.fetchMsgTail = c.fetchMsgTail.next
            }
            c.queueLock.Unlock()
            c.msgChan <- &m
        }
    }
}

func (c *ConsumerGroupRepo) Commit(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return errors.New("kafka commit stop, context cancel error")
        default:
            time.Sleep(time.Second)
            if err := c.commit(ctx); err != nil {
                return err
            }
        }
    }
}

func (c *ConsumerGroupRepo) commit(ctx context.Context) error {
    c.queueLock.Lock()
    commitMsgs := make([]kafka.Message, 0, 32)
    for c.fetchMsgHeader != nil {
        var commitNode *msgNode = nil
        cnode := c.commitMsgHeader
        var cnodePre *msgNode = nil
        // 可用set优化
    for cnode != nil {
            if cnode.msg == c.fetchMsgHeader.msg {
                commitNode = c.fetchMsgHeader
                commitMsgs = append(commitMsgs, *(commitNode.msg))
                // 移动fetch指针
                c.fetchMsgHeader = c.fetchMsgHeader.next
                if cnode == c.commitMsgHeader {
                    // 若过commit队列第一个匹配,则直接修改头指针
                    c.commitMsgHeader = c.commitMsgHeader.next
                } else {
                    // 删除中间节点
                    cnodePre.next = cnode.next
                }
                // 若删除的为tail节点
                if cnode == c.commitMsgTail {
                    c.commitMsgTail = cnodePre
                }
                break
            }
            cnodePre = cnode
            cnode = cnode.next
        }
        if commitNode == nil {
            break
        }
    }
    c.queueLock.Unlock()
    if len(commitMsgs) > 0 {
        err := c.reader.CommitMessages(ctx, commitMsgs...)
        return errors.Wrap(err, "commit kafka msg error")
    }
    return nil
}

func (c *ConsumerGroupRepo) GetMsgChan() <-chan *kafka.Message {
    return c.msgChan
}

func (c *ConsumerGroupRepo) CommitMsg(msg *kafka.Message) {
    c.queueLock.Lock()
    defer c.queueLock.Unlock()
    if c.commitMsgHeader == nil {
        c.commitMsgHeader = &msgNode{msg: msg, next: nil}
        c.commitMsgTail = c.commitMsgHeader
    } else {
        c.commitMsgTail.next = &msgNode{msg: msg, next: nil}
        c.commitMsgTail = c.commitMsgTail.next
    }
}

使用

代码语言:javascript
复制
func (u *xxxServer) Start(ctx context.Context) error {
    u.wg.Add(u.xxxConcurrentNum + 2)
    cancelCtx, cancel := context.WithCancel(context.Background())
    u.cancelFunc = cancel
    // 启动拉取消息goroutine
    go func() {
        defer u.wg.Done()
        if err := u.consumerGroupRepo.Consume(cancelCtx); err != nil {
            log.Errorf("kafka consumer error: %+v", err)
            u.cancelFunc()
        }
    }()
    // 启动commit goroutine
    go func() {
        defer u.wg.Done()
        if err := u.consumerGroupRepo.Commit(cancelCtx); err != nil {
            log.Errorf("kafka commit error: %+v", err)
            u.cancelFunc()
        }
    }()
    // 启动消费 goroutine
    for i := 0; i < u.xxxConcurrentNum; i++ {
        go func() {
            defer u.wg.Done()
            for msg := range u.consumerGroupRepo.GetMsgChan() {
                if err := u.handleMsg(cancelCtx, msg); err != nil {
                    log.Errorf("handle kafka msg error: %+v", err)
                    u.cancelFunc()
                }
                u.consumerGroupRepo.CommitMsg(msg)
            }
        }()
    }
    return nil
}

Post Views: 5

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-4-24 1,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 解决思路
  • 实现
  • 使用
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档