下面我们来看下cadence的helloworld例子的源码,它包含两个文件:第一个文件是启动程序,第二个定义了workflow和activity
package main
import (
"flag"
"time"
"github.com/pborman/uuid"
"go.uber.org/cadence/client"
"go.uber.org/cadence/worker"
common "exp1/common"
)
// startWorkers starts the sample's workers for the configured domain on the
// ApplicationName task list.
//
// Call it once as part of process bootstrap; the workers are meant to be
// long running.
func startWorkers(h *common.SampleHelper) {
	var opts worker.Options
	opts.MetricsScope = h.WorkerMetricScope
	opts.Logger = h.Logger
	// Turn on the WorkflowExecutionAlreadyCompletedErrorEnabled feature flag;
	// every other feature flag keeps its zero value.
	opts.FeatureFlags.WorkflowExecutionAlreadyCompletedErrorEnabled = true

	h.StartWorkers(h.Config.DomainName, ApplicationName, opts)
}
// startShadower starts a worker with EnableShadowWorker set, restricted to
// workflows of type helloWorldWorkflowName whose status is "Completed".
//
// NOTE(review): ShadowCount presumably caps how many executions are shadowed
// before the shadow worker exits — confirm against the cadence client docs.
func startShadower(h *common.SampleHelper) {
	shadowOpts := worker.ShadowOptions{
		WorkflowTypes:  []string{helloWorldWorkflowName},
		WorkflowStatus: []string{"Completed"},
	}
	shadowOpts.ExitCondition.ShadowCount = 10

	h.StartWorkers(h.Config.DomainName, ApplicationName, worker.Options{
		MetricsScope:       h.WorkerMetricScope,
		Logger:             h.Logger,
		EnableShadowWorker: true,
		ShadowOptions:      shadowOpts,
	})
}
// startWorkflow starts one execution of helloWorldWorkflowName on the
// ApplicationName task list, passing "Cadence" as the workflow argument.
// The workflow ID is "helloworld_" followed by a freshly generated UUID.
func startWorkflow(h *common.SampleHelper) {
	// The same one-minute budget is used for the whole execution and for each
	// decision task.
	const timeout = time.Minute

	workflowID := "helloworld_" + uuid.New()
	h.StartWorkflow(client.StartWorkflowOptions{
		ID:                              workflowID,
		TaskList:                        ApplicationName,
		ExecutionStartToCloseTimeout:    timeout,
		DecisionTaskStartToCloseTimeout: timeout,
	}, helloWorldWorkflowName, "Cadence")
}
// registerWorkflowAndActivity registers the sample's workflow (under the
// alias helloWorldWorkflowName) and its activity with the helper.
func registerWorkflowAndActivity(h *common.SampleHelper) {
	h.RegisterWorkflowWithAlias(helloWorldWorkflow, helloWorldWorkflowName)
	h.RegisterActivity(helloWorldActivity)
}
// main parses the -m flag and runs the sample in the selected mode:
//
//   - "worker":   register the workflow and activity, start workers, block forever.
//   - "shadower": register the workflow and activity, start a shadow worker, block forever.
//   - "trigger":  start a single workflow execution and return (the default).
//
// Any other mode prints the flag usage instead of exiting silently.
func main() {
	var mode string
	flag.StringVar(&mode, "m", "trigger", "Mode is worker, trigger or shadower.")
	flag.Parse()

	var h common.SampleHelper
	h.SetupServiceConfig()

	switch mode {
	case "worker":
		registerWorkflowAndActivity(&h)
		startWorkers(&h)
		// Workers are long-running processes that should not exit.
		// select{} blocks indefinitely for the sample; quit with Ctrl+C.
		select {}
	case "shadower":
		registerWorkflowAndActivity(&h)
		startShadower(&h)
		// Same as worker mode: keep the process alive until interrupted.
		select {}
	case "trigger":
		startWorkflow(&h)
	default:
		// An unrecognized mode used to fall through and exit without any
		// output; show the accepted flags so the mistake is visible.
		flag.Usage()
	}
}
package main
import (
"context"
"time"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
)
/**
* This is the hello world workflow sample.
*/
const (
	// ApplicationName is the task list for this sample.
	ApplicationName = "helloWorldGroup"

	// helloWorldWorkflowName is the name the hello world workflow is
	// registered and started under.
	helloWorldWorkflowName = "helloWorldWorkflow"
)
// helloWorldWorkflow is the workflow decider for the hello world sample.
//
// It runs helloWorldActivity once with name as its argument, logs the
// activity's string result, and returns nil. If the activity fails, the error
// is logged and returned to the caller.
func helloWorldWorkflow(ctx workflow.Context, name string) error {
	// Every activity scheduled through ctx inherits these timeouts.
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
		HeartbeatTimeout:       time.Second * 20,
	})

	logger := workflow.GetLogger(ctx)
	logger.Info("helloworld workflow started")

	var greeting string
	future := workflow.ExecuteActivity(ctx, helloWorldActivity, name)
	if err := future.Get(ctx, &greeting); err != nil {
		logger.Error("Activity failed.", zap.Error(err))
		return err
	}

	// Adding a new activity to the workflow is a non-deterministic change.
	// See https://cadenceworkflow.io/docs/go-client/workflow-versioning/ for
	// more information.
	//
	// If you un-comment the code below, TestReplayWorkflowHistoryFromFile in
	// replay_test.go will fail because of that non-deterministic change.
	//
	// Likewise, if you have a completed workflow execution recorded without
	// the code below, both TestWorkflowShadowing in shadow_test.go and a
	// worker started in shadow mode (-m shadower) will fail their shadowing
	// checks.
	//
	// if err := workflow.ExecuteActivity(ctx, helloWorldActivity, name).Get(ctx, &greeting); err != nil {
	// 	logger.Error("Activity failed.", zap.Error(err))
	// 	return err
	// }

	logger.Info("Workflow completed.", zap.String("Result", greeting))
	return nil
}
// helloWorldActivity logs that it started and returns the greeting
// "Hello <name>!". It never returns a non-nil error.
func helloWorldActivity(ctx context.Context, name string) (string, error) {
	activity.GetLogger(ctx).Info("helloworld activity started")

	greeting := "Hello " + name + "!"
	return greeting, nil
}
启动文件里首先定义了相关的各种配置,包括日志配置和监控配置,监控采用的是prometheus。设置配置的函数是h.SetupServiceConfig(),位于common/sample_helper.go
type (
// SampleHelper class for workflow sample helper.
SampleHelper struct {
Service workflowserviceclient.Interface
WorkerMetricScope tally.Scope
ServiceMetricScope tally.Scope
Logger *zap.Logger
Config Configuration
Builder *WorkflowClientBuilder
DataConverter encoded.DataConverter
CtxPropagators []workflow.ContextPropagator
workflowRegistries []registryOption
activityRegistries []registryOption
Tracer opentracing.Tracer
configFile string
}
// Configuration for running samples.
Configuration struct {
DomainName string `yaml:"domain"`
ServiceName string `yaml:"service"`
HostNameAndPort string `yaml:"host"`
Prometheus *prometheus.Configuration `yaml:"prometheus"`
}
registryOption struct {
registry interface{}
alias string
}
其中配置文件使用的是config/development.yaml
func (h *SampleHelper) SetupServiceConfig() {
if h.configFile == "" {
h.configFile = defaultConfigFile
if err := yaml.Unmarshal(configData, &h.Config); err != nil {
logger, err := zap.NewDevelopment()
if h.Config.Prometheus != nil {
reporter, err := h.Config.Prometheus.NewReporter(
prometheus.ConfigurationOptions{
Registry: prom.NewRegistry(),
h.Builder = NewBuilder(logger).
SetHostPort(h.Config.HostNameAndPort).
SetDomain(h.Config.DomainName).
SetMetricsScope(h.ServiceMetricScope).
SetDataConverter(h.DataConverter).
SetTracer(h.Tracer).
SetContextPropagators(h.CtxPropagators)
service, err := h.Builder.BuildServiceClient()
h.Service = service
domainClient, _ := h.Builder.BuildCadenceDomainClient()
_, err = domainClient.Describe(context.Background(), h.Config.DomainName)
h.workflowRegistries = make([]registryOption, 0, 1)
h.activityRegistries = make([]registryOption, 0, 1)
const (
defaultConfigFile = "config/development.yaml"
然后是注册workflow和activity 即registerWorkflowAndActivity(&h)
h.RegisterWorkflowWithAlias(helloWorldWorkflow, helloWorldWorkflowName)
common/sample_helper.go
func (h *SampleHelper) RegisterWorkflowWithAlias(workflow interface{}, alias string) {
registryOption := registryOption{
registry: workflow,
alias: alias,
}
h.workflowRegistries = append(h.workflowRegistries, registryOption)
h.RegisterActivity(helloWorldActivity)
// RegisterActivity registers activity with the helper without an alias; it
// delegates to RegisterActivityWithAlias with an empty alias string.
func (h *SampleHelper) RegisterActivity(activity interface{}) {
	const noAlias = ""
	h.RegisterActivityWithAlias(activity, noAlias)
}
func (h *SampleHelper) RegisterActivityWithAlias(activity interface{}, alias string) {
registryOption := registryOption{
registry: activity,
alias: alias,
}
h.activityRegistries = append(h.activityRegistries, registryOption)
前面workflow.go里定义了我们的workflow和activity
func helloWorldWorkflow(ctx workflow.Context, name string) error {
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
HeartbeatTimeout: time.Second * 20,
}
err := workflow.ExecuteActivity(ctx, helloWorldActivity, name).Get(ctx, &helloworldResult)
func helloWorldActivity(ctx context.Context, name string) (string, error) {
go.uber.org/cadence@v0.19.1/workflow/workflow.go
// ExecuteActivity is a thin public wrapper: it forwards ctx, activity and
// args unchanged to internal.ExecuteActivity and returns the resulting Future.
func ExecuteActivity(ctx Context, activity interface{}, args ...interface{}) Future {
	future := internal.ExecuteActivity(ctx, activity, args...)
	return future
}
指定完成后就可以启动任务了:先看触发workflow的startWorkflow(&h),再看启动worker的startWorkers(&h)
workflowOptions := client.StartWorkflowOptions{
ID: "helloworld_" + uuid.New(),
TaskList: ApplicationName,
ExecutionStartToCloseTimeout: time.Minute,
DecisionTaskStartToCloseTimeout: time.Minute,
}
h.StartWorkflow(workflowOptions, helloWorldWorkflowName, "Cadence")
func startWorkers(h *common.SampleHelper) {
workerOptions := worker.Options{
MetricsScope: h.WorkerMetricScope,
Logger: h.Logger,
FeatureFlags: client.FeatureFlags{
WorkflowExecutionAlreadyCompletedErrorEnabled: true,
},
}
h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)
common/sample_helper.go
func (h *SampleHelper) StartWorkers(domainName string, groupName string, options worker.Options) {
worker := worker.New(h.Service, domainName, groupName, options)
h.registerWorkflowAndActivity(worker)
err := worker.Start()
func (h *SampleHelper) registerWorkflowAndActivity(worker worker.Worker) {
for _, w := range h.workflowRegistries {
if len(w.alias) == 0 {
worker.RegisterWorkflow(w.registry)
} else {
worker.RegisterWorkflowWithOptions(w.registry, workflow.RegisterOptions{Name: w.alias})
}
}
for _, act := range h.activityRegistries {
if len(act.alias) == 0 {
worker.RegisterActivity(act.registry)
} else {
worker.RegisterActivityWithOptions(act.registry, activity.RegisterOptions{Name: act.alias})
}
}
go.uber.org/cadence@v0.19.1/worker/worker.go
// New creates a Worker for the given service client, domain and task list by
// delegating to internal.NewWorker with the same arguments.
func New(service workflowserviceclient.Interface, domain string, taskList string, options Options) Worker {
	w := internal.NewWorker(service, domain, taskList, options)
	return w
}
type (
// Worker hosts workflow and activity implementations.
// Use worker.New(...) to create an instance.
Worker interface {
Registry
// Start starts the worker in a non-blocking fashion
Start() error
// Run is a blocking start and cleans up resources when killed
// returns error only if it fails to start the worker
Run() error
// Stop cleans up any resources opened by worker
Stop()
}
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!