前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:uber/cadence(2)

golang源码分析:uber/cadence(2)

作者头像
golangLeetcode
发布2023-08-09 15:13:20
2190
发布2023-08-09 15:13:20
举报

下面我们看下cadence的helloworld例子的源码,它包含两个文件:第一个文件是启动程序,第二个文件定义了workflow和activity

代码语言:javascript
复制
package main

import (
  "flag"
  "fmt"
  "os"
  "time"

  "github.com/pborman/uuid"
  "go.uber.org/cadence/client"
  "go.uber.org/cadence/worker"

  common "exp1/common"
)

// startWorkers starts the sample's workers for the configured domain on the
// ApplicationName task list. It is meant to be called once as part of the
// bootstrap step when the process starts, because workers are expected to be
// long running.
func startWorkers(h *common.SampleHelper) {
  // Assemble the worker options field by field.
  var opts worker.Options
  opts.MetricsScope = h.WorkerMetricScope
  opts.Logger = h.Logger
  opts.FeatureFlags = client.FeatureFlags{
    WorkflowExecutionAlreadyCompletedErrorEnabled: true,
  }

  h.StartWorkers(h.Config.DomainName, ApplicationName, opts)
}

// startShadower starts the sample's workers with EnableShadowWorker set. The
// shadow options select workflows of type helloWorldWorkflowName whose status
// is "Completed", with an exit condition of ShadowCount 10.
func startShadower(h *common.SampleHelper) {
  shadow := worker.ShadowOptions{
    WorkflowTypes:  []string{helloWorldWorkflowName},
    WorkflowStatus: []string{"Completed"},
    ExitCondition:  worker.ShadowExitCondition{ShadowCount: 10},
  }

  opts := worker.Options{
    MetricsScope:       h.WorkerMetricScope,
    Logger:             h.Logger,
    EnableShadowWorker: true,
    ShadowOptions:      shadow,
  }

  h.StartWorkers(h.Config.DomainName, ApplicationName, opts)
}

// startWorkflow starts one execution of the workflow registered under
// helloWorldWorkflowName on the ApplicationName task list, passing "Cadence"
// as the workflow's name argument. Each call uses a fresh workflow ID of the
// form "helloworld_<uuid>".
func startWorkflow(h *common.SampleHelper) {
  workflowID := "helloworld_" + uuid.New()

  opts := client.StartWorkflowOptions{
    ID:                              workflowID,
    TaskList:                        ApplicationName,
    ExecutionStartToCloseTimeout:    time.Minute,
    DecisionTaskStartToCloseTimeout: time.Minute,
  }

  h.StartWorkflow(opts, helloWorldWorkflowName, "Cadence")
}

// registerWorkflowAndActivity hands the sample's workflow (under the alias
// helloWorldWorkflowName) and its activity to the helper for registration.
func registerWorkflowAndActivity(h *common.SampleHelper) {
  h.RegisterWorkflowWithAlias(helloWorldWorkflow, helloWorldWorkflowName)
  h.RegisterActivity(helloWorldActivity)
}

// main parses the -m flag and runs the sample in the selected mode:
//
//   - "worker":   registers the workflow and activity, starts the workers,
//     then blocks forever;
//   - "shadower": registers the workflow and activity, starts the shadow
//     worker, then blocks forever;
//   - "trigger":  starts a single workflow execution and returns (default).
//
// Any other mode is reported on stderr together with the flag usage and the
// process exits with status 2, instead of silently doing nothing.
func main() {
  var mode string
  flag.StringVar(&mode, "m", "trigger", "Mode is worker, trigger or shadower.")
  flag.Parse()

  var h common.SampleHelper
  h.SetupServiceConfig()

  switch mode {
  case "worker":
    registerWorkflowAndActivity(&h)
    startWorkers(&h)

    // Workers are long-running and must not exit. Block indefinitely with
    // select{}; quit the sample with Ctrl+C.
    select {}
  case "shadower":
    registerWorkflowAndActivity(&h)
    startShadower(&h)

    // Same as worker mode: keep the process alive until interrupted.
    select {}
  case "trigger":
    startWorkflow(&h)
  default:
    // A mistyped mode used to fall through the switch and exit with status 0
    // without doing anything; make the mistake visible to the caller.
    fmt.Fprintf(os.Stderr, "unknown mode %q\n", mode)
    flag.Usage()
    os.Exit(2)
  }
}
代码语言:javascript
复制
package main

import (
  "context"
  "time"

  "go.uber.org/cadence/activity"
  "go.uber.org/cadence/workflow"
  "go.uber.org/zap"
)

/**
 * This is the hello world workflow sample.
 */

const (
  // ApplicationName is the task list for this sample.
  ApplicationName = "helloWorldGroup"

  // helloWorldWorkflowName is the name under which helloWorldWorkflow is
  // registered and by which it is started.
  helloWorldWorkflowName = "helloWorldWorkflow"
)

