Analysis of the design patterns behind Milvus object storage and message middleware

Original
Author: melodyshu
Published 2024-04-23 18:24:00
Included in the column: milvus数据库 (Milvus database)

Analysis of the factory design pattern behind Milvus object storage and message middleware

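Requirements

Create the MQ and the storage backend according to configuration parameters.

The MQ can be kafka or pulsar (the initMQ() code quoted later also handles rocksmq and natsmq).

The storage can be local, minio, or remote.

Configuration file

The configuration file selects which MQ and which storage backend are initialized: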

Code language: yaml
mq:
  type: pulsar
  
common:
  storageType: minio

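Two kinds of product have to be created here, an MQ and a storage backend, so the abstract factory pattern is a better fit than the factory method pattern.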
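
The idea can be sketched in a few lines of Go. This is a minimal illustration rather than Milvus code: the names `MQ`, `Storage`, `Config`, `NewFactory` and `configFactory` are hypothetical, and the products only carry a name. It shows one factory object that produces both product families from the two configuration values.

```go
package main

import (
	"errors"
	"fmt"
)

// MQ and Storage are the two product families (hypothetical, simplified).
type MQ interface{ Name() string }
type Storage interface{ Name() string }

// Factory is the abstract factory: one object creates both kinds of product.
type Factory interface {
	NewMQ() (MQ, error)
	NewStorage() (Storage, error)
}

// named is a trivial product that only remembers its type name.
type named string

func (n named) Name() string { return string(n) }

// Config mirrors the two YAML keys mq.type and common.storageType.
type Config struct {
	MQType      string
	StorageType string
}

type configFactory struct{ cfg Config }

// NewFactory returns a Factory driven by the given configuration.
func NewFactory(cfg Config) Factory { return &configFactory{cfg: cfg} }

func (f *configFactory) NewMQ() (MQ, error) {
	switch f.cfg.MQType {
	case "kafka", "pulsar":
		return named(f.cfg.MQType), nil
	default:
		return nil, errors.New("unsupported mq type: " + f.cfg.MQType)
	}
}

func (f *configFactory) NewStorage() (Storage, error) {
	switch f.cfg.StorageType {
	case "local", "minio", "remote":
		return named(f.cfg.StorageType), nil
	default:
		return nil, errors.New("unsupported storage type: " + f.cfg.StorageType)
	}
}

func main() {
	f := NewFactory(Config{MQType: "pulsar", StorageType: "minio"})
	mq, err := f.NewMQ()
	if err != nil {
		panic(err)
	}
	st, err := f.NewStorage()
	if err != nil {
		panic(err)
	}
	fmt.Println(mq.Name(), st.Name())
}
```

With a factory method there would be one creator per product; here a single factory value is handed around and the caller never names a concrete MQ or storage type.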

Code framework

Factory interface

Code path: internal\util\dependency\factory.go

Code language: go
type Factory interface {
	msgstream.Factory
	// Init() passes the parameters to the factory.
	Init(p *paramtable.ComponentParam)
	NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}

// pkg\mq\msgstream\msgstream.go
// Definition of msgstream.Factory
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

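dependency.Factory is a factory interface that embeds the MQ factory interface (msgstream.Factory) and adds a method for creating the persistent storage object.

Through this one interface, callers create both message middleware objects and persistent storage objects.

Why not write it like this instead:

Code language: go
type Factory interface {
	Init(p *paramtable.ComponentParam)
	NewMsgStream(ctx context.Context) (msgstream.MsgStream, error)
	NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}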
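
The article leaves this question open. One plausible reason, offered here as an assumption rather than as the Milvus authors' stated intent, is that embedding guarantees the combined interface always contains every method of msgstream.Factory, so a dependency.Factory value stays usable wherever only a msgstream.Factory is required. The flat version above lists only NewMsgStream and would not satisfy msgstream.Factory. A reduced sketch with hypothetical names (`StreamFactory`, `FullFactory`, `useStreams`):

```go
package main

import "fmt"

// StreamFactory stands in for msgstream.Factory (hypothetical, reduced).
type StreamFactory interface {
	NewMsgStream() string
}

// FullFactory stands in for dependency.Factory: it embeds StreamFactory
// and adds a storage constructor.
type FullFactory interface {
	StreamFactory
	NewStorage() string
}

type defaultFactory struct{}

func (defaultFactory) NewMsgStream() string { return "stream" }
func (defaultFactory) NewStorage() string   { return "storage" }

// useStreams only needs the narrower interface.
func useStreams(f StreamFactory) string { return f.NewMsgStream() }

func main() {
	var full FullFactory = defaultFactory{}
	// A FullFactory value is accepted where a StreamFactory is expected,
	// because its method set includes every StreamFactory method.
	fmt.Println(useStreams(full), full.NewStorage())
}
```

If a method is later added to `StreamFactory`, `FullFactory` picks it up automatically, which a hand-copied method list would not.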

DefaultFactory

The DefaultFactory struct is the implementation of dependency.Factory.

Code language: go
// DefaultFactory is a factory that produces instances of storage.ChunkManager and message queue.
// internal\util\dependency\factory.go
type DefaultFactory struct {
	standAlone          bool
	chunkManagerFactory storage.Factory
	msgStreamFactory    msgstream.Factory
}

// storage.Factory
// internal\storage\factory.go
type Factory interface {
	NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)
}

// msgstream.Factory
// pkg\mq\msgstream\msgstream.go
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

DefaultFactory implements the Init() function of the dependency.Factory interface.

Init() initializes chunkManagerFactory and msgStreamFactory.

Code language: go
func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {
	// skip if using default factory
	if f.msgStreamFactory != nil {
		return
	}
	// initialize chunkManagerFactory
	f.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(params)

	// initialize mq client or embedded mq.
	// initialize msgStreamFactory
	if err := f.initMQ(f.standAlone, params); err != nil {
		panic(err)
	}
}

f.chunkManagerFactory ends up holding a ChunkManagerFactory constructed like this:

Code language: go
return &ChunkManagerFactory{
	persistentStorage: persistentStorage,
	config:            c,
}

f.msgStreamFactory is assigned inside initMQ():

Code language: go
func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {
	mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable()})
	log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))

	switch mqType {
	case mqTypeNatsmq:
		f.msgStreamFactory = msgstream.NewNatsmqFactory()
	case mqTypeRocksmq:
		f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), &params.ServiceParam)
	case mqTypePulsar:
		f.msgStreamFactory = msgstream.NewPmsFactory(&params.ServiceParam)
	case mqTypeKafka:
		f.msgStreamFactory = msgstream.NewKmsFactory(&params.ServiceParam)
	}
	if f.msgStreamFactory == nil {
		return errors.New("failed to create MQ: check the milvus log for initialization failures")
	}
	return nil
}
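
The overall shape of DefaultFactory, one sub-factory per product family plus an Init() that runs only once, can be sketched as follows. This is a simplified illustration under stated assumptions: `compositeFactory`, `fixedStream` and `fixedStorage` are hypothetical names, the products are plain strings, and Init() returns an error where the Milvus code above panics.

```go
package main

import (
	"errors"
	"fmt"
)

type streamFactory interface{ NewMsgStream() (string, error) }
type storageFactory interface{ NewStorage() (string, error) }

type fixedStream struct{ kind string }

func (f fixedStream) NewMsgStream() (string, error) { return f.kind + " stream", nil }

type fixedStorage struct{ engine string }

func (f fixedStorage) NewStorage() (string, error) { return f.engine + " chunk manager", nil }

// compositeFactory mirrors the shape of DefaultFactory: it owns one
// sub-factory per product family and delegates to them.
type compositeFactory struct {
	streams streamFactory
	storage storageFactory
}

// Init selects the sub-factories once; later calls are no-ops.
func (f *compositeFactory) Init(mqType, engine string) error {
	if f.streams != nil {
		return nil // already initialized
	}
	if mqType != "kafka" && mqType != "pulsar" {
		return errors.New("unsupported mq type: " + mqType)
	}
	f.storage = fixedStorage{engine: engine}
	f.streams = fixedStream{kind: mqType}
	return nil
}

func (f *compositeFactory) NewMsgStream() (string, error) {
	if f.streams == nil {
		return "", errors.New("factory not initialized")
	}
	return f.streams.NewMsgStream()
}

func (f *compositeFactory) NewStorage() (string, error) {
	if f.storage == nil {
		return "", errors.New("factory not initialized")
	}
	return f.storage.NewStorage()
}

func main() {
	f := &compositeFactory{}
	if err := f.Init("pulsar", "minio"); err != nil {
		panic(err)
	}
	_ = f.Init("kafka", "local") // no-op: the factory is already initialized
	s, _ := f.NewMsgStream()
	c, _ := f.NewStorage()
	fmt.Println(s, "/", c)
}
```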

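Persistent storage

storage.Factory is the factory interface for creating persistent storage.

storage.ChunkManagerFactory is the implementation of storage.Factory.

Implementation of the NewPersistentStorageChunkManager() interface method:

Code language: go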
func (f *DefaultFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {
	return f.chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
}

func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) {
	return f.newChunkManager(ctx, f.persistentStorage)
}

func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string) (ChunkManager, error) {
	switch engine {
	case "local":
		return NewLocalChunkManager(RootPath(f.config.rootPath)), nil
	case "minio":
		return newMinioChunkManagerWithConfig(ctx, f.config)
	case "remote":
		return NewRemoteChunkManager(ctx, f.config)
	default:
		return nil, errors.New("no chunk manager implemented with engine: " + engine)
	}
}

A persistent storage object matching the given engine is created: LocalChunkManager, MinioChunkManager, or RemoteChunkManager.

Code language: go
// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {
	localPath string
}

// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {
	*minio.Client
	bucketName string
	rootPath   string
}

// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {
	client ObjectStorage
	bucketName string
	rootPath   string
}

Message middleware

msgstream.Factory is the factory interface for creating the MQ.

Factory interface:

Code language: go
// pkg\mq\msgstream\msgstream.go
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

Its implementations are CommonFactory, KmsFactory, and PmsFactory:

Code language: go
// CommonFactory is a Factory for creating message streams with common logic.
// It contains a function field named newer, which is a function that creates
// an mqwrapper.Client when called.
// pkg\mq\msgstream\common_mq_factory.go
type CommonFactory struct {
	Newer             func(context.Context) (mqwrapper.Client, error) // client constructor
	DispatcherFactory ProtoUDFactory
	ReceiveBufSize    int64
	MQBufSize         int64
}

// pkg\mq\msgstream\mq_factory.go
// Kafka factory
type KmsFactory struct {
	dispatcherFactory ProtoUDFactory
	config            *paramtable.KafkaConfig
	ReceiveBufSize    int64
	MQBufSize         int64
}

// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go)
// pkg\mq\msgstream\mq_factory.go
// Pulsar factory
type PmsFactory struct {
	dispatcherFactory ProtoUDFactory
	// the following members must be public, so that mapstructure.Decode() can access them
	PulsarAddress    string
	PulsarWebAddress string
	ReceiveBufSize   int64
	MQBufSize        int64
	PulsarAuthPlugin string
	PulsarAuthParams string
	PulsarTenant     string
	PulsarNameSpace  string
	RequestTimeout   time.Duration
	metricRegisterer prometheus.Registerer
}
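
The comment on CommonFactory describes a factory whose MQ-specific part is a constructor function stored in a field. A minimal sketch of that idea, with hypothetical names (`Client`, `fakeClient`, `commonFactory`, `NewStream`) and a string standing in for the stream:

```go
package main

import (
	"errors"
	"fmt"
)

// Client stands in for mqwrapper.Client (hypothetical, reduced to one method).
type Client interface{ Kind() string }

type fakeClient struct{ kind string }

func (c fakeClient) Kind() string { return c.kind }

// commonFactory mimics the shape of CommonFactory: the MQ-specific part is
// injected as a constructor function, the rest of the logic is shared.
type commonFactory struct {
	Newer func() (Client, error)
}

// NewStream is the shared logic: build a client, then wrap it.
func (f *commonFactory) NewStream() (string, error) {
	if f.Newer == nil {
		return "", errors.New("no client constructor configured")
	}
	c, err := f.Newer()
	if err != nil {
		return "", err
	}
	return "stream over " + c.Kind(), nil
}

func main() {
	f := &commonFactory{
		Newer: func() (Client, error) { return fakeClient{kind: "demo"}, nil },
	}
	s, err := f.NewStream()
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

Supporting another MQ then only means supplying a different constructor function; the stream-building code is not repeated.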

MQ product

The MQ product interface is msgstream.MsgStream.

Code language: go
// MsgStream is an interface that can be used to produce and consume message on message queue
type MsgStream interface {
	Close()

	AsProducer(channels []string)
	Produce(*MsgPack) error
	SetRepackFunc(repackFunc RepackFunc)
	GetProduceChannels() []string
	Broadcast(*MsgPack) (map[string][]MessageID, error)

	AsConsumer(ctx context.Context, channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) error
	Chan() <-chan *MsgPack
	Seek(ctx context.Context, offset []*MsgPosition) error

	GetLatestMsgID(channel string) (MessageID, error)
	CheckTopicValid(channel string) error

	EnableProduce(can bool)
}

The concrete product implementations are msgstream.mqMsgStream and msgstream.MqTtMsgStream:

Code language: go
type mqMsgStream struct {
	ctx              context.Context
	client           mqwrapper.Client
	producers        map[string]mqwrapper.Producer
	producerChannels []string
	consumers        map[string]mqwrapper.Consumer
	consumerChannels []string

	repackFunc    RepackFunc
	unmarshal     UnmarshalDispatcher
	receiveBuf    chan *MsgPack
	closeRWMutex  *sync.RWMutex
	streamCancel  func()
	bufSize       int64
	producerLock  *sync.RWMutex
	consumerLock  *sync.Mutex
	closed        int32
	onceChan      sync.Once
	enableProduce atomic.Value
}

// MqTtMsgStream is a msgstream that contains timeticks
type MqTtMsgStream struct {
	*mqMsgStream
	chanMsgBuf         map[mqwrapper.Consumer][]TsMsg
	chanMsgPos         map[mqwrapper.Consumer]*msgpb.MsgPosition
	chanStopChan       map[mqwrapper.Consumer]chan bool
	chanTtMsgTime      map[mqwrapper.Consumer]Timestamp
	chanMsgBufMutex    *sync.Mutex
	chanTtMsgTimeMutex *sync.RWMutex
	chanWaitGroup      *sync.WaitGroup
	lastTimeStamp      Timestamp
	syncConsumer       chan int
}
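
MqTtMsgStream embeds *mqMsgStream, so it inherits the base stream's methods through Go's method promotion and only needs to define what differs. A reduced sketch with hypothetical types (`baseStream`, `ttStream`) and a hypothetical `Describe` method:

```go
package main

import "fmt"

// baseStream stands in for mqMsgStream (hypothetical, reduced).
type baseStream struct{ channels []string }

func (s *baseStream) AsProducer(channels []string) {
	s.channels = append(s.channels, channels...)
}
func (s *baseStream) GetProduceChannels() []string { return s.channels }
func (s *baseStream) Describe() string             { return "plain stream" }

// ttStream stands in for MqTtMsgStream: it embeds the base stream and
// overrides only what differs.
type ttStream struct {
	*baseStream
	lastTimestamp uint64 // extra state that only the time-tick variant needs
}

// Describe shadows the promoted baseStream.Describe.
func (s *ttStream) Describe() string { return "time-tick stream" }

func main() {
	tt := &ttStream{baseStream: &baseStream{}}
	tt.AsProducer([]string{"ch-1"}) // promoted from *baseStream
	fmt.Println(tt.GetProduceChannels(), tt.Describe())
}
```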

Storage product

The storage product interface is storage.ChunkManager.

Code language: go
// ChunkManager is to manager chunks.
// Include Read, Write, Remove chunks.
type ChunkManager interface {
	// RootPath returns current root path.
	RootPath() string
	// Path returns path of @filePath.
	Path(ctx context.Context, filePath string) (string, error)
	// Size returns path of @filePath.
	Size(ctx context.Context, filePath string) (int64, error)
	// Write writes @content to @filePath.
	Write(ctx context.Context, filePath string, content []byte) error
	// MultiWrite writes multi @content to @filePath.
	MultiWrite(ctx context.Context, contents map[string][]byte) error
	// Exist returns true if @filePath exists.
	Exist(ctx context.Context, filePath string) (bool, error)
	// Read reads @filePath and returns content.
	Read(ctx context.Context, filePath string) ([]byte, error)
	// Reader return a reader for @filePath
	Reader(ctx context.Context, filePath string) (FileReader, error)
	// MultiRead reads @filePath and returns content.
	MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)
	ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error)
	// ReadWithPrefix reads files with same @prefix and returns contents.
	ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error)
	Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)
	// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.
	// if all bytes are read, @err is io.EOF.
	// return other error if read failed.
	ReadAt(ctx context.Context, filePath string, off int64, length int64) (p []byte, err error)
	// Remove delete @filePath.
	Remove(ctx context.Context, filePath string) error
	// MultiRemove delete @filePaths.
	MultiRemove(ctx context.Context, filePaths []string) error
	// RemoveWithPrefix remove files with same @prefix.
	RemoveWithPrefix(ctx context.Context, prefix string) error
}

The concrete product implementations are LocalChunkManager, MinioChunkManager, and RemoteChunkManager:

Code language: go
// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {
	localPath string
}

// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {
	*minio.Client

	//	ctx        context.Context
	bucketName string
	rootPath   string
}

// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {
	client ObjectStorage

	//	ctx        context.Context
	bucketName string
	rootPath   string
}

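Summary

The code framework shows that each kind of MQ has its own factory, while storage has a single factory that switches on the engine name.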

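Original-work statement: this article is published on the Tencent Cloud developer community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to request removal.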