前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数

Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数

原创
作者头像
刘丹冰Aceld
发布2024-07-18 11:38:53
1130
发布2024-07-18 11:38:53
举报
文章被收录于专栏:KisFlow-Golang流式计算框架

8.1 Flow Cache 数据流缓存

KisFlow也提供流式计算中的共享缓存,采用简单的本地缓存供开发者按需使用,有关本地缓存的第三方技术依赖选型: https://github.com/patrickmn/go-cache

8.1.1 go-cache

(1)安装

代码语言:bash
复制
go get github.com/patrickmn/go-cache

(2)使用

代码语言:go
复制
import (
	"fmt"
	"github.com/patrickmn/go-cache"
	"time"
)

func main() {
	// Create a cache with a default expiration time of 5 minutes, and which
	// purges expired items every 10 minutes
	c := cache.New(5*time.Minute, 10*time.Minute)

	// Set the value of the key "foo" to "bar", with the default expiration time
	c.Set("foo", "bar", cache.DefaultExpiration)

	// Set the value of the key "baz" to 42, with no expiration time
	// (the item won't be removed until it is re-set, or removed using
	// c.Delete("baz")
	c.Set("baz", 42, cache.NoExpiration)

	// Get the string associated with the key "foo" from the cache
	foo, found := c.Get("foo")
	if found {
		fmt.Println(foo)
	}

	// Since Go is statically typed, and cache values can be anything, type
	// assertion is needed when values are being passed to functions that don't
	// take arbitrary types, (i.e. interface{}). The simplest way to do this for
	// values which will only be used once--e.g. for passing to another
	// function--is:
	foo, found := c.Get("foo")
	if found {
		MyFunction(foo.(string))
	}

	// This gets tedious if the value is used several times in the same function.
	// You might do either of the following instead:
	if x, found := c.Get("foo"); found {
		foo := x.(string)
		// ...
	}
	// or
	var foo string
	if x, found := c.Get("foo"); found {
		foo = x.(string)
	}
	// ...
	// foo can then be passed around freely as a string

	// Want performance? Store pointers!
	c.Set("foo", &MyStruct, cache.DefaultExpiration)
	if x, found := c.Get("foo"); found {
		foo := x.(*MyStruct)
			// ...
	}
}

详细参考:https://github.com/patrickmn/go-cache

8.1.2 KisFlow集成go-cache能力

(1) Flow提供抽象层接口

在Flow中提供有关Cache的操作的接口,如下:

kis-flow/kis/flow.go

代码语言:go
复制
type Flow interface {
	// Run 调度Flow,依次调度Flow中的Function并且执行
	Run(ctx context.Context) error
	// Link 将Flow中的Function按照配置文件中的配置进行连接
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow 提交Flow数据到即将执行的Function层
	CommitRow(row interface{}) error
	// Input 得到flow当前执行Function的输入源数据
	Input() common.KisRowArr
	// GetName 得到Flow的名称
	GetName() string
	// GetThisFunction 得到当前正在执行的Function
	GetThisFunction() Function
	// GetThisFuncConf 得到当前正在执行的Function的配置
	GetThisFuncConf() *config.KisFuncConfig
	// GetConnector 得到当前正在执行的Function的Connector
	GetConnector() (Connector, error)
	// GetConnConf 得到当前正在执行的Function的Connector的配置
	GetConnConf() (*config.KisConnConfig, error)
	// GetConfig 得到当前Flow的配置
	GetConfig() *config.KisFlowConfig
	// GetFuncConfigByName 得到当前Flow的配置
	GetFuncConfigByName(funcName string) *config.KisFuncConfig
	// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
	Next(acts ...ActionFunc) error

    // ++++++++++++++++++++++++++++++++++++++++
	// GetCacheData 得到当前Flow的缓存数据
	GetCacheData(key string) interface{}
	// SetCacheData 设置当前Flow的缓存数据
	SetCacheData(key string, value interface{}, Exp time.Duration)
}

SetCacheData()为设置本地缓存,Exp为超时时间,如果Exp为0,则为永久。

