前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang框架实战-KisFlow流式计算框架(6)-Connector

Golang框架实战-KisFlow流式计算框架(6)-Connector

原创
作者头像
刘丹冰Aceld
发布2024-06-06 11:01:15
720
发布2024-06-06 11:01:15
举报

KisFlow源代码:https://github.com/aceld/kis-flow

本章将设计KisFlow的Connector模块,期功能及作用主要为挂载在某个Function下,执行第三方存储引擎的逻辑。

5.1 Connector定义

KisFlow中提供Connector,来给开发者定义第三方存储引擎的自定义读写插件模式。如果数据流的数据需要临时从某引擎去读或者需要存储到某个存储引擎中,可以通过Connector来编写相对应的读写逻辑,并且通过配置,自定义挂载在Flow中的某个Function中。Connector也是灵活配置的。这样具有相同逻辑的存储算法可以在多个Function中进行复用。

5.1.1 Connector的抽象层定义

kis-flow/kis/创建connector.go文件,这里用来定义Connector的抽象接口,如下:

kis-flow/kis/connector.go

代码语言:go
复制
package kis

import (
	"context"
	"kis-flow/config"
)

type Connector interface {
	// Init 初始化Connector所关联的存储引擎链接等
	Init() error
	// Call 调用Connector 外挂存储逻辑的读写操作
	Call(ctx context.Context, flow Flow, args interface{}) error
	// GetId 获取Connector的ID
	GetId() string
	// GetName 获取Connector的名称
	GetName() string
	// GetConfig 获取Connector的配置信息
	GetConfig() *config.KisConnConfig
}

Connector 目前阶段提供的主要接口有两个:

  • Init() 主要为当前Connector所关联的第三方存储引擎的初始化逻辑,如创建链接登操作,Init在Connector实例的生命周期只会被执行一次。
  • Call() 主要为Connector的调度入口,相关存储的读写自定义逻辑是通过Call()方法来触发调度,具体的回调函数原型在Router模块定义。

5.1.2 Connector相关路由成员类型定义

通过上述的接口,我们得知,一个Connector实例要配置两个自定义方法,一个通过Init() 接口来调用,一个通过Call() 接口来调用。下面就需要对这两种回调原型做定义。

(1) Connector Init

kis-flow/kis/router.go

代码语言:go
复制
/*
	Connector Init
*/
// ConnInit Connector 第三方挂载存储初始化
type ConnInit func(conn Connector) error

// connInitRouter
//key:
type connInitRouter map[string]ConnInit
  • ConnInit为初始化回调函数原型,参数为当前Connector实例指针。
  • connInitRouter为管理ConnInit的路由,key为ConnName。

(2)Connector Call

kis-flow/kis/router.go

代码语言:go
复制
/*
	Connector Call
*/
// CaaS Connector的存储读取业务实现
type CaaS func(context.Context, Connector, Function, Flow, interface{}) error

// connFuncRouter 通过FunctionName索引到CaaS回调存储业务的映射关系
// key: Function Name
// value: Connector的存储读取业务实现
type connFuncRouter map[string]CaaS
  • CaaS为Connector执行存储读写逻辑的自定义回调函数原型,参数为Connector、Function、Flow指针,开发者可以通过这三个实例得到业务想要的一些参数。最后一个参数为自定义参数,在Fcuntion调度Connector的时候,可以由开发者自定义输入。
  • connFuncRouter为管理CaaS的路由,注意:key为当前Connector所挂载的Function Name。所以在调度Connector Call必须是由Function进行调度。

这里由于Connector只有Function位Save或者Load模式才能够挂载,为了今后方便统计或者索引,还需要针对connFuncRouter进行分组,分成Save组和Load组。下面针对这两个组进行成员映射关系类型的定义,如下:

kis-flow/kis/router.go

代码语言:go
复制
// connSL 通过KisMode 将connFuncRouter分为两个子树
// key: Function KisMode S/L
// value: NsConnRouter
type connSL map[common.KisMode]connFuncRouter

// connTree
// key: Connector Name
// value: connSL 二级树
type connTree map[string]connSL
  • connSL是根据KisMode分的组,成员为之前的Connector的Call路由器。
  • connTree是通过Connector Name进行索引的映射管理,通过Connector Name+Function Mode + Function Name可以确定调度的Connector Call函数。

