前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型

Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型

原创
作者头像
刘丹冰Aceld
发布2024-07-23 09:58:09
920
发布2024-07-23 09:58:09
举报
文章被收录于专栏:KisFlow-Golang流式计算框架

接下来我们来增强KisFlow中Function对业务数据处理的聚焦,将之前Function的写法:

代码语言:go
复制
func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call funcName3Handler ----")
	fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

	for _, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
	}

	return nil
}

是从flow.Input()中 获取到原始数据,改成可以直接获取到业务想要的具体数据结构类型,而无需断言等类型判断和转换。改成的Function扩展参数用法大致如下:

proto

代码语言:go
复制
type StuScores struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

type StuAvgScore struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

FaaS

代码语言:go
复制
type AvgStuScoreIn struct {
	serialize.DefaultSerialize
	proto.StuScores
}

type AvgStuScoreOut struct {
	serialize.DefaultSerialize
	proto.StuAvgScore
}

// AvgStuScore(FaaS) 计算学生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
	for _, row := range rows {
		avgScore := proto.StuAvgScore{
			StuId:    row.StuId,
			AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
		}
		// 提交结果数据
		_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
	}

	return nil
}

这样,我们可以通过第三个形式参数rows直接拿到我们期待的目标输出结构体数据,不需要断言和转换,更加关注业务方的开发效率。

当然,如果你希望获取到原始的数据,依然可以从flow.Input()中获取到。

本章将实现KisFlow上述功能。

11.1 FaaS业务回调函数自描述

本节将完成FaaS的自描述概念改造,我们知道之前的FaaS回调如下:

代码语言:go
复制
type FaaS func(context.Context, Flow) error

那么我们需要一个结构体,来描述这个函数属性,包括他的函数名称、函数地址、形参数量、相残类型、返回值类型等等。

11.1.1 FaaSDesc 回调自描述类型

kis-flow/kis/下,新创建一个文件faas.go,定义如下结构体:

kis-flow/kis/faas.go

代码语言:go
复制
// FaaS Function as a Service

// 将
// type FaaS func(context.Context, Flow) error
// 改为
// type FaaS func(context.Context, Flow, ...interface{}) error
// 可以通过可变参数的任意输入类型进行数据传递
type FaaS interface{}

// FaaSDesc FaaS 回调计算业务函数 描述
type FaaSDesc struct {
	FnName    string         // Function名称
	f         interface{}    // FaaS 函数
	fName     string         // 函数名称
	ArgsType  []reflect.Type // 函数参数类型(集合)
	ArgNum    int            // 函数参数个数
	FuncType  reflect.Type   // 函数类型
	FuncValue reflect.Value  // 函数值(函数地址)
}

将之前的FaaS改进成interface{},而FaaSDesc具备了一些属性。

  • FnName: 表示当前Function的名称,例如我们之前例子的"funcDemo1" 等,这个是用来KisFlow给Function标识的FunctionName。
  • f:表示定义的FaaS函数。
  • fName: 定义f函数的函数名称。
  • ArgsType:定义的f函数的形参类型列表,这是一个slice。
  • ArgNum:定义的f函数的输入形参个数。
  • FuncType:定义的f函数的数据类型。
  • FuncValue:定义的f函数的函数值(可以被调度的函数地址)。

11.1.2 新建一个FaaSDesc对象

下面,提供一个新建FaaSDesc的构造函数,形参的类型就是KisFlow的FunctionName和定义的FaaS函数,如下:

kis-flow/kis/faas.go