GetCacheData()为读取本地缓存。

(2)提供一些常量

提供有关缓存超时时间的一些常量。

kis-flow/common/const.go

代码语言:go
复制
// cache
const (
	// DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间
	DeFaultFlowCacheCleanUp = 5 //单位 min
	// DefaultExpiration 默认GoCahce时间 ,永久保存
	DefaultExpiration time.Duration = 0
)
(3) KisFlow新增成员及初始化

kis-flow/flow/kis_flow.go

代码语言:go
复制
// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {

    // ... ...
    // ... ...

	// flow的本地缓存
	cache *cache.Cache // Flow流的临时缓存上线文环境
}

// NewKisFlow 创建一个KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
	flow := new(KisFlow)

    // ... ...
    // ... ...

	// 初始化本地缓存
	flow.cache = cache.New(cache.NoExpiration, common.DeFaultFlowCacheCleanUp*time.Minute)


	return flow
}
(4)实现接口

最后实现有关缓存读写操作的两个接口,代码如下:

kis-flow/flow/kis_flow_data.go

代码语言:go
复制
func (flow *KisFlow) GetCacheData(key string) interface{} {

	if data, found := flow.cache.Get(key); found {
		return data
	}

	return nil
}

func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) {
	if Exp == common.DefaultExpiration {
		flow.cache.Set(key, value, cache.DefaultExpiration)
	} else {
		flow.cache.Set(key, value, Exp)
	}
}

8.2 MetaData 临时缓存参数

MetaData我们定义为Flow、Function、Connector每个层级都会提供一个map[string]interface{} 的结构来存放临时数据,这个数据的生命周期与各个实例的生命周期一致。

8.2.1 Flow添加MetaData

首先,KisFlow的成员新增metaData map[string]interface{}和对应的读写锁。

kis-flow/flow/kis_flow.go

代码语言:go
复制
// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
	// ... ...

    // ... ...

    // +++++++++++++++++++++++++++++++++++++++++++
	// flow的metaData
	metaData map[string]interface{} // Flow的自定义临时数据
	mLock    sync.RWMutex           // 管理metaData的读写锁
}

且在KisFlow的构造函数下对metaData成员进行内存初始化,如下:

kis-flow/flow/kis_flow.go

代码语言:go
复制
// NewKisFlow 创建一个KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
	flow := new(KisFlow)
    
	// ... ...
    // ... ...

    // ++++++++++++++++++++++++++++++++++++++
	// 初始化临时数据
	flow.metaData = make(map[string]interface{})

	return flow
}

之后,给Flow添加MetaData的读写接口,实现非常的简单,如下:

kis-flow/kis/flow.go

代码语言:go
复制
type Flow interface {
	// Run 调度Flow,依次调度Flow中的Function并且执行
	Run(ctx context.Context) error
	// Link 将Flow中的Function按照配置文件中的配置进行连接
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow 提交Flow数据到即将执行的Function层
	CommitRow(row interface{}) error
	// Input 得到flow当前执行Function的输入源数据
	Input() common.KisRowArr
	// GetName 得到Flow的名称
	GetName() string
	// GetThisFunction 得到当前正在执行的Function
	GetThisFunction() Function
	// GetThisFuncConf 得到当前正在执行的Function的配置
	GetThisFuncConf() *config.KisFuncConfig
	// GetConnector 得到当前正在执行的Function的Connector
	GetConnector() (Connector, error)
	// GetConnConf 得到当前正在执行的Function的Connector的配置
	GetConnConf() (*config.KisConnConfig, error)
	// GetConfig 得到当前Flow的配置
	GetConfig() *config.KisFlowConfig
	// GetFuncConfigByName 得到当前Flow的配置
	GetFuncConfigByName(funcName string) *config.KisFuncConfig
	// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
	Next(acts ...ActionFunc) error
	// GetCacheData 得到当前Flow的缓存数据
	GetCacheData(key string) interface{}
	// SetCacheData 设置当前Flow的缓存数据
	SetCacheData(key string, value interface{}, Exp time.Duration)

    // ++++++++++++++++++++++++++++
	// GetMetaData 得到当前Flow的临时数据
	GetMetaData(key string) interface{}
	// SetMetaData 设置当前Flow的临时数据
	SetMetaData(key string, value interface{})
}