5.2 Connector路由管理

上一节对Connector需要的路由管理类型做了定义,接下来需要将这些路由的添加与调度进行管理。

Connector的路由管理和Function一样,统一被KisPool模块进行管理。

5.2.1 KisPool新增Connector相关路由管理成员

(1) 添加成员

kis-flow/kis/pool.go

代码语言:go
复制
//  kisPool 用于管理全部的Function和Flow配置的池子
type kisPool struct {
	fnRouter funcRouter   // 全部的Function管理路由
	fnLock   sync.RWMutex // fnRouter 锁

	flowRouter flowRouter   // 全部的flow对象
	flowLock   sync.RWMutex // flowRouter 锁

    // +++++++++++++++++ 
	cInitRouter connInitRouter // 全部的Connector初始化路由
	ciLock      sync.RWMutex   // cInitRouter 锁

	cTree      connTree             //全部Connector管理路由
	connectors map[string]Connector // 全部的Connector对象
	cLock      sync.RWMutex         // cTree 锁
    // +++++++++++++++++ 
}
(2) 相关Map变量初始化

kis-flow/kis/pool.go

代码语言:go
复制
// Pool 单例构造
func Pool() *kisPool {
	_poolOnce.Do(func() {
		//创建kisPool对象
		_pool = new(kisPool)

		// fnRouter初始化
		_pool.fnRouter = make(funcRouter)

		// flowRouter初始化
		_pool.flowRouter = make(flowRouter)

        // +++++++++++++++++++++++++
		// connTree初始化
		_pool.cTree = make(connTree)
		_pool.cInitRouter = make(connInitRouter)
		_pool.connectors = make(map[string]Connector)
        // +++++++++++++++++++++++++
	})

	return _pool
}
(3) 注册Connector Init方法

kis-flow/kis/pool.go

代码语言:go
复制
// CaaSInit 注册Connector初始化业务
func (pool *kisPool) CaaSInit(cname string, c ConnInit) {
	pool.ciLock.Lock() // 写锁
	defer pool.ciLock.Unlock()

	if _, ok := pool.cInitRouter[cname]; !ok {
		pool.cInitRouter[cname] = c
	} else {
		errString := fmt.Sprintf("KisPool Reg CaaSInit Repeat CName=%s\n", cname)
		panic(errString)
	}

	log.Logger().InfoF("Add KisPool CaaSInit CName=%s", cname)
}
(4) 执行Connector Init方法

kis-flow/kis/pool.go

代码语言:go
复制
// CallConnInit 调度 ConnInit
func (pool *kisPool) CallConnInit(conn Connector) error {
	pool.ciLock.RLock() // 读锁
	defer pool.ciLock.RUnlock()

	init, ok := pool.cInitRouter[conn.GetName()]

	if !ok {
		panic(errors.New(fmt.Sprintf("init connector cname = %s not reg..", conn.GetName())))
	}

	return init(conn)
}

逻辑很简单,先加锁保护,然后进行成员key/value键值对添加。这里由于是路由动作,添加失败则直接panic()退出进程。

(5) 注册Connector Call方法

kis-flow/kis/pool.go

代码语言:go
复制
// CaaS 注册Connector Call业务
func (pool *kisPool) CaaS(cname string, fname string, mode common.KisMode, c CaaS) {
	pool.cLock.Lock() // 写锁
	defer pool.cLock.Unlock()

	if _, ok := pool.cTree[cname]; !ok {
		//cid 首次注册,不存在,创建二级树NsConnSL
		pool.cTree[cname] = make(connSL)

		//初始化各类型FunctionMode
		pool.cTree[cname][common.S] = make(connFuncRouter)
		pool.cTree[cname][common.L] = make(connFuncRouter)
	}

	if _, ok := pool.cTree[cname][mode][fname]; !ok {
		pool.cTree[cname][mode][fname] = c
	} else {
		errString := fmt.Sprintf("CaaS Repeat CName=%s, FName=%s, Mode =%s\n", cname, fname, mode)
		panic(errString)
	}

	log.Logger().InfoF("Add KisPool CaaS CName=%s, FName=%s, Mode =%s", cname, fname, mode)
}

KisPool的CaaS方法就是注册自己的Connector连接器的逻辑处理回调函数,注册的时候,会通过传递进来的参数将函数注册到相对应的路由分组中去。

