前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出

Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出

原创
作者头像
刘丹冰Aceld
发布2024-06-27 15:59:20
690
发布2024-06-27 15:59:20
举报

kis-flow源代码:https://github.com/aceld/kis-flow

6.1 配置的导入

现在每次建立Flow和Function等,都需要一系列繁琐的添加,不是很方便,接下来,我们可以通过批量读写配置文件,构建KisFlow中的结构关系,并且也可以将KisFlow的结构导出到本地文件中。目前我们先用文件的形式做配置的持久化,开发者也可以今后做数据库或者远程配置的持久化均可。

6.1.1 创建配置文件

首先我们在kis-flow/test/load\_conf/下创建需要加载的kisflow业务配置文件。

kis-flow/test/load\_conf/下分别创建conn/flow/func/三个文件夹分别存放Connector、Flow、Funciton的配置信息。

代码语言:bash
复制
├── conn

│   └── conn-ConnName1.yml

├── flow

│   └── flow-FlowName1.yml

└── func

    ├── func-FuncName1.yml

    ├── func-FuncName2.yml

    └── func-FuncName3.yml

分别创建一些yml文件。具体内容如下:

A.Function

kis-flow/test/load_conf/func/func-FuncNam1.yml

代码语言:yaml
复制
kistype: func

fname: funcName1

fmode: Verify

source:

  name: 公众号抖音商城户订单数据

  must:

    - order\_id

    - user\_id

kis-flow/test/load_conf/func/func-FuncNam2.yml

代码语言:yaml
复制
kistype: func

fname: funcName2

fmode: Save

source:

  name: 用户订单错误率

  must:

    - order\_id

    - user\_id

option:

  cname: ConnName1

kis-flow/test/load_conf/func/func-FuncNam2.yml

代码语言:yaml
复制
kistype: func

fname: funcName2

fmode: Save

source:

  name: 用户订单错误率

  must:

    - order\_id

    - user\_id

option:

  cname: ConnName1

kis-flow/test/load_conf/func/func-FuncNam3.yml

代码语言:yaml
复制
kistype: func

fname: funcName3

fmode: Calculate

source:

  name: 用户订单错误率

  must:

    - order\_id

    - user\_id
B.Connector

kis-flow/test/load_conf/func/func-ConnName1.yml

代码语言:yaml
复制
kistype: conn

cname: ConnName1

addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'

type: redis

key: redis-key

params:

  args1: value1

  args2: value2

load: null

save:

  - funcName2
C.Flow

kis-flow/test/load_conf/func/func-FlowName1.yml

代码语言:yaml
复制
kistype: flow

status: 1

flow\_name: flowName1

flows:

  - fname: funcName1

  - fname: funcName2

  - fname: funcName3

6.1.2 配置解析

创建kis-flow/file/目录,且创建kis-flow/file/config\_import.go文件。

首先定义一个可以存放全部配置的接口:

kis-flow/file/config_import.go

代码语言:go
复制
type allConfig struct {

    Flows map[string]\*config.KisFlowConfig

    Funcs map[string]\*config.KisFuncConfig

    Conns map[string]\*config.KisConnConfig

}

key作为各个模块的Name名称字段。

然后分别定义解析Flow、Function、Connector配置的方法。yaml的第三方库,我们用"gopkg.in/yaml.v3"这个库。

kis-flow/go.mod

代码语言:go
复制
module kis-flow



go 1.18



require github.com/google/uuid v1.5.0

require gopkg.in/yaml.v3 v3.0.1 // indirect
A. Flow 配置解析

kis-flow/file/config_import.go

代码语言:go
复制
// kisTypeFlowConfigure 解析Flow配置文件,yaml格式

func kisTypeFlowConfigure(all \*allConfig, confData []byte, fileName string, kisType interface{}) error {

    flow := new(config.KisFlowConfig)

    if ok := yaml.Unmarshal(confData, flow); ok != nil {

        return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))

    }



    // 如果FLow状态为关闭,则不做配置加载

    if common.KisOnOff(flow.Status) == common.FlowDisable {

        return nil

    }



    if \_, ok := all.Flows[flow.FlowName]; ok {

        return errors.New(fmt.Sprintf("%s set repeat flow\_id:%s", fileName, flow.FlowName))

    }



    // 加入配置集合中

    all.Flows[flow.FlowName] = flow



    return nil

}
  • confData:是文件二进制数据
  • fileName:是文件路径
  • kistype: 为配置文件类别

