上一篇文章中,我们介绍了规则引擎的基本算法与使用:
我们看到,规则引擎的基础算法 Rete 算法其实是基于有向无环图的一种算法。
事实上,在实际工作生活中,并不是只有我们的逻辑推理是由有向无环图构成的,复杂的任务编排执行也可以被改造为有向无环图的形式。
下面来举个例子,我们需要构建一个博客文章发布系统,用户在这个博客系统中发布文章的流程很简单:
进一步,机器学习算法可能对文章内容进行自动修改,而人工审核可能对博客投稿的频道进行修改,虽然最终是否执行发布流程依赖于人工审核结果,但由于机器学习审核与人工审核是并发进行的,所以 3、4 两步存在竞争条件,需要复杂的加锁、判断逻辑。
我们看到,仅仅是上述四个步骤,就已经让我们的业务代码中出现了难以维护的加锁、判断逻辑,如果接下来又有新的需求:
当发布流程执行完成后,需要将机器学习算法模型计算结果推送给 C 部门。
那么,我们需要在上述流程中补充以下逻辑:
这个简单的需求,我们来画一个泳道图:
图中用虚线框住的部分存在竞争条件,可见,整个流程变得极为复杂,日常维护、问题排查、后续改进等等都难以进行。
那么,针对这样的复杂场景,有什么办法可以优化吗?
为什么一个看似简单的文章发布系统的例子实现起来却是如此复杂呢?
原因在于我们划分整个流程各步骤的粒度过粗,导致新的逻辑加入时难以应对。
同时,并发场景下,对于竞争条件的保护代码与业务代码耦合,造成了业务代码难以维护的问题。
只要有一个流程编排引擎,让他去处理流程各节点之间的依赖问题,就可以让我们仅仅将目光集中于业务,而不用去为缓存、加锁、判断等逻辑操心了。
首先,我们需要绘制出上述文章发布流程中各个任务节点构成的有向无环图:
经过流程编排,我们让后一个节点严格依赖前一个节点,将上述场景的泳道图改造为上述的有向无环图,整个文章发布流程是不是就十分简化了呢?
首先,我们需要一个类实例,实现整个编排引擎执行过程中各节点状态的记录:
package cn.techlog.testjava.main.drools.article_publish;
import java.util.Map;
public class PublishProcess {
private Integer preProcess;
private Integer pushMonitor;
private Integer receiveManualMonitor;
private Integer receiveMachineMonitor;
private Integer processManualMonitor;
private Integer processMachineMonitor;
private Map<String, String> machineMonitorResult;
public Integer getPreProcess() {
return preProcess;
}
public void setPreProcess(Integer preProcess) {
this.preProcess = preProcess;
}
public Integer getPushMonitor() {
return pushMonitor;
}
public void setPushMonitor(Integer pushMonitor) {
this.pushMonitor = pushMonitor;
}
public Integer getReceiveManualMonitor() {
return receiveManualMonitor;
}
public void setReceiveManualMonitor(Integer receiveManualMonitor) {
this.receiveManualMonitor = receiveManualMonitor;
}
public Integer getReceiveMachineMonitor() {
return receiveMachineMonitor;
}
public void setReceiveMachineMonitor(Integer receiveMachineMonitor) {
this.receiveMachineMonitor = receiveMachineMonitor;
}
public Integer getProcessManualMonitor() {
return processManualMonitor;
}
public void setProcessManualMonitor(Integer processManualMonitor) {
this.processManualMonitor = processManualMonitor;
}
public Integer getProcessMachineMonitor() {
return processMachineMonitor;
}
public void setProcessMachineMonitor(Integer processMachineMonitor) {
this.processMachineMonitor = processMachineMonitor;
}
public Map<String, String> getMachineMonitorResult() {
return machineMonitorResult;
}
public void setMachineMonitorResult(Map<String, String> machineMonitorResult) {
this.machineMonitorResult = machineMonitorResult;
}
}
根据流程编排的有向无环图,就可以完成 drl 文件的编写了:
package cn.techlog.testjava.main.drools.test
import cn.techlog.testjava.main.drools.article_publish.PublishProcess
import java.util.Map
import java.util.HashMap
rule "pre_process"
when
$publishProcess: PublishProcess(preProcess == null || preProcess == 0)
then
System.out.println("do pre process");
$publishProcess.setPreProcess(1);
update($publishProcess)
end
rule "push_monitor"
when
$publishProcess: PublishProcess(preProcess == 1, (pushMonitor == null || pushMonitor == 0))
then
System.out.println("do push monitor: push manual monitor and push machine monitor");
$publishProcess.setPushMonitor(1);
update($publishProcess)
end
rule "process_manual_monitor"
when
$publishProcess: PublishProcess(receiveManualMonitor == 1, (processManualMonitor == null || processManualMonitor == 0))
then
System.out.println("do process manual monitor, update article category and do article publish");
$publishProcess.setProcessManualMonitor(1);
update($publishProcess)
end
rule "process_machine_monitor"
when
$publishProcess: PublishProcess(receiveMachineMonitor == 1, processManualMonitor == 1, (processMachineMonitor == null || processMachineMonitor == 0))
then
System.out.println("do process machine monitor, update article content and push to C department: " + $publishProcess.getMachineMonitorResult());
$publishProcess.setProcessMachineMonitor(1);
update($publishProcess)
end
接下来我们来编写程序模拟这一流程的执行:
package cn.techlog.testjava.main.drools.article_publish;
import cn.techlog.testjava.main.util.FileUtil;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.StatelessKieSession;
import org.kie.internal.conf.MultithreadEvaluationOption;
import org.kie.internal.utils.KieHelper;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.FutureTask;
public class ArticlePublishTest {
private static final PublishProcess publishProcess = new PublishProcess();
private static final StatelessKieSession kieSession;
static {
String drlStr = FileUtil.getFileString("drl/article_publish.drl");
KieHelper helper = new KieHelper();
helper.addContent(drlStr, ResourceType.DRL);
kieSession = helper.build(MultithreadEvaluationOption.YES).newStatelessKieSession();
}
public static void main(String[] args) {
FutureTask<String> futureTask1 = new FutureTask<>(ArticlePublishTest::doDrl);
FutureTask<String> futureTask2 = new FutureTask<>(ArticlePublishTest::startNode);
Thread thread1 = new Thread(futureTask1);
Thread thread2 = new Thread(futureTask2);
thread1.start();
thread2.start();
}
private static String startNode() throws InterruptedException {
Thread.sleep(3000);
System.out.println("-> receive machine monitor result");
Map<String, String> result = new HashMap<>();
result.put("result", "hello world");
publishProcess.setMachineMonitorResult(result);
publishProcess.setReceiveMachineMonitor(1);
kieSession.execute(publishProcess);
Thread.sleep(3000);
System.out.println("-> receive manual monitor result");
publishProcess.setReceiveManualMonitor(1);
kieSession.execute(publishProcess);
return null;
}
private static String doDrl() {
kieSession.execute(publishProcess);
return null;
}
}
执行程序,我们看到输出了结果:
do pre process do push monitor: push manual monitor and push machine monitor -> receive machine monitor result -> receive manual monitor result do process manual monitor, update article category and do article publish do process machine monitor, update article content and push to C department: {result=hello world}
虽然在我们的异步接收线程中,机器审核结果早于人工审核结果先被收到,但由于我们的流程编排,机器审核结果的处理则直到人工审核结果处理后才完成执行。
我们看到,在我们的模拟文章发布流程中,我们将复杂、多分支、存在竞争条件的文章发布流程通过规则引擎模拟实现的任务编排引擎成功变成了串行执行,没有竞争条件存在的简单流程。
但这个例子仍然是非常基础的,在实际的场景中,你可能还是会遇到以下这些问题:
在实际场景中,任务的某个节点需要重做是经常让人很头疼的一件事,因为对于线上场景,任务经常是可重入的,否则重复回调等常见情况就会造成你的任务出现问题,但既然任务是可重入的,那怎么让你的任务重新执行一次呢?进行上述改造以后,就不再存在这个问题了,因为编排引擎决定了任务不会被执行两次,如果某个任务需要被重新执行,只需要将状态描述类中对应的字段置为 0,其他不需要执行的任务对应的状态字段置为 1,即可保证仅重新执行该节点,而无需担心其他节点意外受到影响了。
从性能上来说,规则文件的解析与实例化是非常耗时的,因此,提前 build,例如在项目启动时就完成所有规则的实例化,然后将 kieSession 放在内存中,这样在实际执行的过程中,性能会有明显提升。另一方面,不要在单个规则中加入复杂的判断逻辑,对于复杂场景,拆分成多个 rule 可以有效提升性能,同时,不要在 then 块中进行判断,所有的判断应该都放在 when 块中。
显然,生产环境中要比上述 demo 更加复杂,最基本的一点,线上环境中,各个任务不会都在同一台机器上执行,同时,接收到异步回调的节点也会分布在不同的服务器上,虽然通过流程编排,解决了业务代码中的竞争条件,极大地简化了整个流程,但任务的状态描述结构仍然需要在分布式环境中共享,这就需要一个中心化的缓存,同时,分布式环境下,对任务状态对象中字段的修改也同样存在着竞争条件,因此最好的方法是将这个状态对象的缓存与竞争条件的加锁逻辑封装为一个新的工程框架,这就是一个任务编排引擎。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有