代码语言:go
复制
// NewFaaSDesc 根据用户注册的FnName 和FaaS 回调函数,创建 FaaSDesc 描述实例
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
	// 传入的回调函数FaaS,函数值(函数地址)
	funcValue := reflect.ValueOf(f)

	// 传入的回调函数FaaS 类型
	funcType := funcValue.Type()

	// 判断传递的FaaS指针是否是函数类型
	if !isFuncType(funcType) {
		return nil, fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
	}

	// 判断传递的FaaS函数是否有返回值类型是只包括(error)
	if funcType.NumOut() != 1 || funcType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
		return nil, errors.New("function must have exactly one return value of type error")
	}

	// FaaS函数的参数类型集合
	argsType := make([]reflect.Type, funcType.NumIn())

	// 获取FaaS的函数名称
	fullName := runtime.FuncForPC(funcValue.Pointer()).Name()

	// 确保  FaaS func(context.Context, Flow, ...interface{}) error 形参列表,存在context.Context 和 kis.Flow

	// 是否包含kis.Flow类型的形参
	containsKisFlow := false
	// 是否包含context.Context类型的形参
	containsCtx := false

	// 遍历FaaS的形参类型
	for i := 0; i < funcType.NumIn(); i++ {

		// 取出第i个形式参数类型
		paramType := funcType.In(i)

		if isFlowType(paramType) {
			// 判断是否包含kis.Flow类型的形参
			containsKisFlow = true

		} else if isContextType(paramType) {
			// 判断是否包含context.Context类型的形参
			containsCtx = true

		} else if isSliceType(paramType) {
            // 判断是否包含Slice类型的形参

			// 获取当前参数Slice的元素类型
			itemType := paramType.Elem()

			// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
			if itemType.Kind() == reflect.Ptr {
				itemType = itemType.Elem() // 获取指针指向的结构体类型
			}
		} else {
			// Other types are not supported...
		}

		// 将当前形参类型追加到argsType集合中
		argsType[i] = paramType
	}

	if !containsKisFlow {
		// 不包含kis.Flow类型的形参,返回错误
		return nil, errors.New("function parameters must have kis.Flow param, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
	}

	if !containsCtx {
		// 不包含context.Context类型的形参,返回错误
		return nil, errors.New("function parameters must have context, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
	}

	// 返回FaaSDesc描述实例
	return &FaaSDesc{
		FnName:    fnName,
		f:         f,
		fName:     fullName,
		ArgsType:  argsType,
		ArgNum:    len(argsType),
		FuncType:  funcType,
		FuncValue: funcValue,
	}, nil
}

这里面通过用reflect反射能力,依次从f函数中获取相关的属性值,存放在FaaSDesc中。

这里面为了确保开发者传递的FaaS原因满足如下格式:

代码语言:go
复制
type FaaS func(context.Context, Flow, ...interface{}) error

所以对形参context.Context和形参Flow做了严格的形参类型校验,其中的校验方法如下:

kis-flow/kis/faas.go

代码语言:go
复制
// isFuncType 判断传递进来的 paramType 是否是函数类型
func isFuncType(paramType reflect.Type) bool {
	return paramType.Kind() == reflect.Func
}

// isFlowType 判断传递进来的 paramType 是否是 kis.Flow 类型
func isFlowType(paramType reflect.Type) bool {
	var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()

	return paramType.Implements(flowInterfaceType)
}

// isContextType 判断传递进来的 paramType 是否是 context.Context 类型
func isContextType(paramType reflect.Type) bool {
	typeName := paramType.Name()

	return strings.Contains(typeName, "Context")
}

// isSliceType 判断传递进来的 paramType 是否是切片类型
func isSliceType(paramType reflect.Type) bool {
	return paramType.Kind() == reflect.Slice
}

NewFaaSDesc()containsKisFlowcontainsCtx两个bool类型的变量来判断是否包括Context和Flow类型。

下面这段代码是为了兼容传递的形参类型是结构体指针时候的兼容:

代码语言:go
复制
            // ... ... 

            // 获得当前形参类型
			itemType := paramType.Elem()

			// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
			if itemType.Kind() == reflect.Ptr {
				itemType = itemType.Elem() // 获取指针指向的结构体类型
			}

            // ... ... 

比如开发者传递的FaaS函数原型如下:

代码语言:go
复制
func MyFaaSDemo(ctx context.Context, flow Flow, []*A) error

和:

代码语言:go
复制
func MyFaaSDemo(ctx context.Context, flow Flow, []A) error

11.1.3 注册FaaS函数

那么接下来,我们将kisPool模块,的注册FaaS函数的方法修改成注册一个FaaSDesc描述,修改后的注册方法如下:

kis-flow/kis/pool.go

代码语言:go
复制
// FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册
func (pool *kisPool) FaaS(fnName string, f FaaS) {

	// 当注册FaaS计算逻辑回调时,创建一个FaaSDesc描述对象
	faaSDesc, err := NewFaaSDesc(fnName, f)
	if err != nil {
		panic(err)
	}

	pool.fnLock.Lock() // 写锁
	defer pool.fnLock.Unlock()

	if _, ok := pool.fnRouter[fnName]; !ok {
		// 将FaaSDesc描述对象注册到fnRouter中
		pool.fnRouter[fnName] = faaSDesc
	} else {
		errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
		panic(errString)
	}

	log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}

那么现在的fnRouter中保存的key依然是FunctionName,但是value则为当前FaaS函数的描述对象FaaSDesc.

11.1.4 kisPool调度FaaSDesc

最后再调度Function的时候,通过FaaSDesc取出调度函数地址和函数形参列表进行函数的调度。

修改的后的CallFunction()如下:

kis-flow/kis/pool.go

代码语言:go
复制
// CallFunction 调度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {

	if funcDesc, ok := pool.fnRouter[fnName]; ok {

		// 被调度Function的形参列表
		params := make([]reflect.Value, 0, funcDesc.ArgNum)

		for _, argType := range funcDesc.ArgsType {

			// 如果是Flow类型形参,则将 flow的值传入
			if isFlowType(argType) {
				params = append(params, reflect.ValueOf(flow))
				continue
			}

			// 如果是Context类型形参,则将 ctx的值传入
			if isContextType(argType) {
				params = append(params, reflect.ValueOf(ctx))
				continue
			}

			// 如果是Slice类型形参,则将 flow.Input()的值传入
			if isSliceType(argType) {
				params = append(params, value)
				continue
			}

			// 传递的参数,既不是Flow类型,也不是Context类型,也不是Slice类型,则默认给到零值
			params = append(params, reflect.Zero(argType))
		}

		// 调用当前Function 的计算逻辑
		retValues := funcDesc.FuncValue.Call(params)

		// 取出第一个返回值,如果是nil,则返回nil
		ret := retValues[0].Interface()
		if ret == nil {
			return nil
		}

		// 如果返回值是error类型,则返回error
		return retValues[0].Interface().(error)

	}

	log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)

	return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}