定义接口GetMetaData()SetMetaData(),分别作为读写接口。

最后来实现,如下:

kis-flow/flow/kis_flow_data.go

代码语言:go
复制
// GetMetaData 得到当前Flow对象的临时数据
func (flow *KisFlow) GetMetaData(key string) interface{} {
	flow.mLock.RLock()
	defer flow.mLock.RUnlock()

	data, ok := flow.metaData[key]
	if !ok {
		return nil
	}

	return data
}

// SetMetaData 设置当前Flow对象的临时数据
func (flow *KisFlow) SetMetaData(key string, value interface{}) {
	flow.mLock.Lock()
	defer flow.mLock.Unlock()

	flow.metaData[key] = value
}

8.2.2 Function 添加MetaData

首先在BaseFunciton中添加成员metaData,如下:

kis-flow/function/kis_base_funciton.go

代码语言:go
复制
type BaseFunction struct {
	// Id , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象
	Id     string
	Config *config.KisFuncConfig

	// flow
	flow kis.Flow //上下文环境KisFlow

	// connector
	connector kis.Connector

    // ++++++++++++++++++++++++
	// Function的自定义临时数据
	metaData map[string]interface{}
	// 管理metaData的读写锁
	mLock sync.RWMutex

	// link
	N kis.Function //下一个流计算Function
	P kis.Function //上一个流计算Function

在Funciton构造函数的地方,这里需要进行改进下,每个具体的Funciton都需要一个构造函数来初始化metaData成员,改动如下:

kis-flow/function/kis_base_function.go

代码语言:go
复制
func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
	var f kis.Function

	//工厂生产泛化对象
    // ++++++++++++++
	switch common.KisMode(config.FMode) {
	case common.V:
		f = NewKisFunctionV() // +++
	case common.S:
		f = NewKisFunctionS() // +++
	case common.L:
		f = NewKisFunctionL() // +++
	case common.C:
		f = NewKisFunctionC() // +++
	case common.E:
		f = NewKisFunctionE() // +++
	default:
		//LOG ERROR
		return nil
	}

	// 生成随机实例唯一ID
	f.CreateId()

	// 设置基础信息属性
	if err := f.SetConfig(config); err != nil {
		panic(err)
	}

	// 设置Flow
	if err := f.SetFlow(flow); err != nil {
		panic(err)
	}

	return f
}

其中每个构造函数如下:

kis-flow/function/kis_function_c.go

代码语言:go
复制
func NewKisFunctionC() kis.Function {
	f := new(KisFunctionC)

	// 初始化metaData
	f.metaData = make(map[string]interface{})

	return f
}

kis-flow/function/kis_function_v.go

代码语言:go
复制
func NewKisFunctionV() kis.Function {
	f := new(KisFunctionV)

	// 初始化metaData
	f.metaData = make(map[string]interface{})

	return f
}

kis-flow/function/kis_function_e.go

代码语言:go
复制
func NewKisFunctionE() kis.Function {
	f := new(KisFunctionE)

	// 初始化metaData
	f.metaData = make(map[string]interface{})

	return f
}

kis-flow/function/kis_function_s.go

代码语言:go
复制
func NewKisFunctionS() kis.Function {
	f := new(KisFunctionS)

	// 初始化metaData
	f.metaData = make(map[string]interface{})

	return f
}

kis-flow/function/kis_function_l.go

代码语言:go
复制
func NewKisFunctionL() kis.Function {
	f := new(KisFunctionL)

	// 初始化metaData
	f.metaData = make(map[string]interface{})

	return f
}

接下来,给Funciton抽象层,添加获取metaData成员的接口,如下:

kis-flow/kis/function.go