kisTypeFlowConfigure 会将配置信息解析到allConfig的Flows成员中。

同理,Function和Connector的解析办法如下。

B. Functioin配置解析

kis-flow/file/config_import.go

代码语言:go
复制
// kisTypeFuncConfigure 解析Function配置文件,yaml格式

func kisTypeFuncConfigure(all \*allConfig, confData []byte, fileName string, kisType interface{}) error {

    function := new(config.KisFuncConfig)

    if ok := yaml.Unmarshal(confData, function); ok != nil {

        return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))

    }

    if \_, ok := all.Funcs[function.FName]; ok {

        return errors.New(fmt.Sprintf("%s set repeat function\_id:%s", fileName, function.FName))

    }



    // 加入配置集合中

    all.Funcs[function.FName] = function



    return nil

}
C. Connector配置解析

kis-flow/file/config_import.go

代码语言:go
复制
// kisTypeConnConfigure 解析Connector配置文件,yaml格式

func kisTypeConnConfigure(all \*allConfig, confData []byte, fileName string, kisType interface{}) error {

    conn := new(config.KisConnConfig)

    if ok := yaml.Unmarshal(confData, conn); ok != nil {

        return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType))

    }



    if \_, ok := all.Conns[conn.CName]; ok {

        return errors.New(fmt.Sprintf("%s set repeat conn\_id:%s", fileName, conn.CName))

    }



    // 加入配置集合中

    all.Conns[conn.CName] = conn



    return nil

}

6.1.3 遍历文件

下面实现一个遍历一个路径loadPath下面所有的yml和yaml类型文件,按照kistype类别解析配置信息到allConfig中。

kis-flow/file/config_import.go

代码语言:go
复制
// parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中

func parseConfigWalkYaml(loadPath string) (\*allConfig, error) {



    all := new(allConfig)



    all.Flows = make(map[string]\*config.KisFlowConfig)

    all.Funcs = make(map[string]\*config.KisFuncConfig)

    all.Conns = make(map[string]\*config.KisConnConfig)



    err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {

        // 校验文件后缀是否合法

        if suffix := path.Ext(filePath); suffix != ".yml" && suffix != ".yaml" {

            return nil

        }



        // 读取文件内容

        confData, err := ioutil.ReadFile(filePath)

        if err != nil {

            return err

        }



        confMap := make(map[string]interface{})



        // 校验yaml合法性

        if err := yaml.Unmarshal(confData, confMap); err != nil {

            return err

        }



        // 判断kisType是否存在

        if kisType, ok := confMap["kistype"]; !ok {

            return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath))

        } else {

            switch kisType {

            case common.KisIdTypeFlow:

                return kisTypeFlowConfigure(all, confData, filePath, kisType)



            case common.KisIdTypeFunction:

                return kisTypeFuncConfigure(all, confData, filePath, kisType)



            case common.KisIdTypeConnnector:

                return kisTypeConnConfigure(all, confData, filePath, kisType)



            default:

                return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))

            }

        }

    })



    if err != nil {

        return nil, err

    }



    return all, nil

}

6.1.4 导入方法

下面提供一个对外的公开方法ConfigImportYaml,需要提供一个导入的文件根路径。

kis-flow/file/config_import.go

代码语言:go
复制
// ConfigImportYaml 全盘解析配置文件,yaml格式

func ConfigImportYaml(loadPath string) error {



    all, err := parseConfigWalkYaml(loadPath)

    if err != nil {

        return err

    }



    for flowName, flowConfig := range all.Flows {



        // 构建一个Flow

        newFlow := flow.NewKisFlow(flowConfig)



        for \_, fp := range flowConfig.Flows {

            if err := buildFlow(all, fp, newFlow, flowName); err != nil {

                return err

            }

        }



        //将flow添加到FlowPool中

        kis.Pool().AddFlow(flowName, newFlow)

    }



    return nil

}