函数的整体调度逻辑大致如下:

首选通过fnName进行从fnRouter路由到对应的FaaSDesc。遍历FaaSDesc的形参列表:

将Context和Flow对象依次取出来,将额外传递的自定义切片形参取出来,如果传递的参数,既不是Flow类型,也不是Context类型,也不是Slice类型,则默认给到零值,如下:

代码语言:go
复制
			params = append(params, reflect.Zero(argType))

最后执行函数的调度:

代码语言:go
复制
		retValues := funcDesc.FuncValue.Call(params)

得到第一个返回值error的数值,为nil则返回nil,否则返回error类型。

这样我们的FaaS自描述的调度模式就建立成功了,那么有了这套功能KisFlow可以做什么事情呢,下一节我们可以再调度FaaSDesc的时候将传递的自定义形参的数据类型进行序列化,得到开发者期待的数据类型。

11.2 FaaS形参的自定义数据类型序列化

11.2.1 Serialize序列化接口

首先,我们定义一个数据序列化接口,在kis-flow/kis/下创建serialize.go 文件,如下:

kis-flow/kis/serialize.go

代码语言:go
复制
// Serialize 数据序列化接口
type Serialize interface {
	// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
	UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
	// Marshal 用于将指定类型的值序列化为 KisRowArr。
	Marshal(interface{}) (common.KisRowArr, error)
}