(6) 执行Connector Call方法

kis-flow/kis/pool.go

代码语言:go
复制
// CallConnector 调度 Connector
func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) error {
	fn := flow.GetThisFunction()
	fnConf := fn.GetConfig()
	mode := common.KisMode(fnConf.FMode)

	if callback, ok := pool.cTree[conn.GetName()][mode][fnConf.FName]; ok {
		return callback(ctx, conn, fn, flow, args)
	}

	log.Logger().ErrorFX(ctx, "CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.\n", conn.GetName(), fnConf.FName, mode)

	return errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
}

CallConnector则是做通过ConnectorName、Function Mode、Function Name做索引找到对应的CaaS函数,并且执行。

5.3 KisConnector

接下来我们来按照Connector的抽象层,来定义且实现KisConnector模块。在kis-flow/conn/目录下创建kis_connector.go文件。

5.3.1 KisConnector 定义

kis-flow/conn/kis_connector.go

代码语言:go
复制
package conn

import (
	"context"
	"kis-flow/common"
	"kis-flow/config"
	"kis-flow/id"
	"kis-flow/kis"
	"sync"
)

type KisConnector struct {
	// Connector ID
	CId string
	// Connector Name
	CName string
	// Connector Config
	Conf *config.KisConnConfig

	// Connector Init
	onceInit sync.Once
}

KisConnector 除了标识实例的KisID 还有 名称外,还包含了当前KisConnector的配置信息KisConnConfig,这里面具有一个sync.Once成员,这个是用来控制Connector Init在生命周期只被执行一次的限定,稍后会提到。

5.3.1 KisConnector 构造方法

kis-flow/conn/kis_connector.go

代码语言:go
复制
// NewKisConnector 根据配置策略创建一个KisConnector
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
	conn := new(KisConnector)
	conn.CId = id.KisID(common.KisIdTypeConnnector)
	conn.CName = config.CName
	conn.Conf = config

	return conn
}

创建一个KisConnector实例是需要先有KisConnConfig的配置信息。

5.3.2 实现Connector接口

kis-flow/conn/kis_connector.go

代码语言:go
复制
// Init 初始化Connector所关联的存储引擎链接等
func (conn *KisConnector) Init() error {
	var err error

	//一个Connector只能执行初始化业务一次
	conn.onceInit.Do(func() {
		err = kis.Pool().CallConnInit(conn)
	})

	return err
}

// Call 调用Connector 外挂存储逻辑的读写操作
func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) error {
	if err := kis.Pool().CallConnector(ctx, flow, conn, args); err != nil {
		return err
	}

	return nil
}

func (conn *KisConnector) GetName() string {
	return conn.CName
}

func (conn *KisConnector) GetConfig() *config.KisConnConfig {
	return conn.Conf
}

func (conn *KisConnector) GetId() string {
	return conn.CId
}
  • Init()方法通过Once来限定调用次数,最终会通过KisPool来进行路由映射且调度。
  • Call()同样通过KisPool进行调度。

那么KisConnector的Init()和Call()在什么时候会被调用呢? 接下来我们需要实现KisConnConfig,将Connector的层级关系与Function和Flow进行关联。

5.4 KisConnConfig配置信息

NewKisConnector()形参为KisConnConfig,所以开发者在创建一个Connector实例,首先要先创建一个Connector的配置信息KisConnConfig。在V0.1版本中,我们已经实现了KisConnConfig模块的定义以及创建,代码如下:

kis-flow/config/kis_conn_config.go

代码语言:go
复制
package config

import (
	"errors"
	"fmt"
	"kis-flow/common"
)

// KisConnConfig KisConnector 策略配置
type KisConnConfig struct {
	//配置类型
	KisType string `yaml:"kistype"`
	//唯一描述标识
	CName string `yaml:"cname"`
	//基础存储媒介地址
	AddrString string `yaml:"addrs"`
	//存储媒介引擎类型"Mysql" "Redis" "Kafka"等
	Type common.KisConnType `yaml:"type"`
	//一次存储的标识:如Redis为Key名称、Mysql为Table名称,Kafka为Topic名称等
	Key string `yaml:"key"`
	//配置信息中的自定义参数
	Params map[string]string `yaml:"params"`
	//存储读取所绑定的NsFuncionID
	Load []string `yaml:"load"`
	Save []string `yaml:"save"`
}

