最近有个项目 go-patterns 挺火的,目前已经 16.9k star 了(虽然是项目还是不太完善/健全的状态)。本文也参考
go-patterns
列举几个个人认为比较重要的 go patterns, 这里的 pattern 并不是设计模式
, 更多是是广义的在 golang 中的一些有效的设计和开发模式.
这是一种很常见的模式,但是在 golang 中,这种模式能够提供更多有用/高级的选项。比如 我们可以定义三种消费者:第一种,生产者生产的消息会阻塞,等所有消费者都消费完,第二种,生产者不等消费者,生产完消息就返回,消费者异步消费;第三种,消费者并行消费,生产者等所有消费者都消费完再返回。
模式1: [生产] --> [消费1] --> [消费2] --> [生产返回]
模式1: [生产] --> [生产返回]
--> [消费1]
--> [消费2]
模式1: [生产] -->[消费1] --> [生产返回]
--> [消费2]
核心代码如下:
type EventType string
type Event struct {
EventType EventType
Data interface{}
}
type Subscriber interface {
// 处理 Event
HandleEvent(ctx context.Context, event *Event) error
}
type EventBus interface {
Subscribe(topic EventType, s Subscriber) // Subscribe 注册会阻塞 Publish 方法, Subscriber 会依次执行
SubscribeAsync(topic EventType, s Subscriber) // SubscribeAsync 注册不会阻塞 Publish 方法, Subscriber 会并发执行
SubscribeAsyncWait(topic EventType, s Subscriber) // SubscribeAsyncWait 注册会阻塞 Publish 方法, Subscriber 会并发执行, Publish 会在所有 Subscriber 执行完成之后返回
Unsubscribe(topic EventType, s Subscriber) // Unsubscribe 取消注册
Publish(ctx context.Context, e *Event) error // 发送消息
}
type executeMode string
var (
executeModeSync executeMode = "sync"
executeModeAsync executeMode = "async"
executeModeAsyncWait executeMode = "async_wait"
)
// EventBusImp
type EventBusImp struct {
syncHandlers map[EventType][]*eventHandler
asyncHandlers map[EventType][]*eventHandler
asyncWaitHandlers map[EventType][]*eventHandler
lock sync.Mutex // a lock for the map
wait sync.WaitGroup // shared wait for async
}
type eventHandler struct {
name string
s Subscriber
}
// New returns new EventBus with empty handlers.
func New() *EventBusImp {
b := &EventBusImp{
make(map[EventType][]*eventHandler),
make(map[EventType][]*eventHandler),
make(map[EventType][]*eventHandler),
sync.Mutex{},
sync.WaitGroup{},
&defaultExecutor{},
}
return b
}
// Subscribe 注册会阻塞 Publish 方法, Subscriber 会依次执行
func (bus *EventBusImp) Subscribe(eventType EventType, s Subscriber) {
bus.subscribe(eventType, executeModeSync, s)
}
// SubscribeAsync 注册不会阻塞 Publish 方法, Subscriber 会并发执行
func (bus *EventBusImp) SubscribeAsync(eventType EventType, s Subscriber) {
bus.subscribe(eventType, executeModeAsync, s)
}
// SubscribeAsyncWait 注册会阻塞 Publish 方法, Subscriber 会并发执行, Publish 会在所有 Subscriber 执行完成之后返回
func (bus *EventBusImp) SubscribeAsyncWait(eventType EventType, s Subscriber) {
bus.subscribe(eventType, executeModeAsyncWait, s)
}
func (bus *EventBusImp) subscribe(eventType EventType, mode executeMode, s Subscriber) {
bus.lock.Lock()
defer bus.lock.Unlock()
h := &eventHandler{name: subscriberName(s), s: s}
switch mode {
case executeModeAsync:
bus.asyncHandlers[eventType] = append(bus.asyncHandlers[eventType], h)
case executeModeAsyncWait:
bus.asyncWaitHandlers[eventType] = append(bus.asyncWaitHandlers[eventType], h)
default:
bus.syncHandlers[eventType] = append(bus.syncHandlers[eventType], h)
}
}
func (bus *EventBusImp) Publish(ctx context.Context, e *Event) error {
var errList []error
handlers := bus.syncHandlers
if l, ok := handlers[e.EventType]; ok {
for index := range l {
errList = append(errList, bus.invoke(l[index], ctx, e, executeModeSync))
}
}
handlers = bus.asyncHandlers
if l, ok := handlers[e.EventType]; ok {
for index := range l {
innerIndex := index
bus.wait.Add(1)
err := go func() error {
defer bus.wait.Done()
return bus.invoke(l[innerIndex], context.Background(), e, executeModeAsync)
}()
errList = append(errList, err)
}
}
handlers = bus.asyncWaitHandlers
if l, ok := handlers[e.EventType]; ok && len(l) > 0 {
waitG := sync.WaitGroup{}
var err1List = make([]error, len(l))
for index := range l {
innerIndex := index
waitG.Add(1)
err := go func() error {
defer waitG.Done()
err1 := bus.invoke(l[innerIndex], ctx, e, executeModeAsyncWait)
err1List[innerIndex] = err1
return err1
}()
err1List[index] = err
}
waitG.Wait()
errList = append(errList, err1List...)
}
return errors.NewAggregate(errList)
}
这种模式是 golang 中的常见并发模式,完成这样的目标:有一组任务,给定指定的并发度,把这批任务完成并返回。
下面是这个模式的核心实现,注意使用的时候,甚至不需要加锁。
type DoWorkPieceFunc func(piece int)
func Parallelize(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
var stop <-chan struct{}
if ctx != nil {
stop = ctx.Done()
}
toProcess := make(chan int, pieces)
for i := 0; i < pieces; i++ {
toProcess <- i
}
close(toProcess)
if pieces < workers {
workers = pieces
}
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
for piece := range toProcess {
select {
case <-stop:
return
default:
doWorkPiece(piece)
}
}
}()
}
wg.Wait()
}
# 使用无须加锁
var tasks []task
var results = make(result, len(tasks)
Parallelize(context.Background(), 5, len(tasks), func(piece int){
results[piece] = do_some_work_with_task(tasks[piece])
})
比如我们有一个复杂的可配置的 client,他有很多可选的参数,那我们在设计 client 的 新建方法的时候通常有有两种方式,一种是 Options 模式,如下:
type Options struct {
A int
B int
Flags int
}
type Option func(*Options)
func OptionA(a int) Option {
return func(args *Options) {
args.A = a
}
}
func OptionB(b int) Option {
return func(args *Options) {
args.B = b
}
}
type Client struct{
o *Options
}
func NewClient(options...Option) *Client{
c := &Client{}
for _, option := range options{
option(c.o)
}
return c
}
# 使用
client := NewClient(OptionA(1), OptionB(2)
另一种模式则是 Builder 模式,使用链式的方法构建 client,这里不必有 build 方法
func NewClient() *Client{
return &Client{}
}
func (c *Client)WithA(a int) *Client{
c.o.A = a
return c
}
func (c *Client)WithB(b int) *Client{
c.o.B = b
return c
}
# 使用
client = NewClient().WithA(1).WithB(2)
golang 中没有严格的可选参数,但是可以通过可变参数变相的实现可选参数,比如下面一个例子,我们把 A, B 作为可选参数,并且 A, B 严格按照先后顺序,那么我们就可以这样做:
func NewClient(options...int) *Client{
c := &Client{}
if len(options) >= 1{
c.A = options[0]
}
if len(options) >= 2{
c.B = options[1]
}
}
# 使用
client := NewClient()
client := NewClient(1)
client := NewClient(1, 2)
context 是 golang 种非常常用的一种模式,详细可以参考我这篇文章:golang context实战; 这里我们讲另一种 context 的更复杂的使用方式:bigcontext
,这种模式的方式为:把与 context 所有生命周期相关的东西都放在这个 context 里面,成为一个 big context【他本身因为包含了 context.Context, 所以他也是 context Interface 的一个实现】,这种模式尤其在实现网络服务的时候会很方便,一些常用得信息都可以在 context 中获取,并且大部分时候并不需要使用锁。比如下面的一个实现:
// bContext wraps context and provides additional functionalities
type bContext struct {
context.Context
tracer tracer.Tracer // tracer: optional, use global tracer if not set
wait *sync.WaitGroup // wait shared between child and parent
pool GoPool // pool: optional
cache map[CacheType]Cache // cache should be thread safe
err error // go routine error
db *gorm.DB // gorm db
}
type CacheType = string
var CacheNotFoundErr = errors.New("cache not found")
type defaultPool struct {
}
func (p *defaultPool) Submit(task func()) error {
go func() {
defer runtime.HandleCrash()
task()
}()
return nil
}
// NewBContext init a bContext with given ctx
func NewBContext(ctx context.Context) BContext {
return &bContext{
Context: ctx,
wait: &sync.WaitGroup{},
tracer: tracer.GlobalTracer(),
cache: map[string]Cache{},
pool: &defaultPool{},
}
}
// NewContextWithBContext will inherit old bContext's pool/tracer/db/wait..
func NewContextWithBContext(ctx context.Context, c *bContext) BContext {
return &bContext{
Context: ctx,
pool: c.pool,
tracer: c.tracer,
wait: c.wait,
cache: c.cache,
db: c.db,
}
}
func (ctx *bContext) WithCache(cacheType CacheType, cache Cache) BContext {
ctx.cache[cacheType] = cache
return ctx
}
// refer: https://gorm.io/zh_CN/docs/context.html#Chi-%E4%B8%AD%E9%97%B4%E4%BB%B6%E7%A4%BA%E4%BE%8B
func (ctx *bContext) WithDb(db *gorm.DB) BContext {
ctx.db = db
return ctx
}
func (ctx *bContext) WithTracer(t tracer.Tracer) BContext {
ctx.tracer = t
return ctx
}
func (ctx *bContext) WithGoPool(p GoPool) BContext {
ctx.pool = p
return ctx
}
func (ctx *bContext) WithValue(k, v interface{}) BContext {
return NewContextWithBContext(context.WithValue(ctx.Context, k, v), ctx)
}
func (ctx *bContext) Go(fn func(ctx BContext) error) {
ctx.wait.Add(1)
err := ctx.pool.Submit(func() {
defer ctx.wait.Done()
if err1 := fn(ctx); err1 != nil {
}
})
if err != nil {
ctx.wait.Done()
}
}
func (ctx *bContext) Wait() {
ctx.wait.Wait()
}
func (ctx *bContext) Transaction(fc func(ctx BContext, tx *gorm.DB) error) (err error) {
return ctx.TransactionWithDb(ctx.db, fc)
}
func (ctx *bContext) Branch(branch string) BContext {
_, newCtx := ctx.tracer.StartServerSpan(ctx.Context, branch)
return NewContextWithBContext(newCtx, ctx)
}
func (ctx *bContext) BranchWrapped(branch string, fn func(BContext) error) error {
child := ctx.Branch(branch)
defer child.Span().Finish()
if err := fn(child); err != nil {
child.Span().LogFields(log.String("message", err.Error()))
return err
}
return nil
}
// Get span from current context
func (ctx *bContext) Span() tracer.Span {
span := tracer.GetSpanFromContext(ctx)
if span == nil {
span, _ = tracer.StartServerSpan(ctx, "default")
}
return span
}
// SetWithCache inserts or updates the specified key-value pair.
func (ctx *bContext) SetWithCache(cacheType CacheType, key, value interface{}) error {
if cache, ok := ctx.cache[cacheType]; !ok {
return CacheNotFoundErr
} else {
return cache.Set(key, value)
}
}
// GetWithCache returns the value for the specified key if it is present in the cache.
func (ctx *bContext) GetWithCache(cacheType CacheType, key interface{}) (interface{}, error) {
if cache, ok := ctx.cache[cacheType]; !ok {
return nil, CacheNotFoundErr
} else {
return cache.Get(key)
}
}
golang 里面没有方便的依赖注入,这里可以参考本人做的一个实现, 这个实现不同于 wire 之类的实现,特点是不用生成代码,完全运行时处理,同时实现极简,但是能满足大部分得需求。 具体的用法如下:
类型容器,支持依赖倒置、自动注入, 支持以下几种用法
// 1. 注册 type + new 函数, provide 返回新建实例
type ExampleA struct {
d map[string]string
}
func (a *ExampleA) SayHello() {
fmt.Printf("say hello from: %+v\n", a)
}
type SayHelloInterface interface {
SayHello()
}
func TestContainerExample1(t *testing.T) {
c := newThreadSafeContainer()
c.RegisterType(&ExampleA{}, func() (interface{}, error) {
t.Logf("ExampleA's new function called")
return &ExampleA{d: map[string]string{"1": "2"}}, nil
})
ret, err := c.Provide(&ExampleA{})
assert.Nil(t, err)
assert.Equal(t, map[string]string{"1": "2"}, ret.(*ExampleA).d)
}
// 2.注册单例 type + new 函数, provide 返回单例
func TestContainerExample2(t *testing.T) {
c := newThreadSafeContainer()
c.RegisterType(&ExampleA{}, func() (interface{}, error) {
t.Logf("ExampleA's new function called")
return &ExampleA{d: map[string]string{"1": "2"}}, nil
})
ret, err := c.Provide(&ExampleA{})
assert.Nil(t, err)
assert.Equal(t, map[string]string{"1": "2"}, ret.(*ExampleA).d)
}
// 3. 注册单例 object, provide 返回单例
func TestContainerExample3(t *testing.T) {
c := newThreadSafeContainer()
exampleA := &ExampleA{d: map[string]string{"1": rand.String(10)}}
c.RegisterObjectSingleton(exampleA)
ret, err := c.Provide(&ExampleA{})
assert.Nil(t, err)
ret2, err := c.Provide(&ExampleA{})
assert.Nil(t, err)
assert.Equal(t, ret, ret2)
}
// 4. 注册单例 type + new 函数, provide 的时候用 interface 获取单例
func TestContainerExample4(t *testing.T) {
c := newThreadSafeContainer()
c.RegisterType(&ExampleA{}, func() (interface{}, error) {
t.Logf("ExampleA's new function called")
return &ExampleA{d: map[string]string{"1": "2"}}, nil
})
ret, err := c.Provide((*SayHelloInterface)(nil))
assert.Nil(t, err)
assert.Equal(t, map[string]string{"1": "2"}, ret.(*ExampleA).d)
ret2, err := c.Provide(reflect.TypeOf((*SayHelloInterface)(nil)).Elem())
assert.Nil(t, err)
assert.Equal(t, ret, ret2)
}
// 5. 注册单例 type + new 函数, provide 的时候用 interface 获取单例, 同时使用自动注入字段
type ExampleB struct {
d map[string]string
exampleA1 *ExampleA `autowired:"true"`
exampleA2 SayHelloInterface `autowired:"true"`
}
func (a *ExampleB) Foo() {
fmt.Printf("foo from: %+v\n", a)
}
type FooInterface interface {
Foo()
}
func TestContainerExample5(t *testing.T) {
c := newThreadSafeContainer()
c.RegisterTypeSingleton(&ExampleA{}, func() (interface{}, error) {
t.Logf("ExampleA's new function called")
return &ExampleA{d: map[string]string{"1": rand.String(5)}}, nil
})
c.RegisterTypeSingleton(&ExampleB{}, func() (interface{}, error) {
t.Logf("ExampleB's new function called")
return &ExampleB{d: map[string]string{"3": "4"}}, nil
})
ret, err := c.Provide((*FooInterface)(nil))
assert.Nil(t, err)
assert.Equal(t, map[string]string{"3": "4"}, ret.(*ExampleB).d)
assert.Equal(t, ret.(*ExampleB).exampleA1, ret.(*ExampleB).exampleA2)
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。