其中KisRowArr是我们KisFlow中传递每个Function的数据切片,之前我们定义在了kis-flow/common/data_type.go中:

代码语言:go
复制
package common

// KisRow 一行数据
type KisRow interface{}

// KisRowArr 一次业务的批量数据
type KisRowArr []KisRow

/*
	KisDataMap 当前Flow承载的全部数据
   	key	:  数据所在的Function ID
    value: 对应的KisRow
*/
type KisDataMap map[string]KisRowArr

Serialize提供了两个接口:

  • UnMarshal:用于将 KisRowArr 反序列化为指定类型的值。
  • Marshal:用于将指定类型的值序列化为 KisRowArr。

KisFlow会提供一个默认的Serialize给每个FaaS函数,开发者也可以自定义自己的Serialize来对FaaS传递的形参进行自定义的数据序列化动作。

11.2.2 KisFlow默认的Serialize序列化

KisFlow提供一个默认的Serialize序列化实例,主要以Json格式为主,在kis-flow/下创建serialize/文件夹,在kis-flow/serialize/下创建serialize_default.go文件,实现的序列化和反序列化代码如下:

kis-flow/serialize/serialize_default.go

代码语言:go
复制
package serialize

import (
	"encoding/json"
	"fmt"
	"kis-flow/common"
	"reflect"
)

type DefaultSerialize struct{}

// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
	// 确保传入的类型是一个切片
	if r.Kind() != reflect.Slice {
		return reflect.Value{}, fmt.Errorf("r must be a slice")
	}

	slice := reflect.MakeSlice(r, 0, len(arr))

	// 遍历每个元素并尝试反序列化
	for _, row := range arr {
		var elem reflect.Value
		var err error

		// 尝试断言为结构体或指针
		elem, err = unMarshalStruct(row, r.Elem())
		if err == nil {
			slice = reflect.Append(slice, elem)
			continue
		}

		// 尝试直接反序列化字符串
		elem, err = unMarshalJsonString(row, r.Elem())
		if err == nil {
			slice = reflect.Append(slice, elem)
			continue
		}

		// 尝试先序列化为 JSON 再反序列化
		elem, err = unMarshalJsonStruct(row, r.Elem())
		if err == nil {
			slice = reflect.Append(slice, elem)
		} else {
			return reflect.Value{}, fmt.Errorf("failed to decode row: %v", err)
		}
	}

	return slice, nil
}

// Marshal 用于将指定类型的值序列化为 KisRowArr(json 序列化)。
func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
	var arr common.KisRowArr

	switch reflect.TypeOf(i).Kind() {
	case reflect.Slice, reflect.Array:
		slice := reflect.ValueOf(i)
		for i := 0; i < slice.Len(); i++ {
			// 序列化每个元素为 JSON 字符串,并将其添加到切片中。
			jsonBytes, err := json.Marshal(slice.Index(i).Interface())
			if err != nil {
				return nil, fmt.Errorf("failed to marshal element to JSON: %v  ", err)
			}
			arr = append(arr, string(jsonBytes))
		}
	default:
		// 如果不是切片或数组类型,则直接序列化整个结构体为 JSON 字符串。
		jsonBytes, err := json.Marshal(i)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal element to JSON: %v  ", err)
		}
		arr = append(arr, string(jsonBytes))
	}

	return arr, nil
}

其中一些函数定义如下:

kis-flow/serialize/serialize_default.go