// NewConnConfig 创建一个KisConnector策略配置对象, 用于描述一个KisConnector信息
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
	strategy := new(KisConnConfig)
	strategy.CName = cName
	strategy.AddrString = addr

	strategy.Type = t
	strategy.Key = key
	strategy.Params = param

	return strategy
}

// WithFunc Connector与Function进行关系绑定
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {

	switch common.KisMode(fConfig.FMode) {
	case common.S:
		cConfig.Save = append(cConfig.Save, fConfig.FName)
	case common.L:
		cConfig.Load = append(cConfig.Load, fConfig.FName)
	default:
		return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
	}

	return nil
}

5.4.1 KisFuncConfig 添加KisConnConfig成员

首先我们给KisFuncConfig 添加KisConnConfig成员。

kis-flow/config/kis_func_config.go

代码语言:go
复制
// KisFuncConfig 一个KisFunction策略配置
type KisFuncConfig struct {
	KisType  string        `yaml:"kistype"`
	FName    string        `yaml:"fname"`
	FMode    string        `yaml:"fmode"`
	Source   KisSource     `yaml:"source"`
	Option   KisFuncOption `yaml:"option"`
    // ++++++++++
	connConf *KisConnConfig
}

然后提供对该成员的添加和读取方法

kis-flow/config/kis_func_config.go

代码语言:go
复制
func (fConf *KisFuncConfig) AddConnConfig(cConf *KisConnConfig) error {
	if cConf == nil {
		return errors.New("KisConnConfig is nil")
	}

	// Function需要和Connector进行关联
	fConf.connConf = cConf

	// Connector需要和Function进行关联
	_ = cConf.WithFunc(fConf)

	return nil
}

func (fConf *KisFuncConfig) GetConnConfig() (*KisConnConfig, error) {
	if fConf.connConf == nil {
		return nil, errors.New("KisFuncConfig.connConf not set")
	}

	return fConf.connConf, nil
}

这样我们可以通过Funciton的配置信息就能够查到相关联的Connector配置信息。

5.5 Function/Flow与Connector关联

5.5.1 Function与Connector关联

kis-flow/kis/function.go

代码语言:go
复制
package kis

import (
	"context"
	"kis-flow/config"
)

// Function 流式计算基础计算模块,KisFunction是一条流式计算的基本计算逻辑单元,
// 			   任意个KisFunction可以组合成一个KisFlow
type Function interface {
	// Call 执行流式计算逻辑
	Call(ctx context.Context, flow Flow) error

	// SetConfig 给当前Function实例配置策略
	SetConfig(s *config.KisFuncConfig) error
	// GetConfig 获取当前Function实例配置策略
	GetConfig() *config.KisFuncConfig

	// SetFlow 给当前Function实例设置所依赖的Flow实例
	SetFlow(f Flow) error
	// GetFlow 获取当前Functioin实力所依赖的Flow
	GetFlow() Flow

    // ++++++++++++++++++++
	// AddConnector 给当前Function实例添加一个Connector
	AddConnector(conn Connector) error
	// GetConnector 获取当前Function实例所关联的Connector
	GetConnector() Connector
    // ++++++++++++++++++++

	// CreateId 给当前Funciton实力生成一个随机的实例KisID
	CreateId()
	// GetId 获取当前Function的FID
	GetId() string
	// GetPrevId 获取当前Function上一个Function节点FID
	GetPrevId() string
	// GetNextId 获取当前Function下一个Function节点FID
	GetNextId() string

	// Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil
	Next() Function
	// Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil
	Prev() Function
	// SetN 设置下一层Function实例
	SetN(f Function)
	// SetP 设置上一层Function实例
	SetP(f Function)
}

接下来我们在BaseFunction中实现:

kis-flow/function/kis_base_function.go

代码语言:go
复制
type BaseFunction struct {
	// Id , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象
	Id     string
	Config *config.KisFuncConfig

	// flow
	flow kis.Flow //上下文环境KisFlow

    // ++++++++++++++
	// connector
	connector kis.Connector
    // ++++++++++++++

	// link
	N kis.Function //下一个流计算Function
	P kis.Function //上一个流计算Function
}

