Kafka gets its concurrency from multiple partitions, but within a single partition offsets must be committed in order. Committing an offset implicitly acknowledges everything before it: if messages arrive as 1, 2, 3 and you commit 3 first, messages 1 and 2 are treated as committed even if they are still being processed. So you cannot simply process messages concurrently and commit each one the moment it finishes.
The approach here: record the fetch order of messages in listA, process them concurrently, and record every successfully processed message in setB. A separate goroutine commits on a timer: it walks listA in order and checks whether each message is in setB; at the first message that is not, it commits the prefix walked so far and waits for the next tick.
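Before the full implementation, here is a minimal sketch of that prefix-commit pass, using a slice for listA and a map for setB. The helper name commitPrefix and the slice/map representation are illustrative only (the code below uses linked lists instead), and it assumes the same context and kafka-go imports shown with the implementation:

// commitPrefix walks listA in fetch order, commits the longest prefix whose
// messages are all present in setB, and returns the uncommitted tail.
// Illustrative sketch only; pointer identity is used as the set key, the same
// way the real implementation below matches nodes.
func commitPrefix(ctx context.Context, reader *kafka.Reader,
	listA []*kafka.Message, setB map[*kafka.Message]bool) ([]*kafka.Message, error) {
	commitMsgs := make([]kafka.Message, 0, len(listA))
	for _, m := range listA {
		if !setB[m] {
			break // first unprocessed message ends the committable prefix
		}
		commitMsgs = append(commitMsgs, *m)
		delete(setB, m)
	}
	if len(commitMsgs) == 0 {
		return listA, nil // nothing committable yet
	}
	// Committing the prefix is enough: committing an offset acknowledges every
	// earlier offset on that partition, and all of those have succeeded.
	if err := reader.CommitMessages(ctx, commitMsgs...); err != nil {
		return listA, err
	}
	return listA[len(commitMsgs):], nil // tail waits for the next tick
}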
import (
	"context"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/segmentio/kafka-go"
)

// ConsumerGroupRepo keeps two linked lists: the fetch list ("listA", messages
// in fetch order) and the commit list ("setB", messages already processed).
type ConsumerGroupRepo struct {
	reader          *kafka.Reader
	fetchMsgHeader  *msgNode
	fetchMsgTail    *msgNode
	commitMsgHeader *msgNode
	commitMsgTail   *msgNode
	msgChan         chan *kafka.Message
	queueLock       sync.Mutex
}

type msgNode struct {
	msg  *kafka.Message
	next *msgNode
}
// NewConsumerGroupRepo builds a consumer-group reader for the given brokers,
// group ID and topics.
func NewConsumerGroupRepo(brokers []string, groupID string, topics []string) *ConsumerGroupRepo {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     brokers,
		GroupID:     groupID,
		GroupTopics: topics,
		MinBytes:    10e3, // 10KB
		MaxBytes:    10e6, // 10MB
	})
	return &ConsumerGroupRepo{
		reader:          reader,
		fetchMsgHeader:  nil,
		fetchMsgTail:    nil,
		commitMsgHeader: nil,
		commitMsgTail:   nil,
		msgChan:         make(chan *kafka.Message),
	}
}
// Consume fetches messages in a loop, appends each one to the fetch list and
// hands it to the worker goroutines via msgChan. FetchMessage (unlike
// ReadMessage) does not auto-commit, so offsets can be committed in order later.
func (c *ConsumerGroupRepo) Consume(ctx context.Context) error {
	defer close(c.msgChan)
	for {
		select {
		case <-ctx.Done():
			return errors.New("kafka consume stop, context cancel error")
		default:
			m, err := c.reader.FetchMessage(ctx)
			if err != nil {
				return err
			}
			// Append to the tail of the fetch list; the same *kafka.Message
			// pointer is sent on msgChan, and commit() later matches nodes by
			// this pointer.
			c.queueLock.Lock()
			if c.fetchMsgHeader == nil {
				c.fetchMsgHeader = &msgNode{msg: &m, next: nil}
				c.fetchMsgTail = c.fetchMsgHeader
			} else {
				c.fetchMsgTail.next = &msgNode{msg: &m, next: nil}
				c.fetchMsgTail = c.fetchMsgTail.next
			}
			c.queueLock.Unlock()
			c.msgChan <- &m
		}
	}
}
// Commit runs the periodic commit loop: roughly once per second it commits
// the longest processed prefix of the fetch list.
func (c *ConsumerGroupRepo) Commit(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return errors.New("kafka commit stop, context cancel error")
		default:
			time.Sleep(time.Second)
			if err := c.commit(ctx); err != nil {
				return err
			}
		}
	}
}
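The Sleep in the default branch means cancellation is only noticed between passes. A variant of the same loop (a sketch, not the original code) could use a time.Ticker so the select can react to the context while waiting:

// commitWithTicker is an alternative to Commit above: the ticker channel sits
// inside the select, so context cancellation is observed even mid-wait.
func (c *ConsumerGroupRepo) commitWithTicker(ctx context.Context) error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return errors.New("kafka commit stop, context cancel error")
		case <-ticker.C:
			if err := c.commit(ctx); err != nil {
				return err
			}
		}
	}
}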
// commit collects the longest prefix of the fetch list whose messages are all
// in the commit list, removes them from both lists, and commits them in order.
func (c *ConsumerGroupRepo) commit(ctx context.Context) error {
	c.queueLock.Lock()
	commitMsgs := make([]kafka.Message, 0, 32)
	for c.fetchMsgHeader != nil {
		var commitNode *msgNode = nil
		cnode := c.commitMsgHeader
		var cnodePre *msgNode = nil
		// Linear scan of the commit list; could be optimized with a set (map)
		// keyed by the message pointer.
		for cnode != nil {
			if cnode.msg == c.fetchMsgHeader.msg {
				commitNode = c.fetchMsgHeader
				commitMsgs = append(commitMsgs, *(commitNode.msg))
				// Advance the fetch-list head.
				c.fetchMsgHeader = c.fetchMsgHeader.next
				if cnode == c.commitMsgHeader {
					// The match is the head of the commit list: just move the head pointer.
					c.commitMsgHeader = c.commitMsgHeader.next
				} else {
					// Unlink a middle node.
					cnodePre.next = cnode.next
				}
				// If the removed node was the tail, move the tail back.
				if cnode == c.commitMsgTail {
					c.commitMsgTail = cnodePre
				}
				break
			}
			cnodePre = cnode
			cnode = cnode.next
		}
		// The head of the fetch list has not been processed yet: stop here so
		// the committed messages stay a strict prefix of the fetch order.
		if commitNode == nil {
			break
		}
	}
	c.queueLock.Unlock()
	if len(commitMsgs) > 0 {
		err := c.reader.CommitMessages(ctx, commitMsgs...)
		return errors.Wrap(err, "commit kafka msg error")
	}
	return nil
}
// GetMsgChan exposes the channel that Consume feeds fetched messages into.
func (c *ConsumerGroupRepo) GetMsgChan() <-chan *kafka.Message {
	return c.msgChan
}
// CommitMsg marks a message as processed by appending it to the commit list;
// the actual offset commit happens later in commit().
func (c *ConsumerGroupRepo) CommitMsg(msg *kafka.Message) {
	c.queueLock.Lock()
	defer c.queueLock.Unlock()
	if c.commitMsgHeader == nil {
		c.commitMsgHeader = &msgNode{msg: msg, next: nil}
		c.commitMsgTail = c.commitMsgHeader
	} else {
		c.commitMsgTail.next = &msgNode{msg: msg, next: nil}
		c.commitMsgTail = c.commitMsgTail.next
	}
}
// Start wires everything together: one fetch goroutine, one commit goroutine,
// and xxxConcurrentNum worker goroutines reading from the message channel.
func (u *xxxServer) Start(ctx context.Context) error {
	u.wg.Add(u.xxxConcurrentNum + 2)
	cancelCtx, cancel := context.WithCancel(context.Background())
	u.cancelFunc = cancel
	// Start the fetch goroutine.
	go func() {
		defer u.wg.Done()
		if err := u.consumerGroupRepo.Consume(cancelCtx); err != nil {
			log.Errorf("kafka consumer error: %+v", err)
			u.cancelFunc()
		}
	}()
	// Start the commit goroutine.
	go func() {
		defer u.wg.Done()
		if err := u.consumerGroupRepo.Commit(cancelCtx); err != nil {
			log.Errorf("kafka commit error: %+v", err)
			u.cancelFunc()
		}
	}()
	// Start the worker goroutines.
	for i := 0; i < u.xxxConcurrentNum; i++ {
		go func() {
			defer u.wg.Done()
			for msg := range u.consumerGroupRepo.GetMsgChan() {
				if err := u.handleMsg(cancelCtx, msg); err != nil {
					log.Errorf("handle kafka msg error: %+v", err)
					u.cancelFunc()
				}
				// The message is appended to the commit list regardless of the
				// handleMsg result; on error the shared context has already
				// been canceled, which shuts the whole pipeline down.
				u.consumerGroupRepo.CommitMsg(msg)
			}
		}()
	}
	return nil
}
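The post stops at Start. A Stop counterpart would only need to cancel the shared context and wait on the WaitGroup: cancellation makes FetchMessage return, so Consume exits and closes msgChan, which in turn drains the workers. A sketch, assuming the xxxServer fields used above (closing the underlying kafka.Reader via its Close method would also belong here, but the repo does not expose it in the code above):

// Stop is a sketch of the shutdown path implied by Start (not in the original post).
func (u *xxxServer) Stop(ctx context.Context) error {
	// Cancel the shared context: FetchMessage returns, Consume exits and
	// closes msgChan, the commit loop stops, and the workers drain the
	// channel and exit.
	u.cancelFunc()
	u.wg.Wait()
	return nil
}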