代码语言:go
复制
type Function interface {
	// Call 执行流式计算逻辑
	Call(ctx context.Context, flow Flow) error

	// SetConfig 给当前Function实例配置策略
	SetConfig(s *config.KisFuncConfig) error
	// GetConfig 获取当前Function实例配置策略
	GetConfig() *config.KisFuncConfig

	// SetFlow 给当前Function实例设置所依赖的Flow实例
	SetFlow(f Flow) error
	// GetFlow 获取当前Functioin实力所依赖的Flow
	GetFlow() Flow

	// AddConnector 给当前Function实例添加一个Connector
	AddConnector(conn Connector) error
	// GetConnector 获取当前Function实例所关联的Connector
	GetConnector() Connector

	// CreateId 给当前Funciton实力生成一个随机的实例KisID
	CreateId()
	// GetId 获取当前Function的FID
	GetId() string
	// GetPrevId 获取当前Function上一个Function节点FID
	GetPrevId() string
	// GetNextId 获取当前Function下一个Function节点FID
	GetNextId() string

	// Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil
	Next() Function
	// Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil
	Prev() Function
	// SetN 设置下一层Function实例
	SetN(f Function)
	// SetP 设置上一层Function实例
	SetP(f Function)

    // ++++++++++++++++++++++++++++++++++
	// GetMetaData 得到当前Function的临时数据
	GetMetaData(key string) interface{}
	// SetMetaData 设置当前Function的临时数据
	SetMetaData(key string, value interface{})
}

对上述新增的两个接口的实现,在BaseFunction中实现就可以了。

kis-flow/funciton/kis_base_function.go

代码语言:go
复制
// GetMetaData 得到当前Function的临时数据
func (base *BaseFunction) GetMetaData(key string) interface{} {
	base.mLock.RLock()
	defer base.mLock.RUnlock()

	data, ok := base.metaData[key]
	if !ok {
		return nil
	}

	return data
}

// SetMetaData 设置当前Function的临时数据
func (base *BaseFunction) SetMetaData(key string, value interface{}) {
	base.mLock.Lock()
	defer base.mLock.Unlock()

	base.metaData[key] = value
}

8.2.3 Connector添加MetaData

首先,给KisConnector添加metaData成员,如下:

kis-flow/conn/kis_connector.go

代码语言:go
复制
type KisConnector struct {
	// Connector ID
	CId string
	// Connector Name
	CName string
	// Connector Config
	Conf *config.KisConnConfig
	// Connector Init
	onceInit sync.Once
    
    // ++++++++++++++
	// KisConnector的自定义临时数据
	metaData map[string]interface{}
	// 管理metaData的读写锁
	mLock sync.RWMutex
}

// NewKisConnector 根据配置策略创建一个KisConnector
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
	conn := new(KisConnector)
	conn.CId = id.KisID(common.KisIdTypeConnnector)
	conn.CName = config.CName
	conn.Conf = config

    // +++++++++++++++++++++++++++++++++++
	conn.metaData = make(map[string]interface{})

	return conn
}

且在构造函数中进行对metaData的初始化。

其次,给Connector抽象层,提供获取和设置MetaData的接口,如下:

kis-flow/kis/connector.go

代码语言:go
复制
type Connector interface {
	// Init 初始化Connector所关联的存储引擎链接等
	Init() error
	// Call 调用Connector 外挂存储逻辑的读写操作
	Call(ctx context.Context, flow Flow, args interface{}) error
	// GetId 获取Connector的ID
	GetId() string
	// GetName 获取Connector的名称
	GetName() string
	// GetConfig 获取Connector的配置信息
	GetConfig() *config.KisConnConfig
	// GetMetaData 得到当前Connector的临时数据

    // +++++++++++++++++++++++++++++++
	GetMetaData(key string) interface{}
	// SetMetaData 设置当前Connector的临时数据
	SetMetaData(key string, value interface{})
}

最后在KisConnector实现上述两个接口,如下:

kis-flow/conn/kis_connector.go

