前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)

Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)

原创
作者头像
刘丹冰Aceld
发布2024-04-18 14:50:52
1050
发布2024-04-18 14:50:52
举报
文章被收录于专栏:KisFlow-Golang流式计算框架

KisFlow源代码:https://github.com/aceld/kis-flow

2. V0.1-项目构建及基础模块定义

首先我们创建我们的项目,项目的主文件目录就叫KisFlow,且在Github上创建对应的仓库: https://github.com/aceld/kis-flow 然后将项目代码clone到本地。

2.0 项目构建

(这里如果你是按照本教程开发,需要在自己的仓库重新创建一个新项目,并且clone到本地开发)

2.0.1 创建项目目录

接下来,我们先将项目中的必要的文件目录创建好,项目的目录结构如下:

代码语言:bash
复制
 kis-flow /
.
├── LICENSE
├── README.md
├── common/
├── example/
├── function/
├── conn/
├── config/
├── flow/
└── kis/

这里我们创建三个文件夹, common/为 存放我们一些公用的基础常量和一些枚举参数,还有一些工具类的方法。 flow/为存放KisFlow的核心代码。 function/为存放KisFunction的核心代码。 conn/为存放KisConnector的核心代码。 config/ 存放flow、functioin、connector等策略配置信息模块。 example/为我们针对KisFlow的一些测试案例和test单元测试案例等,能够及时验证我们的项目效果。 kis/来存放所有模块的抽象层。

2.0.1 创建go.mod

cd 到 kis-flow的项目根目录,执行如下指令:

我们会得到go.mod文件,这个是作为当前项目的包管理文件,如下:

代码语言:go
复制
module kis-flow
go 1.18

首先因为在之后会有很多调试日志要打印,我们先把日志模块集成了,日志模块KisFlow提供一个默认的标准输出Logger对象,再对我开放一个SetLogger() 方法来进行重新设置开发者自己的Logger模块。

2.1 KisLogger

2.1.1 Logger抽象接口

将Logger的定义在kis-flow/log/目录下,创建kis_log.go文件:

kis-flow/log/kis_log.go

代码语言:go
复制
package log

import "context"

type KisLogger interface {
	// InfoFX 有上下文的Info级别日志接口, format字符串格式
	InfoFX(ctx context.Context, str string, v ...interface{})
	// ErrorFX 有上下文的Error级别日志接口, format字符串格式
	ErrorFX(ctx context.Context, str string, v ...interface{})
	// DebugFX 有上下文的Debug级别日志接口, format字符串格式
	DebugFX(ctx context.Context, str string, v ...interface{})

	// InfoF 无上下文的Info级别日志接口, format字符串格式
	InfoF(str string, v ...interface{})
	// ErrorF 无上下文的Error级别日志接口, format字符串格式
	ErrorF(str string, v ...interface{})
	// DebugF 无上下文的Debug级别日志接口, format字符串格式
	DebugF(str string, v ...interface{})
}

// kisLog 默认的KisLog 对象
var kisLog KisLogger

// SetLogger 设置KisLog对象, 可以是用户自定义的Logger对象
func SetLogger(newlog KisLogger) {
	kisLog = newlog
}

// Logger 获取到kisLog对象
func Logger() KisLogger {
	return kisLog
}

KisLogger提供了三个级别的日志,分别是Info、Error、Debug。且也分别提供了具备context参数与不具备context参数的两套日志接口。

提供一个全局对象kisLog,默认的KisLog 对象。以及方法SetLogger()Logger()供开发可以设置自己的Logger对象以及获取到Logger对象。

2.1.2 默认的日志对象KisDefaultLogger

如果开发没有自定义的日志对象定义,那么KisFlow会提供一个默认的日志对象kisDefaultLogger,这个类实现了KisLogger的全部接口,且都是默认打印到标准输出的形式来打印日志,定义在kis-flow/log/目录下,创建kis_default_log.go文件。

kis-flow/log/kis_default_log.go

代码语言:go
复制
package log

import (
	"context"
	"fmt"
)

// kisDefaultLog 默认提供的日志对象
type kisDefaultLog struct{}

func (log *kisDefaultLog) InfoF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) ErrorF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) DebugF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) InfoFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) ErrorFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}