// ........
// ........

// AddConnector 给当前Function实例添加一个Connector
func (base *BaseFunction) AddConnector(conn kis.Connector) error {
	if conn == nil {
		return errors.New("conn is nil")
	}

	base.connector = conn

	return nil
}

// GetConnector 获取当前Function实例所关联的Connector
func (base *BaseFunction) GetConnector() kis.Connector {
	return base.connector
}

这样一个Function实例就可以获取到Connector实例的信息。

5.5.2 Flow与Connector关联

同样,Flow中也需要获取到Connector的信息,这里面也需要将Flow和Connector的关系进行简单的关联。

kis-flow/kis/flow.go

代码语言:go
复制
package kis

import (
	"context"
	"kis-flow/common"
	"kis-flow/config"
)

type Flow interface {
	// Run 调度Flow,依次调度Flow中的Function并且执行
	Run(ctx context.Context) error
	// Link 将Flow中的Function按照配置文件中的配置进行连接
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow 提交Flow数据到即将执行的Function层
	CommitRow(row interface{}) error
	// Input 得到flow当前执行Function的输入源数据
	Input() common.KisRowArr
	// GetName 得到Flow的名称
	GetName() string
	// GetThisFunction 得到当前正在执行的Function
	GetThisFunction() Function
	// GetThisFuncConf 得到当前正在执行的Function的配置
	GetThisFuncConf() *config.KisFuncConfig
	// GetConnector 得到当前正在执行的Function的Connector
    // +++++++++++++++++++++++++++++++++
	GetConnector() (Connector, error)
	// GetConnConf 得到当前正在执行的Function的Connector的配置
	GetConnConf() (*config.KisConnConfig, error)
    // +++++++++++++++++++++++++++++++++
}

新增 GetConnector()GetConnConf()接口,分别获取Connector实例和Connector配置。

之后,在KisFlow中实现这两个方法。

kis-flow/flow/kis_flow.go

代码语言:go
复制
// GetConnector 得到当前正在执行的Function的Connector
func (flow *KisFlow) GetConnector() (kis.Connector, error) {
	if conn := flow.ThisFunction.GetConnector(); conn != nil {
		return conn, nil
	} else {
		return nil, errors.New("GetConnector(): Connector is nil")
	}
}

// GetConnConf 得到当前正在执行的Function的Connector的配置
func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) {
	if conn := flow.ThisFunction.GetConnector(); conn != nil {
		return conn.GetConfig(), nil
	} else {
		return nil, errors.New("GetConnConf(): Connector is nil")
	}
}

实际上依然是通过当前Flow正在执行的Function来获取到Connector信息。

5.5.3 Function链接Connector

按照之前的配置文件定义,function的yaml配置文件如下:

代码语言:yaml
复制
nstype: func
fname: 测试Nsfunction_L1
fmode: Load
source:
  name: 被校验的测试数据源1-学生班级维度
  must:
    - stuid
    - classid
option:
  cname: 测试NsConnector_2

这里面有一个Option,其中有一个成员cname,如果当前Function配置了Connector,则需要在当前的Option中配置cname,并填写Connector的名称cname。

那么在当Flow去链接一个Funciton的时候,在Function的实例被创建之后,可以通过Function的配置得到是否携带Connector,如果携带则也要新建Connector实例,代码如下:

kis-flow/flow/kis_flow.go

代码语言:go
复制
// Link 将Function链接到Flow中
// fConf: 当前Function策略
// fParams: 当前Flow携带的Function动态参数
func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error {
	// 创建Function实例
	f := function.NewKisFunction(flow, fConf)

    // ++++++++++++++++++++++++++++++
	if fConf.Option.CName != "" {
		// 当前Function有Connector关联,需要初始化Connector实例

		// 获取Connector配置
		connConfig, err := fConf.GetConnConfig()
		if err != nil {
			panic(err)
		}

		// 创建Connector对象
		connector := conn.NewKisConnector(connConfig)

		// 初始化Connector, 执行Connector Init 方法
		if err = connector.Init(); err != nil {
			panic(err)
		}

		// 关联Function实例和Connector实例关系
		_ = f.AddConnector(connector)
	}
    // ++++++++++++++++++++++++++++++

    
	// Flow 添加 Function
	if err := flow.appendFunc(f, fParams); err != nil {
		return err
	}

	return nil
}