代码语言:go
复制
// GetMetaData 得到当前Connector的临时数据
func (conn *KisConnector) GetMetaData(key string) interface{} {
	conn.mLock.RLock()
	defer conn.mLock.RUnlock()

	data, ok := conn.metaData[key]
	if !ok {
		return nil
	}

	return data
}

// SetMetaData 设置当前Connector的临时数据
func (conn *KisConnector) SetMetaData(key string, value interface{}) {
	conn.mLock.Lock()
	defer conn.mLock.Unlock()

	conn.metaData[key] = value
}

8.3 Params 配置文件参数

KisFlow提供了配置文件中,在配置Flow、Function、Connector等的默认携带参数:Params。

如下:

Function:

代码语言:yaml
复制
kistype: func
fname: funcName1
fmode: Verify
source:
  name: 公众号抖音商城户订单数据
  must:
    - order_id
    - user_id
option:
  default_params:
    default1: funcName1_param1
    default2: funcName1_param2

Flow:

代码语言:yaml
复制
kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
    params:
      myKey1: flowValue1-1
      myKey2: flowValue1-2
  - fname: funcName2
    params:
      myKey1: flowValue2-1
      myKey2: flowValue2-2
  - fname: funcName3
    params:
      myKey1: flowValue3-1
      myKey2: flowValue3-2

Connector:

代码语言:yaml
复制
kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
load: null
save:
  - funcName2

这里面开发者均可以给定义的模块,提供Params,其中Flow提供的Params也会叠加到Function中去。

我们在之前构建Flow模块的时候,已经将这些参数读取进了每个模块的内存中,但是并没有给开发者暴露接口。

8.3.1 Flow添加Param读取接口

首先给Flow提供Param的查询接口:

kis-flow/kis/flow.go

代码语言:go
复制
type Flow interface {
	// ... ...
    // ... ...
    
	// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
	GetFuncParam(key string) string
	// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
	GetFuncParamAll() config.FParam
}

实现如下:

kis-flow/flow/kis_flow_data.go

代码语言:go
复制
// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
func (flow *KisFlow) GetFuncParam(key string) string {
	flow.fplock.RLock()
	defer flow.fplock.RUnlock()

	if param, ok := flow.funcParams[flow.ThisFunctionId]; ok {
		if value, vok := param[key]; vok {
			return value
		}
	}

	return ""
}

// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
func (flow *KisFlow) GetFuncParamAll() config.FParam {
	flow.fplock.RLock()
	defer flow.fplock.RUnlock()

	param, ok := flow.funcParams[flow.ThisFunctionId]
	if !ok {
		return nil
	}

	return param
}

GetFuncParam()GetFuncParamAll()分别为取出一个key,和取出全部的参数,但都是取出当前正在执行的Function的Params参数。

8.3.2 单元测试

我们这里给FlowName1中的每个Function添加一些参数。

kis-flow/test/load_conf/flow-FlowName1.yml

代码语言:yaml
复制
kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
    params:
      myKey1: flowValue1-1
      myKey2: flowValue1-2
  - fname: funcName2
    params:
      myKey1: flowValue2-1
      myKey2: flowValue2-2
  - fname: funcName3
    params:
      myKey1: flowValue3-1
      myKey2: flowValue3-2

然后再分别给这里面关联的Function依次配置一些默认的自定义配置参数,如下:

kis-flow/test/load_conf/func/func-FuncName1.yml

代码语言:yaml
复制
kistype: func
fname: funcName1
fmode: Verify
source:
  name: 公众号抖音商城户订单数据
  must:
    - order_id
    - user_id
option:
  default_params:
    default1: funcName1_param1
    default2: funcName1_param2

kis-flow/test/load_conf/func/func-FuncName2.yml

代码语言:yaml
复制
kistype: func
fname: funcName2
fmode: Save
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
option:
  cname: ConnName1
  default_params:
    default1: funcName2_param1
    default2: funcName2_param2

kis-flow/test/load_conf/func/func-FuncName3.yml

代码语言:yaml
复制
kistype: func
fname: funcName3
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
option:
  default_params:
    default1: funcName3_param1
    default2: funcName3_param2