代码语言:go
复制
// 尝试断言为结构体或指针
func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
	// 检查 row 是否为结构体或结构体指针类型
	rowType := reflect.TypeOf(row)
	if rowType == nil {
		return reflect.Value{}, fmt.Errorf("row is nil pointer")
	}
	if rowType.Kind() != reflect.Struct && rowType.Kind() != reflect.Ptr {
		return reflect.Value{}, fmt.Errorf("row must be a struct or struct pointer type")
	}

	// 如果 row 是指针类型,则获取它指向的类型
	if rowType.Kind() == reflect.Ptr {
		// 空指针
		if reflect.ValueOf(row).IsNil() {
			return reflect.Value{}, fmt.Errorf("row is nil pointer")
		}

		// 解引用
		row = reflect.ValueOf(row).Elem().Interface()

		// 拿到解引用后的类型
		rowType = reflect.TypeOf(row)
	}

	// 检查是否可以将 row 断言为 elemType(目标类型)
	if !rowType.AssignableTo(elemType) {
		return reflect.Value{}, fmt.Errorf("row type cannot be asserted to elemType")
	}

	// 将 row 转换为 reflect.Value 并返回
	return reflect.ValueOf(row), nil
}

// 尝试直接反序列化字符串(将Json字符串 反序列化为 结构体)
func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
	// 判断源数据是否可以断言成string
	str, ok := row.(string)
	if !ok {
		return reflect.Value{}, fmt.Errorf("not a string")
	}

	// 创建一个新的结构体实例,用于存储反序列化后的值
	elem := reflect.New(elemType).Elem()

	// 尝试将json字符串反序列化为结构体。
	if err := json.Unmarshal([]byte(str), elem.Addr().Interface()); err != nil {
		return reflect.Value{}, fmt.Errorf("failed to unmarshal string to struct: %v", err)
	}

	return elem, nil
}

// 尝试先序列化为 JSON 再反序列化(将结构体转换成Json字符串,再将Json字符串 反序列化为 结构体)
func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
	// 将 row 序列化为 JSON 字符串
	jsonBytes, err := json.Marshal(row)
	if err != nil {
		return reflect.Value{}, fmt.Errorf("failed to marshal row to JSON: %v  ", err)
	}

	// 创建一个新的结构体实例,用于存储反序列化后的值
	elem := reflect.New(elemType).Interface()

	// 将 JSON 字符串反序列化为结构体
	if err := json.Unmarshal(jsonBytes, elem); err != nil {
		return reflect.Value{}, fmt.Errorf("failed to unmarshal JSON to element: %v  ", err)
	}

	return reflect.ValueOf(elem).Elem(), nil
}
  • UnMarshal(): 的实现 首先判断形参是否是一个Slice,如果是的话,那么切片中的每个元素的数据进行序列化,优先尝试unMarshalStruct()结构体反序列化,其次尝试json字符串的反序列化unMarshalJsonString(),最后再尝试具备相同属性的结构体但是名称不同的反序列化unMarshalJsonStruct()
  • Marshal(): 则是将任意类型序列化为json二进制字符串存储在KisRowArr中。

注意:KisFlow目前的默认序列化只实现了json格式的序列化,开发者可以参考DefaultSerialize{} 来实现自己其他格式的数据序列化和反序列化动作。

11.2.3 默认的默认的Serialize实例

在serialize的接口定义中,定义一个全局默认的序列化实例,defaultSerialize。

kis-flow/kis/serialize.go

代码语言:go
复制
// defaultSerialize KisFlow提供的默认序列化实现(开发者可以自定义)
var defaultSerialize = &serialize.DefaultSerialize{}

同时提供一个判断一个数据类型是否实现了抽象接口Serialize的校验方法,如下:

kis-flow/kis/serialize.go

代码语言:go
复制
// isSerialize 判断传递进来的 paramType 是否实现了 Serialize 接口
func isSerialize(paramType reflect.Type) bool {
	return paramType.Implements(reflect.TypeOf((*Serialize)(nil)).Elem())
}

11.2.4 FaaSDesc实现Serialize序列化接口

现在将FaaSDesc去继承且实现Serialize接口,在调度FaaSDesc的时候将传递的输入参数进行序列化得到相对应的具体类型形参,定义如下:

kis-flow/kis/faas.go

代码语言:go
复制
// FaaSDesc FaaS 回调计算业务函数 描述
type FaaSDesc struct {
    // +++++++
	Serialize                // 当前Function的数据输入输出序列化实现
    // +++++++
	FnName    string         // Function名称
	f         interface{}    // FaaS 函数
	fName     string         // 函数名称
	ArgsType  []reflect.Type // 函数参数类型(集合)
	ArgNum    int            // 函数参数个数
	FuncType  reflect.Type   // 函数类型
	FuncValue reflect.Value  // 函数值(函数地址)
}

然后,在构造方法NewFaaSDesc()加上对自定义形参的判断,判断传递的自定义形参是否实现了Serialize的两个序列化接口,如果实现了,则使用自定义的序列化接口,如果没有实现,则使用默认的DefaultSerialize{}实例。

kis-flow/kis/faas.go

代码语言:go
复制
// NewFaaSDesc 根据用户注册的FnName 和FaaS 回调函数,创建 FaaSDesc 描述实例
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {

    // ++++++++++
	// 输入输出序列化实例
	var serializeImpl Serialize
    // ++++++++++

	// ... ...
    // ... ...
    
	// 遍历FaaS的形参类型
	for i := 0; i < funcType.NumIn(); i++ {

		// 取出第i个形式参数类型
		paramType := funcType.In(i)

		if isFlowType(paramType) {
			// 判断是否包含kis.Flow类型的形参
			containsKisFlow = true

		} else if isContextType(paramType) {
			// 判断是否包含context.Context类型的形参
			containsCtx = true

		} else if isSliceType(paramType) {

			// 获取当前参数Slice的元素类型
			itemType := paramType.Elem()

			// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
			if itemType.Kind() == reflect.Ptr {
				itemType = itemType.Elem() // 获取指针指向的结构体类型
			}


            // +++++++++++++++++++++++++++++

			// Check if f implements Serialize interface
			// (检测传递的FaaS函数是否实现了Serialize接口)
			if isSerialize(itemType) {
				// 如果当前形参实现了Serialize接口,则使用当前形参的序列化实现
				serializeImpl = reflect.New(itemType).Interface().(Serialize)

			} else {
				// 如果当前形参没有实现Serialize接口,则使用默认的序列化实现
				serializeImpl = defaultSerialize // Use global default implementation
			}
            // +++++++++++++++++++++++++++++++
            
		} else {
			// Other types are not supported
		}

		// 将当前形参类型追加到argsType集合中
		argsType[i] = paramType
	}

	// ... ...
    // ... ...

	// 返回FaaSDesc描述实例
	return &FaaSDesc{
		Serialize: serializeImpl,
		FnName:    fnName,
		f:         f,
		fName:     fullName,
		ArgsType:  argsType,
		ArgNum:    len(argsType),
		FuncType:  funcType,
		FuncValue: funcValue,
	}, nil
}

11.2.5 完成调度FaaS数据序列化

最后在调度FaaSDesc的时候,解析形参的时候,如果是自定义的Slice参数,则对齐进行反序列化操作,将flow.Input()的原数据反序列化成为开发者需要的结构体数据,进行调度FaaS,实现如下:

kis-flow/kis/pool.go