// helloWorldWorkflow is the workflow decider for the hello world sample. It
// executes helloWorldActivity once with name, waits for its result and logs
// it. It returns the activity's error if the activity fails, nil otherwise.
func helloWorldWorkflow(ctx workflow.Context, name string) error {
  ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
    ScheduleToStartTimeout: time.Minute,
    StartToCloseTimeout:    time.Minute,
    HeartbeatTimeout:       20 * time.Second,
  })

  logger := workflow.GetLogger(ctx)
  logger.Info("helloworld workflow started")

  var greeting string
  future := workflow.ExecuteActivity(ctx, helloWorldActivity, name)
  if err := future.Get(ctx, &greeting); err != nil {
    logger.Error("Activity failed.", zap.Error(err))
    return err
  }

  // Adding a new activity to the workflow is a non-deterministic change for
  // the workflow. See https://cadenceworkflow.io/docs/go-client/workflow-versioning/
  // for more information.
  //
  // If you un-comment the code below, TestReplayWorkflowHistoryFromFile in
  // replay_test.go will fail because of the non-deterministic change.
  //
  // If you have a completed workflow execution that ran without the code
  // below, TestWorkflowShadowing in shadow_test.go and a worker started in
  // shadow mode (-m shadower) will both fail their shadowing check for the
  // same reason.
  //
  // if err := workflow.ExecuteActivity(ctx, helloWorldActivity, name).Get(ctx, &greeting); err != nil {
  //   logger.Error("Activity failed.", zap.Error(err))
  //   return err
  // }

  logger.Info("Workflow completed.", zap.String("Result", greeting))
  return nil
}

// helloWorldActivity logs that it has started and returns the greeting
// "Hello <name>!". It never returns a non-nil error.
func helloWorldActivity(ctx context.Context, name string) (string, error) {
  activity.GetLogger(ctx).Info("helloworld activity started")

  greeting := "Hello " + name + "!"
  return greeting, nil
}

启动文件里首先定义了相关的各种配置,包括日志配置和监控配置,监控采用的是prometheus。设置配置的函数是h.SetupServiceConfig(),位于common/sample_helper.go