这样一个Connector实例就被创建了。

5.6 KisConnector单元测试

接下来来对KisConnector做单元测试。

5.6.1 单元测试

创建kis-flow/test/kis_connector_test.go文件:

代码语言:go
复制
package test

import (
	"context"
	"kis-flow/common"
	"kis-flow/config"
	"kis-flow/flow"
	"kis-flow/kis"
	"kis-flow/test/caas"
	"kis-flow/test/faas"
	"testing"
)

func TestNewKisConnector(t *testing.T) {

	ctx := context.Background()

	// 0. 注册Function 回调业务
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

	// 0. 注册ConnectorInit 和 Connector 回调业务
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

	// 1. 创建3个KisFunction配置实例, 其中myFuncConfig2 有Connector配置
	source1 := config.KisSource{
		Name: "公众号抖音商城户订单数据",
		Must: []string{"order_id", "user_id"},
	}

	source2 := config.KisSource{
		Name: "用户订单错误率",
		Must: []string{"order_id", "user_id"},
	}

	myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
	if myFuncConfig1 == nil {
		panic("myFuncConfig1 is nil")
	}

	option := config.KisFuncOption{
		CName: "ConnName1",
	}

	myFuncConfig2 := config.NewFuncConfig("funcName2", common.S, &source2, &option)
	if myFuncConfig2 == nil {
		panic("myFuncConfig2 is nil")
	}

	myFuncConfig3 := config.NewFuncConfig("funcName3", common.E, &source2, nil)
	if myFuncConfig3 == nil {
		panic("myFuncConfig3 is nil")
	}

	// 2. 创建一个KisConnector配置实例
	myConnConfig1 := config.NewConnConfig("ConnName1", "0.0.0.0:9998", common.REDIS, "redis-key", nil)
	if myConnConfig1 == nil {
		panic("myConnConfig1 is nil")
	}

	// 3. 将KisConnector配置实例绑定到KisFunction配置实例上
	_ = myFuncConfig2.AddConnConfig(myConnConfig1)

	// 4. 创建一个 KisFlow 配置实例
	myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

	// 5. 创建一个KisFlow对象
	flow1 := flow.NewKisFlow(myFlowConfig1)

	// 6. 拼接Functioin 到 Flow 上
	if err := flow1.Link(myFuncConfig1, nil); err != nil {
		panic(err)
	}
	if err := flow1.Link(myFuncConfig2, nil); err != nil {
		panic(err)
	}
	if err := flow1.Link(myFuncConfig3, nil); err != nil {
		panic(err)
	}

	// 7. 提交原始数据
	_ = flow1.CommitRow("This is Data1 from Test")
	_ = flow1.CommitRow("This is Data2 from Test")
	_ = flow1.CommitRow("This is Data3 from Test")

	// 8. 执行flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}

注意这里面funcName2 为关联Connector的Function。 所以在创建Function2的Config的时候提供了Option信息,且提供了关联的Connector名称。

5.6.2 Function Callback 与 Conn CallBack

为了方便管理CallBack业务,我们在kis-flow/test/目录下,分别创建kis-flow/test/faas/kis-flow/test/caas/目录。

分别建立如下文件,每一个文件写一种自定义业务。

代码语言:bash
复制
├── caas
│   ├── caas_demo1.go
│   └── caas_init1.go
├── faas
│   ├── faas_demo1.go
│   ├── faas_demo2.go
│   └── faas_demo3.go
(1) FuncName1 的回调业务

kis-flow/test/faas/faas_demo1.go

代码语言:go
复制
package faas

import (
	"context"
	"fmt"
	"kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call funcName1Handler ----")

	for index, row := range flow.Input() {
		// 打印数据
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)

		// 计算结果数据
		resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

		// 提交结果数据
		_ = flow.CommitRow(resultStr)
	}

	return nil
}

这个作为我们第一个Function,打印数据,并且再生产一些数据。

(2) FuncName2 的回调业务

kis-flow/test/faas/faas_demo2.go

代码语言:go
复制
package faas

import (
	"context"
	"fmt"
	"kis-flow/kis"
	"kis-flow/log"
)

// type FaaS func(context.Context, Flow) error