代码语言:go
复制
// CallFunction 调度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {

	if funcDesc, ok := pool.fnRouter[fnName]; ok {

		// 被调度Function的形参列表
		params := make([]reflect.Value, 0, funcDesc.ArgNum)

		for _, argType := range funcDesc.ArgsType {

			// 如果是Flow类型形参,则将 flow的值传入
			if isFlowType(argType) {
				params = append(params, reflect.ValueOf(flow))
				continue
			}

			// 如果是Context类型形参,则将 ctx的值传入
			if isContextType(argType) {
				params = append(params, reflect.ValueOf(ctx))
				continue
			}

			// 如果是Slice类型形参,则将 flow.Input()的值传入
			if isSliceType(argType) {

                // +++++++++++++++++++
				// 将flow.Input()中的原始数据,反序列化为argType类型的数据
				value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
				if err != nil {
					log.Logger().ErrorFX(ctx, "funcDesc.Serialize.DecodeParam err=%v", err)
				} else {
					params = append(params, value)
					continue
				}
                // +++++++++++++++++++
			}

			// 传递的参数,既不是Flow类型,也不是Context类型,也不是Slice类型,则默认给到零值
			params = append(params, reflect.Zero(argType))
		}

		// 调用当前Function 的计算逻辑
		retValues := funcDesc.FuncValue.Call(params)

		// 取出第一个返回值,如果是nil,则返回nil
		ret := retValues[0].Interface()
		if ret == nil {
			return nil
		}

		// 如果返回值是error类型,则返回error
		return retValues[0].Interface().(error)

	}

	log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)

	return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}

这样我们就将数据序列化的动作和FaaSDesc模块结合起来了,接下来,我们写一个单元测试来测试这部分的能力。

11.3 自定义形参序列化单元测试

11.3.1 Flow与Function的配置文件定义

单元测试,我们新建两个Function配置如下:

kis-flow/test/load_conf/func/func-avgStuScore.yml

代码语言:yaml
复制
kistype: func
fname: AvgStuScore
fmode: Calculate
source:
    name: 学生平均分
    must:
        - stu_id

kis-flow/test/load_conf/func/func-PrintStuAvgScore.yml

代码语言:yaml
复制
kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
    name: 学生平均分
    must:
        - stu_id

然后我们来定义一个Flow将上述的两个Function链接起来

kis-flow/test/load_conf/flow/flow-StuAvg.yml

代码语言:yaml
复制
kistype: flow
status: 1
flow_name: StuAvg
flows:
    - fname: AvgStuScore
    - fname: PrintStuAvgScore

11.3.2 自定义基础数据proto定义

kis-flow/test/下创建proto/文件夹,创建一个自定义的基础数据proto,为了今后数据协议的复用,如下:

kis-flow/test/proto/stu_score.go

代码语言:go
复制
package proto

// 学生学习分数
type StuScores struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

// 学生的平均分
type StuAvgScore struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

11.3.3 定义两个FaaS计算回调函数

定义两个FaaS计算函数,一个为计算一个Student的平均分,一个打印Student的平均分,如下:

kis-flow/test/faas/faas_stu_score_avg.go

代码语言:go
复制
package faas

import (
	"context"
	"kis-flow/kis"
	"kis-flow/serialize"
	"kis-flow/test/proto"
)

type AvgStuScoreIn struct {
	serialize.DefaultSerialize
	proto.StuScores
}

type AvgStuScoreOut struct {
	serialize.DefaultSerialize
	proto.StuAvgScore
}

// AvgStuScore(FaaS) 计算学生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
	for _, row := range rows {
		avgScore := proto.StuAvgScore{
			StuId:    row.StuId,
			AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
		}
		// 提交结果数据
		_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
	}

	return nil
}

AvgStuScore()方法为我们改进之后的FaaS函数,其中第三个形参rows []*AvgStuScoreIn为我们自定义序列化的形参,之前我们通过flow.Input()来拿到原始的数据,然后进行遍历,其实现在依然可以这么处理,但是每次可能需要开发者在FaaS中自行断言判断,对开发的效率有些成本,那么现在开发者完全可以通过AvgStuScoreIn来描述一个形参的数据,然后在AvgStuScore的业务中,通过遍历rows得到已经序列化好的结构体,增加的代码的可读性也降低的写业务的开发成本,提高了效率。

打印平均分的FaaS实现如下:

kis-flow/test/faas/faas_stu_score_avg_print.go

代码语言:go
复制
package faas

