前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多协程错误处理与errgroup

多协程错误处理与errgroup

作者头像
千灵域
发布2022-06-17 13:04:34
1.4K0
发布2022-06-17 13:04:34
举报
文章被收录于专栏:challenge filter

多协程错误处理

背景

多goroutine错误处理是个常见的请求,多个goroutine都会返回error,但是很多时候只要发生了一个错误,整体都是需要回退的。 我昨天遇见的情况大体如下面这个代码所示。 这个代码有个比较明显的问题是如果错误的数量短期内过多,它是可能填满errChan并导致goroutine阻塞的。

代码语言:javascript
复制
var wg sync.WaitGroup
errChan = make(chan error, currency)
for i:=0;i<x;i++{
    wg.Add(1)
    go func(){
        defer wg.Done()
        err := someFuncThatGenerateErr()
        errChan <- err
    }()
}
wg.Wait()
err := <-errChan
if err != nil{
    return err
}

使用channel处理

在昨天的时候,真的有一个请求错误数量达到指定值以上,把整个请求阻塞住了。因此发现了这个bug,对这个地方进行修改。 由于只需要读取第一个err,一个很自然地想法是使用一个外部的error变量来读取。 这会带来数据竞争(读写冲突),使得读取的错误变量error并不是第一个而是短时间内的最后一个,如本博客中的另外一篇文章《Go中的原子操作与实践(CAS篇)》所示。

具体来说,数据竞争并不会影响最终结果的好坏。尽管其会导致错误变量内记录的值本身发生变化,但是其实这段逻辑需要的只是任意的一个错误信息,因此最终结果是没问题的。 只是如果真的需要第一个发生的错误的值的话,采用CAS操作锁死无疑是最好的。但正如那篇文章所示,即使采用了CAS操作也是有可能写错的,届时一样会发生数据竞争的情况,导致结果不符合预期。

而更一般的情况是,循环的多个goroutine一旦发生错误,往往需要全部退出。此时如果能通过context进行终止,无疑是最好的选择。

errgroup

golang.org/x/sync/errgroup无疑是这方面最佳的选择。总体代码120行,这个包能够很方便地解决前文所说的需求。

使用说明

以如下的示例代码说明如何使用errgroup

  1. 采用errgroup.WithContext(ctx)创建errgroup与对应的context
  2. 使用errgroup.Do执行goroutine
  3. g.Wait()得到第一个非nil的error。errgroup替换掉了waitGroup

有一个问题:如go-redis或者cos等库中,非nil得错误并不一定就需要退出,比如说存在有专门的EOF错误。 这种情况下errgroup应该如何去进行处理?我看了一下,可能是要自己来实现了。

这份示例代码的问题在于没有使用context.Done。完整代码如下:

代码语言:javascript
复制
package main

import (
	"context"
	"fmt"
	"os"

	"golang.org/x/sync/errgroup"
)

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

type Result string
type Search func(ctx context.Context, query string) (Result, error)

func fakeSearch(kind string) Search {
	return func(_ context.Context, query string) (Result, error) {
		return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
	}
}

func main() {
	Google := func(ctx context.Context, query string) ([]Result, error) {
		g, ctx := errgroup.WithContext(ctx) // 此处基于全局的context产生errgroup和对应的context

		searches := []Search{Web, Image, Video}
		results := make([]Result, len(searches))
		for i, search := range searches {
			i, search := i, search // 防止捕获循环变量造成数据竞争,https://golang.org/doc/faq#closures_and_goroutines
			g.Go(func() error { // 使用g.Go执行goroutine,该函数不能接受参数并返回一个error。
            // 在具体实现的时候可以采用另外的函数封装,并使用channel传递结果
				result, err := search(ctx, query)
				if err == nil {
					results[i] = result
				}
				return err
			})
		}
		if err := g.Wait(); err != nil {
			return nil, err
		}
		return results, nil
	}

	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}

}

包含context.Done的示例代码,采用select选择,如果发生了ctx.Done则直接退出。

代码语言:javascript
复制
// forever 持续打印数字,直到ctx结束
func forever(ctx context.Context, i int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("forever stop now")
            return
        default:
        }
        time.Sleep(time.Second)
        fmt.Printf("goroutine %d is running\n", i)
        runtime.Gosched()
    }
}

// delayError 5秒后报错
func delayError() error {
    time.Sleep(time.Second * 5)
    fmt.Println("dealyError return")
    return errors.New("should stop now")
}

func main() {
    g, ctx := errgroup.WithContext(context.Background())
    for i := 0; i < 2; i++ {
        i := i
        if i == 0 {
            g.Go(delayError)
            continue
        }
        g.Go(func() error {
            forever(ctx, i)
            return nil
        })
    }
    if err := g.Wait(); err != nil {
        fmt.Println(err)
    }
}

原理分析

带上之前的示例与分析

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-06-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 多协程错误处理
    • 背景
      • 使用channel处理
        • errgroup
          • 使用说明
          • 原理分析
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档