func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}

func init() {
	// 如果没有设置Logger, 则启动时使用默认的kisDefaultLog对象
	if Logger() == nil {
		SetLogger(&kisDefaultLog{})
	}
}

这里在init()初始化方法中,会判断目前是否已经有设置全局的Logger对象,如果没有,KisFlow会默认选择kisDefaultLog 作为全局Logger日志对象。

2.1.3 单元测试KisLogger

现在,我们先不针对KisLogger做过多的方法开发,我们优先将现有的程序跑起来,做一个单元测试来测试创建一个KisLogger

kis-flow/test/kis_log_test.go

代码语言:go
复制
package test

import (
	"context"
	"kis-flow/log"
	"testing"
)

func TestKisLogger(t *testing.T) {
	ctx := context.Background()

	log.Logger().InfoFX(ctx, "TestKisLogger InfoFX")
	log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX")
	log.Logger().DebugFX(ctx, "TestKisLogger DebugFX")

	log.Logger().InfoF("TestKisLogger InfoF")
	log.Logger().ErrorF("TestKisLogger ErrorF")
	log.Logger().DebugF("TestKisLogger DebugF")
}

我们cdkis-flow/test/目录下执行单元测试指令:

代码语言:bash
复制
go test -test.v -test.paniconexit0 -test.run TestKisLogger

得到结果如下:

代码语言:bash
复制
=== RUN   TestKisLogger
context.Background
TestKisLogger InfoFX
context.Background
TestKisLogger ErrorFX
context.Background
TestKisLogger DebugFX
TestKisLogger InfoF
TestKisLogger ErrorF
TestKisLogger DebugF
--- PASS: TestKisLogger (0.00s)
PASS
ok      kis-flow/test   0.509s

2.2 KisConfig

在KisFlow中,我们定义了三种核心模块,分别是KisFunction, KisFlow, KisConnector ,所以KisConfig也分别需要针对这三个模块进行定义,我们将全部有关KisConfig的代码都放在kis-flow/config/目录下。

代码语言:bash
复制
➜  kis-flow git:(master) ✗ tree
.
├── LICENSE
├── README.md
├── common/
│   └── 
├── example/
│   └── 
├── config/
│   ├──
├── test/
└── go.mod

2.2.1 KisFuncConfig 定义

KisFuncConfig在设计文档中的yaml文件形式如下:

代码语言:yaml
复制
kistype: func
fname: 测试KisFunction_S1
fmode: Save
source:
 name: 被校验的测试数据源1-用户订单维度
 must:
 - userid
 - orderid

option:
 cname: 测试KisConnector_1
 retry_times: 3
 retry_duration: 500
 default_params:
 default1: default1_param
 default2: default2_param

参数说明:

接下来我们根据上述的配置协议,来定义KisFunction的策略配置结构体,并且提供一些响应的初始化方法。 我们在项目文档中创建kis_func_config.go文件,在这里我们将需要的Config定义实现。

A. 结构体定义

kis-flow/config/kis_func_config.go

代码语言:go
复制
package config

import (
	"kis-flow/common"
	"kis-flow/log"
)

// FParam 在当前Flow中Function定制固定配置参数类型
type FParam map[string]string

// KisSource 表示当前Function的业务源
type KisSource struct {
	Name string   `yaml:"name"` //本层Function的数据源描述
	Must []string `yaml:"must"` //source必传字段
}

// KisFuncOption 可选配置
type KisFuncOption struct {
	CName        string `yaml:"cname"`           //连接器Connector名称
	RetryTimes   int    `yaml:"retry_times"`     //选填,Function调度重试(不包括正常调度)最大次数
	RetryDuriton int    `yaml:"return_duration"` //选填,Function调度每次重试最大时间间隔(单位:ms)
	Params       FParam `yaml:"default_params"`  //选填,在当前Flow中Function定制固定配置参数
}

// KisFuncConfig 一个KisFunction策略配置
type KisFuncConfig struct {
	KisType string        `yaml:"kistype"`
	FName   string        `yaml:"fname"`
	FMode   string        `yaml:"fmode"`
	Source  KisSource     `yaml:"source"`
	Option  KisFuncOption `yaml:"option"`
}