import (
	"context"
	"fmt"
	"kis-flow/kis"
	"kis-flow/serialize"
	"kis-flow/test/proto"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	proto.StuAvgScore
}

type PrintStuAvgScoreOut struct {
	serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	return nil
}

与上述函数一样,我们依然采用自定义的输入形参来进行逻辑开发。

11.3.4 单元测试用例

接下来我们来编写上面Flow的测试用例单元测试,代码如下:

kis-flow/test/kis_auto_inject_param_test.go

代码语言:go
复制
package test

import (
	"context"
	"kis-flow/common"
	"kis-flow/config"
	"kis-flow/file"
	"kis-flow/flow"
	"kis-flow/kis"
	"kis-flow/test/faas"
	"kis-flow/test/proto"
	"testing"
)

func TestAutoInjectParamWithConfig(t *testing.T) {
	ctx := context.Background()

	kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore)

	// 1. 加载配置文件并构建Flow
	if err := file.ConfigImportYaml("load_conf/"); err != nil {
		panic(err)
	}

	// 2. 获取Flow
	flow1 := kis.Pool().GetFlow("StuAvg")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	// 3. 提交原始数据
	_ = flow1.CommitRow(&faas.AvgStuScoreIn{
		StuScores: proto.StuScores{
			StuId:  100,
			Score1: 1,
			Score2: 2,
			Score3: 3,
		},
	})
	_ = flow1.CommitRow(faas.AvgStuScoreIn{
		StuScores: proto.StuScores{
			StuId:  100,
			Score1: 1,
			Score2: 2,
			Score3: 3,
		},
	})

	// 提交原始数据(json字符串)
	_ = flow1.CommitRow(`{"stu_id":101}`)

	// 4. 执行flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}

在提交原始数据的时候,我们这里面采用的是使用默认的序列化方式,支持json的反序列化支持,在CommitRow()的时候,一共提交的3条数据,前两条是提交的结构体数据,最后一次是提交的json字符串,目前都可以支持。

cd 到kis-flow/test/下,执行:

代码语言:bash
复制
$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig

得到结果如下:

代码语言:bash
复制
$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig
...
...
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
...
...
Add FlowRouter FlowName=StuAvg
context.Background
====> After CommitSrcData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
 map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]]

KisFunctionC, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023af80 ThisFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]] inPut:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

context.Background
 ====> After commitCurData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
 map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]]

KisFunctionE, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023b000 ThisFunctionId:func-7f308d00f4fa49488760ff1dfb85dc46 PrevFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]] inPut:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

stuid: [100], avg score: [2]
stuid: [100], avg score: [2]
stuid: [101], avg score: [0]
--- PASS: TestAutoInjectParamWithConfig (0.01s)
PASS
ok      kis-flow/test   0.030s

11.4 【V1.0】 源代码

https://github.com/aceld/kis-flow/releases/tag/v1.0


作者:刘丹冰Aceld github: https://github.com/aceld

KisFlow开源项目地址:https://github.com/aceld/kis-flow

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 11.1 FaaS业务回调函数自描述
    • 11.1.1 FaaSDesc 回调自描述类型
      • 11.1.2 新建一个FaaSDesc对象
        • 11.1.3 注册FaaS函数
          • 11.1.4 kisPool调度FaaSDesc
          • 11.2 FaaS形参的自定义数据类型序列化
            • 11.2.1 Serialize序列化接口
              • 11.2.2 KisFlow默认的Serialize序列化
                • 11.2.3 默认的默认的Serialize实例
                  • 11.2.4 FaaSDesc实现Serialize序列化接口
                    • 11.2.5 完成调度FaaS数据序列化
                    • 11.3 自定义形参序列化单元测试
                      • 11.3.1 Flow与Function的配置文件定义
                        • 11.3.2 自定义基础数据proto定义
                          • 11.3.3 定义两个FaaS计算回调函数
                            • 11.3.4 单元测试用例
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档