我们给FuncName2关联的Connector也配置一些Param参数,如下:

kis-flow/test/load_conf/conn/conn-ConnName1.yml

代码语言:yaml
复制
kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
load: null
save:
  - funcName2

最后,为了验证我们的配置参数可以在Function执行的过程中被准确的取出,我们依次改造了每个Funciton和Connector的业务函数,把各自Param打印出来,如下:

kis-flow/test/faas/faas_demo1.go

代码语言:go
复制
func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call funcName1Handler ----")
    
    // ++++++++++++++++
	fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

	for index, row := range flow.Input() {
		// 打印数据
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)

		// 计算结果数据
		resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

		// 提交结果数据
		_ = flow.CommitRow(resultStr)
	}

	return nil
}

kis-flow/test/faas/faas_demo2.go

代码语言:go
复制
func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call funcName2Handler ----")
    // ++++++++++++++++
	fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

	for index, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)

		conn, err := flow.GetConnector()
		if err != nil {
			log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error())
			return err
		}

		if conn.Call(ctx, flow, row) != nil {
			log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
			return err
		}

		// 计算结果数据
		resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

		// 提交结果数据
		_ = flow.CommitRow(resultStr)
	}

	return nil
}

kis-flow/test/faas/faas_demo3.go

代码语言:go
复制
func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call funcName3Handler ----")
    // ++++
	fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

	for _, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
	}

	return nil
}

kis-flow/test/caas/caas_demo1.go

代码语言:go
复制
func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
	fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
		flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)

    // +++++++++++ 
	fmt.Printf("Params = %+v\n", conn.GetConfig().Params)

	fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)

	return nil
}

最后,我们来编写单元测试用例代码,如下:

kis-flow/test/kis_params_test.go

代码语言:go
复制
package test

import (
	"context"
	"kis-flow/common"
	"kis-flow/file"
	"kis-flow/kis"
	"kis-flow/test/caas"
	"kis-flow/test/faas"
	"testing"
)

func TestParams(t *testing.T) {
	ctx := context.Background()

	// 0. 注册Function 回调业务
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

	// 0. 注册ConnectorInit 和 Connector 回调业务
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

	// 1. 加载配置文件并构建Flow
	if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}

	// 2. 获取Flow
	flow1 := kis.Pool().GetFlow("flowName1")

	// 3. 提交原始数据
	_ = flow1.CommitRow("This is Data1 from Test")
	_ = flow1.CommitRow("This is Data2 from Test")
	_ = flow1.CommitRow("This is Data3 from Test")

	// 4. 执行flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}

cd到kis-flow/test/下,执行

代码语言:bash
复制
 go test -test.v -test.paniconexit0 -test.run  TestParams 

结果如下:

代码语言:bash
复制
=== RUN   TestParams
....
....

---> Call funcName1Handler ----
Params = map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]
...
...

---> Call funcName2Handler ----
Params = map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...

===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]

...
...

---> Call funcName3Handler ----
Params = map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2]

...
...
--- PASS: TestParams (0.01s)
PASS
ok      kis-flow/test   0.433s

我们可以看到,现在可以正确的取出各个层级的Params的配置参数了。

8.4 【V0.7】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.7


作者:刘丹冰Aceld github: https://github.com/aceld

KisFlow开源项目地址:https://github.com/aceld/kis-flow

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 8.1 Flow Cache 数据流缓存
  • 8.1.1 go-cache
    • 8.1.2 KisFlow集成go-cache能力
      • (1) Flow提供抽象层接口
      • (2)提供一些常量
      • (3) KisFlow新增成员及初始化
      • (4)实现接口
  • 8.2 MetaData 临时缓存参数
  • 8.2.1 Flow添加MetaData
  • 8.2.2 Function 添加MetaData
  • 8.2.3 Connector添加MetaData
  • 8.3 Params 配置文件参数
    • 8.3.1 Flow添加Param读取接口
      • 8.3.2 单元测试
      • 8.4 【V0.7】源代码
      相关产品与服务
      流计算 Oceanus
      流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档