这里KisFuncConfig是相关结构体,其中 FParamKisSourceKisFuncOption均为一些相关的参数类型。

B. 相关方法定义

下面我们先简单的提供创建KisFuncConfig的构造方法。

kis-flow/config/kis_func_config.go

代码语言:go
复制
// NewFuncConfig 创建一个Function策略配置对象, 用于描述一个KisFunction信息
func NewFuncConfig(funcName string, mode common.KisMode, source *KisSource, option *KisFuncOption) *KisFuncConfig {
     config := new(KisFuncConfig)
     config.FName = funcName

     if source == nil {
         log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %s\n", funcName)
         return nil
     }

     config.Source = *source
     config.FMode = string(mode)

     //FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系
     if mode == common.S || mode == common.L {
             if option == nil {
                   log.Logger().ErrorF("Funcion S/L need option->Cid\n")
                   return nil
             } else if option.CName == "" {
                   log.Logger().ErrorF("Funcion S/L need option->Cid\n")
                   return nil
             }
       }

      if option != nil {
           config.Option = *option
      }

     return config
}

上述代码中提到了common.Scommon.L两个枚举类型,这是我们针对KisFunction提供的五种类型的枚举值,我们可以将他们定义在 kis-flow/common/const.go文件中。

kis-flow/common/const.go

代码语言:go
复制
package common

type KisMode string

const (
	// V 为校验特征的KisFunction, 
    // 主要进行数据的过滤,验证,字段梳理,幂等等前置数据处理
	V KisMode = "Verify"

	// S 为存储特征的KisFunction, 
    // S会通过NsConnector进行将数据进行存储,数据的临时声明周期为NsWindow
	S KisMode = "Save"

	// L 为加载特征的KisFunction,
    // L会通过KisConnector进行数据加载,通过该Function可以从逻辑上与对应的S Function进行并流
	L KisMode = "Load"

	// C 为计算特征的KisFunction, 
    // C会通过KisFlow中的数据计算,生成新的字段,将数据流传递给下游S进行存储,或者自己也已直接通过KisConnector进行存储
	C KisMode = "Calculate"

	// E 为扩展特征的KisFunction,
    // 作为流式计算的自定义特征Function,如,Notify 调度器触发任务的消息发送,删除一些数据,重置状态等。
	E KisMode = "Expand"
)

如果fmodeSave或者Load说明这个function有查询库或者存储数据的行为,那么这个Function就需要关联一个KisConnector,那么CName就需要传递进来。

C. 创建KisFuncConfig单元测试

现在,我们先不针对KisFuncConfig做过多的方法开发,我们优先将现有的程序跑起来,做一个单元测试来测试创建一个KisFuncConfig

kis-flow/test/kis_config_test.gofunc TestNewFuncConfig(t *testing.T) { source := config.KisSource{ Name: "公众号抖音商城户订单数据", Must: []string{"order_id", "user_id"}, }option := config.KisFuncOption{ CName: "connectorName1", RetryTimes: 3, RetryDuriton: 300, Params: config.FParam{ "param1": "value1", "param2": "value2", }, } myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option) log.Logger().InfoF("funcName1: %+v\n", myFunc1)}

我们cdkis-flow/test/目录下执行单元测试指令:

代码语言:bash
复制
go test -test.v -test.paniconexit0 -test.run TestNewFuncConfig

得到结果如下:

代码语言:bash
复制
=== RUN   TestNewFuncConfig
funcName1: &{KisType: FName:funcName1 FMode:Save Source:{Name:公众号抖音商城户订单数据 Must:[order_id user_id]} Option:{CName:connectorName1 RetryTimes:3 RetryDuriton:300 Params:map[param1:value1 param2:value2]}}

--- PASS: TestNewFuncConfig (0.00s)
PASS
ok      kis-flow/test   0.545s

好了,现在最简单的KisFuncConfig的策略创建基本完成了。

2.2.2 KisFlowConfig 定义

KisFlowConfig在设计文档中的yaml文件形式如下:

代码语言:yaml
复制
kistype: flow
status: 1
flow_name: MyFlow1
flows:
  - fname: 测试PrintInput
    params:
      args1: value1
      args2: value2
  - fname: 测试KisFunction_S1
  - fname: 测试PrintInput
    params:
      args1: value11
      args2: value22
      default2: newDefault
  - fname: 测试PrintInput
  - fname: 测试KisFunction_S1
    params:
      my_user_param1: ffffffxxxxxx
  - fname: 测试PrintInput

参数说明:

A. 结构体定义

接下来我们根据上述的配置协议,来定义KisFlow的策略配置结构体,并且提供一些响应的初始化方法。 我们在项目文档中创建kis_flow_config.go文件,在这里我们将需要的Config定义实现。

kis-flow/config/kis_flow_config.go

代码语言:go
复制
package config

import "kis-flow/common"

// KisFlowFunctionParam 一个Flow配置中Function的Id及携带固定配置参数
type KisFlowFunctionParam struct {
	FuncName string `yaml:"fname"`  //必须
	Params   FParam `yaml:"params"` //选填,在当前Flow中Function定制固定配置参数
}

// KisFlowConfig 用户贯穿整条流式计算上下文环境的对象
type KisFlowConfig struct {
	KisType  string                 `yaml:"kistype"`
	Status   int                    `yaml:"status"`
	FlowName string                 `yaml:"flow_name"`
	Flows    []KisFlowFunctionParam `yaml:"flows"`
}

这里提供了一个新的参数类型 KisFlowFunctionParam ,这个表示配置KisFlow的时候,在调度的时候,flow默认传递当前被调度Function的自定义默认参数,如果不需要可以不添加此参数。

B. 相关方法定义

提供一个新建KisFlowConfig的构造方法。

kis-flow/config/kis_flow_config.go

代码语言:go
复制
// NewFlowConfig 创建一个Flow策略配置对象, 用于描述一个KisFlow信息
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
	config := new(KisFlowConfig)
	config.FlowName = flowName
	config.Flows = make([]KisFlowFunctionParam, 0)

	config.Status = int(enable)

	return config
}

// AppendFunctionConfig 添加一个Function Config 到当前Flow中
func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {
	fConfig.Flows = append(fConfig.Flows, params)
}

有关flow携带的Function配置,这里我们采用通过AppendFunctionConfig动态的去添加,目的是为了,今后可能有关kisflow的配置会从数据库/动态远程配置等中提取,那么就需要动态的将配置组合进来。

C. KisFlowConfig单元测试

同样,我们简单些一个单元测试来测试KisFlowConfig的创建。

kis-flow/test/kis_config_test.gofunc TestNewFlowConfig(t *testing.T) {flowFuncParams1 := config.KisFlowFunctionParam{ FuncName: "funcName1", Params: config.FParam{ "flowSetFunParam1": "value1", "flowSetFunParam2": "value2", }, } flowFuncParams2 := config.KisFlowFunctionParam{ FuncName: "funcName2", Params: config.FParam{ "default": "value1", }, } myFlow1 := config.NewFlowConfig("flowName1", common.FlowEnable) myFlow1.AppendFunctionConfig(flowFuncParams1) myFlow1.AppendFunctionConfig(flowFuncParams2) log.Logger().InfoF("myFlow1: %+v\n", myFlow1)}

我们cdkis-flow/test/目录下执行单元测试指令:

代码语言:go
复制
$ go test -test.v -test.paniconexit0 -test.run TestNewFlowConfig

得到结果如下:

代码语言:bash
复制
=== RUN   TestNewFlowConfig
myFlow1: &{KisType: Status:1 FlowName:flowName1 Flows:[{FuncName:funcName1 Params:map[flowSetFunParam1:value1 flowSetFunParam2:value2]} {FuncName:funcName2 Params:map[default:value1]}]}

--- PASS: TestNewFlowConfig (0.00s)
PASS
ok      kis-flow/test   0.251s

2.2.3 KisConnConfig

KisConnConfig在设计文档中的yaml文件形式如下:

代码语言:yaml
复制
kistype: conn
cname: 测试KisConnector_1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: userid_orderid_option
params:
  args1: value1
  args2: value2
load: null
save:
  - 测试KisFunction_S1

A. 结构体定义

接下来我们根据上述的配置协议,来定义KisConnector的策略配置结构体,并且提供一些响应的初始化方法。 我们在项目文档中创建kis_conn_config.go文件,在这里我们将需要的Config定义实现。