首先会调用parseConfigWalkYaml()将全部的配置信息加载到内存中。

其次,遍历所有的Flow,依次去构建Flow,最终将flow添加到Pool当中,具体的构建流程如下:

kis-flow/file/config_import.go

代码语言:go
复制
func buildFlow(all \*allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error {

    //加载当前Flow依赖的Function

    if funcConfig, ok := all.Funcs[fp.FuncName]; !ok {

        return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName))

    } else {

        //flow add connector

        if funcConfig.Option.CName != "" {

            // 加载当前Function依赖的Connector

            if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok {

                return errors.New(fmt.Sprintf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName))

            } else {

                // Function Config 关联 Connector Config

                \_ = funcConfig.AddConnConfig(connConf)

            }

        }



        //flow add function

        if err := newFlow.Link(funcConfig, fp.Params); err != nil {

            return err

        }

    }



    return nil

}

6.2 配置导入单元测试

创建单元测试文件 kis-flow/test/kis\_config\_import\_test.go

kis-flow/test/kis_config_import_test.go

代码语言:go
复制
package test



import (

    "context"

    "kis-flow/common"

    "kis-flow/file"

    "kis-flow/kis"

    "kis-flow/test/caas"

    "kis-flow/test/faas"

    "testing"

)



func TestConfigImportYmal(t \*testing.T) {

    ctx := context.Background()



    // 0. 注册Function 回调业务

    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)

    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)

    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)



    // 0. 注册ConnectorInit 和 Connector 回调业务

    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)

    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)



    // 1. 加载配置文件并构建Flow

    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load\_conf/"); err != nil {

        panic(err)

    }



    // 2. 获取Flow

    flow1 := kis.Pool().GetFlow("flowName1")



    // 3. 提交原始数据

    \_ = flow1.CommitRow("This is Data1 from Test")

    \_ = flow1.CommitRow("This is Data2 from Test")

    \_ = flow1.CommitRow("This is Data3 from Test")



    // 4. 执行flow1

    if err := flow1.Run(ctx); err != nil {

        panic(err)

    }

}

先注册业务方法。然后通过ConfigImportYaml加载配置,之后从Pool中得到flow实例,提交数据,运行。

注意,这里的配置文件路径,写的是绝对路径。

cd 到kis-flow/test/目录下,执行指令:

代码语言:bash
复制
go test -test.v -test.paniconexit0 -test.run TestConfigImportYmal    

结果如下:

代码语言:bash
复制
=== RUN   TestConfigImportYmal

Add KisPool FuncName=funcName1

Add KisPool FuncName=funcName2

Add KisPool FuncName=funcName3

Add KisPool CaaSInit CName=ConnName1

Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save

===> Call Connector InitDemo1

&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}

Add FlowRouter FlowName=flowName1



context.Background

====> After CommitSrcData, flow\_name = flowName1, flow\_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b

All Level Data =

 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]



KisFunctionV, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114960 ThisFunctionId:func-37c7070f45144529891d433ae9c4ebfc PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}



---> Call funcName1Handler ----

In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data1 from Test

In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data2 from Test

In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data3 from Test

context.Background

 ====> After commitCurData, flow\_name = flowName1, flow\_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b

All Level Data =

 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]



KisFunctionS, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001149c0 ThisFunctionId:func-5315301ffbbb4ae4be021729ddff1569 PrevFunctionId:func-37c7070f45144529891d433ae9c4ebfc funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}



---> Call funcName2Handler ----

In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 0

===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save

===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0

In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 1

===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save

===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1

In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 2

===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save

===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2

context.Background

 ====> After commitCurData, flow\_name = flowName1, flow\_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b

All Level Data =

 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]]



KisFunctionC, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114a20 ThisFunctionId:func-89a6a662729b4a0895e849c40bf29892 PrevFunctionId:func-5315301ffbbb4ae4be021729ddff1569 funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}



---> Call funcName3Handler ----

In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 0

In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 1

In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 2

--- PASS: TestConfigImportYmal (0.01s)

