1. 概述
本文主要分享 Elastic-Job-Lite 作业事件追踪。
另外,Elastic-Job-Cloud 作业事件追踪 和 Elastic-Job-Lite 基本类似,不单独开一篇文章,记录在该文章里。如果你对 Elastic-Job-Cloud 暂时不感兴趣,可以跳过相应部分。
Elastic-Job 提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。Elastic-Job 目前订阅两种事件,基于关系型数据库记录事件。
涉及到主要类的类图如下( 打开大图 ):
作业事件:粉色的类。
作业事件总线:黄色的类。
作业事件监听器:蓝色的类。
2. 作业事件总线
JobEventBus,作业事件总线,提供了注册监听器、发布事件两个方法。
创建 JobEventBus 代码如下:
JobEventBus 基于Google Guava EventBus,在《Sharding-JDBC 源码分析 —— SQL 执行》「4.1 EventBus」有详细分享。这里要注意的是 AsyncEventBus( 异步事件总线 ),注册在其上面的监听器是异步监听执行,事件发布无需阻塞等待监听器执行完逻辑,所以对性能不存在影响。
使用 JobEventConfiguration( 作业事件配置 ) 创建事件监听器,调用 #register() 方法进行注册监听。
该方法是私有(private )方法,只能使用 JobEventConfiguration 创建事件监听器注册。当不传递该配置时,意味着不开启事件追踪功能。
发布作业事件
发布作业事件( JobEvent ) 代码如下:
在 Elaistc-Job-Lite 里,LiteJobFacade 对JobEventBus#post(...) 进行封装,提供给作业执行器( AbstractElasticJobExecutor )调用( Elastic-Job-Cloud 实际也进行了封装 ):
TaskContext 通过#from(...) 方法,对作业任务ID( taskId ) 解析,获取任务上下文。TaskContext 代码注释很完整,点击链接直接查看。
3. 作业事件
目前有两种作业事件( JobEvent ):
JobStatusTraceEvent,作业状态追踪事件。
JobExecutionEvent,作业执行追踪事件。
本小节分享两方面:
作业事件发布时机。
Elastic-Job 基于关系型数据库记录事件的表结构。
3.1 作业状态追踪事件
JobStatusTraceEvent,作业状态追踪事件。
代码如下:
ExecutionType,执行类型。
Source,任务来源。
State,任务执行状态。
Elastic-Job-Lite 使用 TASK_STAGING、TASK_RUNNING、TASK_FINISHED、TASK_ERROR 四种执行状态。
Elastic-Job-Cloud 使用所有执行状态。
关系数据库表 JOB_STATUS_TRACE_LOG 结构如下:
Elastic-Job-Lite 一次作业执行记录如下( 打开大图 ):
JobStatusTraceEvent 在 Elastic-Job-Lite 发布时机:
State.TASK_STAGING:
State.TASK_RUNNING:
State.TASK_FINISHED、State.TASK_ERROR【第一种】:
State.TASK_FINISHED、State.TASK_ERROR【第二种】:
JobStatusTraceEvent 在 Elastic-Job-Cloud 发布时机:
Elastic-Job-Cloud 除了上文 Elastic-Job-Lite 会多一个场景下记录作业状态追踪事件( State.TASK_STAGING ),实现代码如下:
任务提交调度服务( TaskLaunchScheduledService )提交任务时,记录发布作业状态追踪事件(State.TASK_STAGING)。
Elastic-Job-Cloud 根据 Mesos Master 通知任务状态变更,记录多种作业状态追踪事件,实现代码如下:
3.2 作业执行追踪事件
JobExecutionEvent,作业执行追踪事件。
代码如下:
ExecutionSource,执行来源
关系数据库表JOB_EXECUTION_LOG 结构如下:
Elastic-Job-Lite 一次作业多作业分片项执行记录如下( 打开大图 ):
JobExecutionEvent 在 Elastic-Job-Lite 发布时机:
JobExecutionEvent 在 Elastic-Job-Cloud 发布时机:
和 Elastic-Job-Cloud 一致。
3.3 作业事件数据库存储
JobEventRdbStorage,作业事件数据库存储。
创建 JobEventRdbStorage 代码如下:
调用#createJobExecutionTableAndIndexIfNeeded(...) 创建 JOB_EXECUTION_LOG 表和索引。
调用 #createJobStatusTraceTableAndIndexIfNeeded(...) 创建 JOB_STATUS_TRACE_LOG 表和索引。
存储 JobStatusTraceEvent 代码如下:
originalTaskId,原任务作业ID。
Elastic-Job-Lite 暂未使用到该字段,存储空串( "" )。
Elastic-Job-Cloud 在作业失效转移场景下使用该字段,存储失效转移的任务作业ID。
存储 JobExecutionEvent 代码如下
作业分片项执行完成进行的是更新操作。
3.4 作业事件数据库查询
JobEventRdbSearch,作业事件数据库查询,提供给运维平台调用查询数据。感兴趣的同学点击链接直接查看。
4. 作业监听器
在上文我们看到,作业监听器通过传递作业事件配置( JobEventConfiguration )给作业事件总线( JobEventBus ) 进行创建监听器,并注册监听器到事件总线。
我们来看下 Elastic-Job 提供的基于关系数据库的事件配置实现。
JobEventRdbConfiguration,作业数据库事件配置。调用 #createJobEventListener()
创建作业事件数据库监听器( JobEventRdbListener )。
JobEventRdbListener,作业事件数据库监听器。实现代码如下:
通过 JobEventRdbStorage 存储作业事件到关系型数据库。
如何自定义作业监听器?
有些同学可能希望使用 ES 或者其他数据库存储作业事件,这个时候可以通过实现 JobEventConfiguration、JobEventListener 进行拓展。
Elastic-Job-Cloud JobEventConfiguration 怎么配置?
Elastic-Job-Cloud-Scheduler:从conf/elastic-job-cloud-scheduler.properties 配置文件读取如下属性,生成 JobEventConfiguration 配置对象。
event_trace_rdb_driver
event_trace_rdb_url
event_trace_rdb_username
event_trace_rdb_password
Elastic-Job-Cloud-Executor:通过接收到任务执行信息里读取JobEventConfiguration,实现代码如下:
666. 彩蛋
旁白君:瞎比比了这么长,能不能简单粗暴一点。
芋道君:是是是。
文章来源:掘金
作者:芋道源码
data:text/html;charset=UTF-8;base64,
5p625p6E5biI5a2m5Lmg5Lqk5rWB576k5Y+35pivNTc1NzUxODU0Cg==
领取专属 10元无门槛券
私享最新 技术干货