代码语言:javascript
复制
type (
  // SampleHelper class for workflow sample helper.
  SampleHelper struct {
    Service            workflowserviceclient.Interface
    WorkerMetricScope  tally.Scope
    ServiceMetricScope tally.Scope
    Logger             *zap.Logger
    Config             Configuration
    Builder            *WorkflowClientBuilder
    DataConverter      encoded.DataConverter
    CtxPropagators     []workflow.ContextPropagator
    workflowRegistries []registryOption
    activityRegistries []registryOption
    Tracer             opentracing.Tracer


    configFile string
  }    
代码语言:javascript
复制
// Configuration for running samples.
  Configuration struct {
    DomainName      string                    `yaml:"domain"`
    ServiceName     string                    `yaml:"service"`
    HostNameAndPort string                    `yaml:"host"`
    Prometheus      *prometheus.Configuration `yaml:"prometheus"`
  }
代码语言:javascript
复制
registryOption struct {
    registry interface{}
    alias    string
  }

其中配置文件使用的是config/development.yaml

代码语言:javascript
复制
func (h *SampleHelper) SetupServiceConfig() {
          if h.configFile == "" {
    h.configFile = defaultConfigFile
        if err := yaml.Unmarshal(configData, &h.Config); err != nil {
        logger, err := zap.NewDevelopment()
        if h.Config.Prometheus != nil {
    reporter, err := h.Config.Prometheus.NewReporter(
      prometheus.ConfigurationOptions{
        Registry: prom.NewRegistry(),
        h.Builder = NewBuilder(logger).
    SetHostPort(h.Config.HostNameAndPort).
    SetDomain(h.Config.DomainName).
    SetMetricsScope(h.ServiceMetricScope).
    SetDataConverter(h.DataConverter).
    SetTracer(h.Tracer).
    SetContextPropagators(h.CtxPropagators)
        service, err := h.Builder.BuildServiceClient()
        h.Service = service
        domainClient, _ := h.Builder.BuildCadenceDomainClient()
  _, err = domainClient.Describe(context.Background(), h.Config.DomainName)
          h.workflowRegistries = make([]registryOption, 0, 1)
  h.activityRegistries = make([]registryOption, 0, 1)  
代码语言:javascript
复制
const (
  defaultConfigFile = "config/development.yaml"

然后是注册workflow和activity 即registerWorkflowAndActivity(&h)

代码语言:javascript
复制
h.RegisterWorkflowWithAlias(helloWorldWorkflow, helloWorldWorkflowName)

common/sample_helper.go

代码语言:javascript
复制
func (h *SampleHelper) RegisterWorkflowWithAlias(workflow interface{}, alias string) {
          registryOption := registryOption{
    registry: workflow,
    alias:    alias,
  }
        h.workflowRegistries = append(h.workflowRegistries, registryOption)
    h.RegisterActivity(helloWorldActivity)
代码语言:javascript
复制
// RegisterActivity registers activity without an alias: it delegates to
// RegisterActivityWithAlias with an empty alias string.
func (h *SampleHelper) RegisterActivity(activity interface{}) {
  h.RegisterActivityWithAlias(activity, "")
}
代码语言:javascript
复制
func (h *SampleHelper) RegisterActivityWithAlias(activity interface{}, alias string) {
          registryOption := registryOption{
    registry: activity,
    alias:    alias,
  }
        h.activityRegistries = append(h.activityRegistries, registryOption)

前面workflow.go里定义了我们的workflow和activity

代码语言:javascript
复制
func helloWorldWorkflow(ctx workflow.Context, name string) error {
          ao := workflow.ActivityOptions{
    ScheduleToStartTimeout: time.Minute,
    StartToCloseTimeout:    time.Minute,
    HeartbeatTimeout:       time.Second * 20,
  }
        err := workflow.ExecuteActivity(ctx, helloWorldActivity, name).Get(ctx, &helloworldResult)  
代码语言:javascript
复制
func helloWorldActivity(ctx context.Context, name string) (string, error) {

go.uber.org/cadence@v0.19.1/workflow/workflow.go

代码语言:javascript
复制
// ExecuteActivity is the public entry point for running an activity from a
// workflow. It forwards ctx, activity and args unchanged to
// internal.ExecuteActivity and returns the resulting Future.
func ExecuteActivity(ctx Context, activity interface{}, args ...interface{}) Future {
  return internal.ExecuteActivity(ctx, activity, args...)
}

指定完成后就可以启动任务了:startWorkflow(&h)用于触发workflow,startWorkers(&h)用于启动worker

代码语言:javascript
复制
workflowOptions := client.StartWorkflowOptions{
    ID:                              "helloworld_" + uuid.New(),
    TaskList:                        ApplicationName,
    ExecutionStartToCloseTimeout:    time.Minute,
    DecisionTaskStartToCloseTimeout: time.Minute,
  }
    h.StartWorkflow(workflowOptions, helloWorldWorkflowName, "Cadence")
代码语言:javascript
复制
func startWorkers(h *common.SampleHelper) {
      workerOptions := worker.Options{
    MetricsScope: h.WorkerMetricScope,
    Logger:       h.Logger,
    FeatureFlags: client.FeatureFlags{
      WorkflowExecutionAlreadyCompletedErrorEnabled: true,
    },
  }
      h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)

common/sample_helper.go

代码语言:javascript
复制
func (h *SampleHelper) StartWorkers(domainName string, groupName string, options worker.Options) {
        worker := worker.New(h.Service, domainName, groupName, options)
        h.registerWorkflowAndActivity(worker)
        err := worker.Start()
代码语言:javascript
复制
func (h *SampleHelper) registerWorkflowAndActivity(worker worker.Worker) {
        for _, w := range h.workflowRegistries {
    if len(w.alias) == 0 {
      worker.RegisterWorkflow(w.registry)
    } else {
      worker.RegisterWorkflowWithOptions(w.registry, workflow.RegisterOptions{Name: w.alias})
    }
  }
  for _, act := range h.activityRegistries {
    if len(act.alias) == 0 {
      worker.RegisterActivity(act.registry)
    } else {
      worker.RegisterActivityWithOptions(act.registry, activity.RegisterOptions{Name: act.alias})
    }
  }

go.uber.org/cadence@v0.19.1/worker/worker.go

代码语言:javascript
复制
// New creates a Worker for the given service client, domain and task list
// using the supplied options. It forwards all arguments unchanged to
// internal.NewWorker.
func New(
  service workflowserviceclient.Interface,
  domain string,
  taskList string,
  options Options,
) Worker {
  return internal.NewWorker(service, domain, taskList, options)
}
代码语言:javascript
复制
type (
  // Worker hosts workflow and activity implementations.
  // Use worker.New(...) to create an instance.
  Worker interface {
    Registry


    // Start starts the worker in a non-blocking fashion
    Start() error
    // Run is a blocking start and cleans up resources when killed
    // returns error only if it fails to start the worker
    Run() error
    // Stop cleans up any resources opened by worker
    Stop()
  }
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-06-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
Prometheus 监控服务
Prometheus 监控服务(TencentCloud Managed Service for Prometheus,TMP)是基于开源 Prometheus 构建的高可用、全托管的服务,与腾讯云容器服务(TKE)高度集成,兼容开源生态丰富多样的应用组件,结合腾讯云可观测平台-告警管理和 Prometheus Alertmanager 能力,为您提供免搭建的高效运维能力,减少开发及运维成本。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档