PASS

ok      kis-flow/test   0.517s

预期的结果和我们一致,现在我们可以通过配置文件进行加载且构建KisFlow了。

6.3 配置的导出

6.3.1 导出实现

kis-flow/file/config_export.go

代码语言:go
复制
package file



import (

    "errors"

    "fmt"

    "gopkg.in/yaml.v3"

    "io/ioutil"

    "kis-flow/common"

    "kis-flow/kis"

)



// ConfigExportYaml 将flow配置输出,且存储本地

func ConfigExportYaml(flow kis.Flow, savaPath string) error {



    if data, err := yaml.Marshal(flow.GetConfig()); err != nil {

        return err

    } else {

        //flow

        err := ioutil.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644)

        if err != nil {

            return err

        }



        //function

        for \_, fp := range flow.GetConfig().Flows {

            fConf := flow.GetFuncConfigByName(fp.FuncName)

            if fConf == nil {

                return errors.New(fmt.Sprintf("function name = %s config is nil ", fp.FuncName))

            }



            if fdata, err := yaml.Marshal(fConf); err != nil {

                return err

            } else {

                if err := ioutil.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil {

                    return err

                }

            }



            // Connector

            if fConf.Option.CName != "" {

                cConf, err := fConf.GetConnConfig()

                if err != nil {

                    return err

                }

                if cdata, err := yaml.Marshal(cConf); err != nil {

                    return err

                } else {

                    if err := ioutil.WriteFile(savaPath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {

                        return err

                    }

                }

            }

        }

    }



    return nil

}

这里面需要补充一些接口,如下:

6.3.2 Flow新增接口

kis-flow/kis/flow.go

代码语言:go
复制
package kis



import (

    "context"

    "kis-flow/common"

    "kis-flow/config"

)



type Flow interface {

    // Run 调度Flow,依次调度Flow中的Function并且执行

    Run(ctx context.Context) error

    // Link 将Flow中的Function按照配置文件中的配置进行连接

    Link(fConf \*config.KisFuncConfig, fParams config.FParam) error

    // CommitRow 提交Flow数据到即将执行的Function层

    CommitRow(row interface{}) error

    // Input 得到flow当前执行Function的输入源数据

    Input() common.KisRowArr

    // GetName 得到Flow的名称

    GetName() string

    // GetThisFunction 得到当前正在执行的Function

    GetThisFunction() Function

    // GetThisFuncConf 得到当前正在执行的Function的配置

    GetThisFuncConf() \*config.KisFuncConfig

    // GetConnector 得到当前正在执行的Function的Connector

    GetConnector() (Connector, error)

    // GetConnConf 得到当前正在执行的Function的Connector的配置

    GetConnConf() (\*config.KisConnConfig, error)

    

    // +++++++++++++++++++++++++++++++

    // GetConfig 得到当前Flow的配置

    GetConfig() \*config.KisFlowConfig

    // GetFuncConfigByName 得到当前Flow的配置

    GetFuncConfigByName(funcName string) \*config.KisFuncConfig

    // +++++++++++++++++++++++++++++++

}

flow新增的接口实现如下:

kis-flow/flow/kis_flow.go

代码语言:go
复制
func (flow \*KisFlow) GetConfig() \*config.KisFlowConfig {

    return flow.Conf

}



// GetFuncConfigByName 得到当前Flow的配置

func (flow \*KisFlow) GetFuncConfigByName(funcName string) \*config.KisFuncConfig {

    if f, ok := flow.Funcs[funcName]; ok {

        return f.GetConfig()

    } else {

        log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName)

        return nil

    }

}

6.3.3 KisFlow中Funcs修复

这里面之前有个笔误。

kis-flow/flow/kis_flow.go

代码语言:go
复制
// KisFlow 用于贯穿整条流式计算的上下文环境