kis-flow/config/kis_conn_config.go

代码语言:go
复制
package config

import (
	"errors"
	"fmt"
	"kis-flow/common"
)

// KisConnConfig KisConnector 策略配置
type KisConnConfig struct {
	//配置类型
	KisType string `yaml:"kistype"`
	//唯一描述标识
	CName string `yaml:"cname"`
	//基础存储媒介地址
	AddrString string `yaml:"addrs"`
	//存储媒介引擎类型"Mysql" "Redis" "Kafka"等
	Type common.KisConnType `yaml:"type"`
	//一次存储的标识:如Redis为Key名称、Mysql为Table名称,Kafka为Topic名称等
	Key string `yaml:"key"`
	//配置信息中的自定义参数
	Params map[string]string `yaml:"params"`
	//存储读取所绑定的NsFuncionID
	Load []string `yaml:"load"`
	Save []string `yaml:"save"`
}

B. 相关方法定义

kis-flow/config/kis_conn_config.go

代码语言:go
复制
// NewConnConfig 创建一个KisConnector策略配置对象, 用于描述一个KisConnector信息
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
	strategy := new(KisConnConfig)
	strategy.CName = cName
	strategy.AddrString = addr

	strategy.Type = t
	strategy.Key = key
	strategy.Params = param

	return strategy
}

// WithFunc Connector与Function进行关系绑定
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {

	switch common.KisMode(fConfig.FMode) {
	case common.S:
		cConfig.Save = append(cConfig.Save, fConfig.FName)
	case common.L:
		cConfig.Load = append(cConfig.Load, fConfig.FName)
	default:
		return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
	}

	return nil
}

这里也是通过提供WithFunc方法来动态的添加Conn和Function的关联关系 ###

C. KisConnConfig 单元测试 同样,我们简单些一个单元测试来测试KisConnConfig的创建。

kis-flow/test/kis_config_test.go

代码语言:go
复制
func TestNewConnConfig(t *testing.T) {

	source := config.KisSource{
		Name: "公众号抖音商城户订单数据",
		Must: []string{"order_id", "user_id"},
	}

	option := config.KisFuncOption{
		CName:        "connectorName1",
		RetryTimes:   3,
		RetryDuriton: 300,

		Params: config.FParam{
			"param1": "value1",
			"param2": "value2",
		},
	}

	myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)

	connParams := config.FParam{
		"param1": "value1",
		"param2": "value2",
	}

	myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)

	if err := myConnector1.WithFunc(myFunc1); err != nil {
		log.Logger().ErrorF("WithFunc err: %s\n", err.Error())
	}

	log.Logger().InfoF("myConnector1: %+v\n", myConnector1)
}

我们cdkis-fow/test/目录下执行单元测试指令:

代码语言:bash
复制
$ go test -test.v -test.paniconexit0 -test.run TestNewConnConfig

得到结果如下:

代码语言:bash
复制
=== RUN   TestNewConnConfig
myConnector1: &{KisType: CName:connectorName1 AddrString:0.0.0.0:9987,0.0.0.0:9997 Type:redis Key:key Params:map[param1:value1 param2:value2] Load:[] Save:[funcName1]}

--- PASS: TestNewConnConfig (0.00s)
PASS
ok      kis-flow/test   0.481s

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2. V0.1-项目构建及基础模块定义
    • 2.0 项目构建
      • 2.0.1 创建项目目录
      • 2.0.1 创建go.mod
    • 2.1 KisLogger
      • 2.1.1 Logger抽象接口
      • 2.1.2 默认的日志对象KisDefaultLogger
      • 2.1.3 单元测试KisLogger
    • 2.2 KisConfig
      • 2.2.1 KisFuncConfig 定义
        • A. 结构体定义
        • B. 相关方法定义
        • C. 创建KisFuncConfig单元测试
      • 2.2.2 KisFlowConfig 定义
        • A. 结构体定义
        • B. 相关方法定义
        • C. KisFlowConfig单元测试
      • 2.2.3 KisConnConfig
        • A. 结构体定义
        • B. 相关方法定义
    相关产品与服务
    云数据库 Redis®
    腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档