func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call funcName2Handler ----")

	for index, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)

		conn, err := flow.GetConnector()
		if err != nil {
			log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error())
			return err
		}

		if conn.Call(ctx, flow, row) != nil {
			log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
			return err
		}

		// 计算结果数据
		resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

		// 提交结果数据
		_ = flow.CommitRow(resultStr)
	}

	return nil
}

FuncName2 是一个关联Connector的业务。通过flow.GetConnector()可以得到Connector实例,然后可以通过执行conn.Call()来执行业务逻辑。

(3) FuncName3 的回调业务

kis-flow/test/faas/faas_demo3.go

代码语言:go
复制
package faas

import (
	"context"
	"fmt"
	"kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call funcName3Handler ----")

	for _, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
	}

	return nil
}
(4) ConnName1 Init方法

kis-flow/test/caas/caas_init1.go

代码语言:go
复制
package caas

import (
	"fmt"
	"kis-flow/kis"
)

// type ConnInit func(conn Connector) error

func InitConnDemo1(connector kis.Connector) error {
	fmt.Println("===> Call Connector InitDemo1")
	//config info
	connConf := connector.GetConfig()

	fmt.Println(connConf)

	// init connector , 如 初始化数据库链接等

	return nil
}
(5) ConnName1 的回调业务

kis-flow/test/caas/caas_demo1.go

代码语言:go
复制
package caas

import (
	"context"
	"fmt"
	"kis-flow/kis"
)

// type CaaS func(context.Context, Connector, Function, Flow, interface{}) error

func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
	fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
		flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)

	fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)

	return nil
}

5.6.3 运行

cd到kis-flow/test/下,执行命令:

代码语言:bash
复制
go test -test.v -test.paniconexit0 -test.run TestNewKisConnector

结果如下:

代码语言:bash
复制
=== RUN   TestNewKisConnector
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{ ConnName1 0.0.0.0:9998 redis redis-key map[] [] [funcName2]}
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionC, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c240 ThisFunctionId:func-f594da0e28da417db6b15ce9c9530f84 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionS, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c2a0 ThisFunctionId:func-f0b4bebf87614828a9375d888c54d13b PrevFunctionId:func-f594da0e28da417db6b15ce9c9530f84 funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}

---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f0b4bebf87614828a9375d888c54d13b:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionE, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c300 ThisFunctionId:func-66e2b0afa4e14d179aa94c357c412cf8 PrevFunctionId:func-f0b4bebf87614828a9375d888c54d13b funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f0b4bebf87614828a9375d888c54d13b:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}

---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 2
--- PASS: TestNewKisConnector (0.00s)
PASS

经过仔细查看日志,得知Connector的Init被执行,请Connector也在执行FunctionName2期间也被同步执行且有日志输出,结果和我们的预期一致。

5.7 【V0.4】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.4

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 5.1 Connector定义
  • 5.1.1 Connector的抽象层定义
  • 5.1.2 Connector相关路由成员类型定义
    • (1) Connector Init
      • (2)Connector Call
        • (1) 添加成员
        • (2) 相关Map变量初始化
        • (3) 注册Connector Init方法
        • (4) 执行Connector Init方法
        • (5) 注册Connector Call方法
        • (6) 执行Connector Call方法
    • 5.2 Connector路由管理
    • 5.2.1 KisPool新增Connector相关路由管理成员
    • 5.3 KisConnector
    • 5.3.1 KisConnector 定义
    • 5.3.1 KisConnector 构造方法
      • 5.3.2 实现Connector接口
        • (1) FuncName1 的回调业务
        • (2) FuncName2 的回调业务
        • (3) FuncName3 的回调业务
        • (4) ConnName1 Init方法
        • (5) ConnName1 的回调业务
    • 5.4 KisConnConfig配置信息
    • 5.4.1 KisFuncConfig 添加KisConnConfig成员
    • 5.5 Function/Flow与Connector关联
    • 5.5.1 Function与Connector关联
    • 5.5.2 Flow与Connector关联
    • 5.5.3 Function链接Connector
    • 5.6 KisConnector单元测试
    • 5.6.1 单元测试
    • 5.6.2 Function Callback 与 Conn CallBack
    • 5.6.3 运行
    • 5.7 【V0.4】源代码
    相关产品与服务
    流计算 Oceanus
    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档