type KisFlow struct {

    // 基础信息

    Id   string                // Flow的分布式实例ID(用于KisFlow内部区分不同实例)

    Name string                // Flow的可读名称

    Conf \*config.KisFlowConfig // Flow配置策略



    // Function列表

    Funcs          map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName

    FlowHead       kis.Function            // 当前Flow所拥有的Function列表表头

    FlowTail       kis.Function            // 当前Flow所拥有的Function列表表尾

    flock          sync.RWMutex            // 管理链表插入读写的锁

    ThisFunction   kis.Function            // Flow当前正在执行的KisFunction对象

    ThisFunctionId string                  // 当前执行到的Function ID

    PrevFunctionId string                  // 当前执行到的Function 上一层FunctionID



    // Function列表参数

    funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam

    fplock     sync.RWMutex             // 管理funcParams的读写锁



    // 数据

    buffer common.KisRowArr  // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch

    data   common.KisDataMap // 流式计算各个层级的数据源

    inPut  common.KisRowArr  // 当前Function的计算输入数据

}

这里的Funcs成员,其key的含义,之前我们定义的是KisID,现在要修正为key的含义是FunctionName。

下面想Funcs成员赋值的代码做一个简单的修正

代码语言:go
复制
// appendFunc 将Function添加到Flow中, 链表操作

func (flow \*KisFlow) appendFunc(function kis.Function, fParam config.FParam) error {

       // ... ... 





    //将Function Name 详细Hash对应关系添加到flow对象中

    flow.Funcs[function.GetConfig().FName] = function



    // ... ... 

}

6.3.4 KisPool新增方法

kis-flow/kis/pool.go

代码语言:go
复制
// GetFlows 得到全部的Flow

func (pool \*kisPool) GetFlows() []Flow {

    pool.flowLock.RLock() // 读锁

    defer pool.flowLock.RUnlock()



    var flows []Flow



    for \_, flow := range pool.flowRouter {

        flows = append(flows, flow)

    }



    return flows

}

KisPool新增 获取全部Flow的方法,以支持导出模块使用。

6.4 配置导出单元测试

kis-flow/test/创建kis\_config\_export\_test.go文件。

代码语言:go
复制
package test



import (

    "kis-flow/common"

    "kis-flow/file"

    "kis-flow/kis"

    "kis-flow/test/caas"

    "kis-flow/test/faas"

    "testing"

)



func TestConfigExportYmal(t \*testing.T) {

    // 0. 注册Function 回调业务

    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)

    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)

    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)



    // 0. 注册ConnectorInit 和 Connector 回调业务

    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)

    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)



    // 1. 加载配置文件并构建Flow

    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load\_conf/"); err != nil {

        panic(err)

    }



    // 2. 讲构建的内存KisFlow结构配置导出的文件当中

    flows := kis.Pool().GetFlows()

    for \_, flow := range flows {

        if err := file.ConfigExportYaml(flow, "/Users/gopath/src/kis-flow/test/export\_conf/"); err != nil {

            panic(err)

        }

    }

}

cd到kis-flow/test/下 执行:

代码语言:bash
复制
go test -test.v -test.paniconexit0 -test.run  TestConfigExportYmal 

会在kis-flow/test/export\_conf/下得到导出的配置。

代码语言:bash
复制
├── export\_conf

│   ├── conn-ConnName1.yaml

│   ├── flow-flowName1.yaml

│   ├── func-funcName1.yaml

│   ├── func-funcName2.yaml

│   └── func-funcName3.yaml

6.5 【V0.5】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.5


作者:刘丹冰Aceld github: https://github.com/aceld

KisFlow开源项目地址:https://github.com/aceld/kis-flow

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 6.1 配置的导入
    • 6.1.1 创建配置文件
      • A.Function
      • B.Connector
      • C.Flow
    • 6.1.2 配置解析
      • A. Flow 配置解析
      • B. Functioin配置解析
      • C. Connector配置解析
    • 6.1.3 遍历文件
      • 6.1.4 导入方法
      • 6.2 配置导入单元测试
      • 6.3 配置的导出
        • 6.3.1 导出实现
          • 6.3.2 Flow新增接口
            • 6.3.3 KisFlow中Funcs修复
              • 6.3.4 KisPool新增方法
              • 6.4 配置导出单元测试
              • 6.5 【V0.5】源代码
              相关产品与服务
              流计算 Oceanus
              流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档