JGraphlet是一个轻量级、零依赖的Java库,用于构建任务流水线。它的强大之处不在于冗长的功能列表,而在于一小套协同工作的核心设计原则。
JGraphlet的核心是简洁性,基于图模型构建。向流水线添加任务并连接它们以创建图。
每个任务都有输入和输出,TaskPipeline构建并执行流水线,同时管理每个任务的I/O。例如,使用Map进行扇入操作,使用Record定义自定义数据模型等。TaskPipeline还包含PipelineContext在任务间共享数据,此外任务还可以被缓存,避免重复计算。
您可以自定义任务流水线的流程,并选择使用SyncTask或AsyncTask。默认情况下所有任务都是异步的。
JGraphlet将工作流视为有向无环图。您将任务定义为节点,并显式绘制它们之间的连接。这使得扇出和扇入等复杂模式变得自然。
import dev.shaaf.jgraphlet.*;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
try (TaskPipeline pipeline = new TaskPipeline()) {
Task<String, String> fetchInfo = (id, ctx) -> CompletableFuture.supplyAsync(() -> "Info for " + id);
Task<String, String> fetchFeed = (id, ctx) -> CompletableFuture.supplyAsync(() -> "Feed for " + id);
Task<Map<String, Object>, String> combine = (inputs, ctx) -> CompletableFuture.supplyAsync(() ->
inputs.get("infoNode") + " | " + inputs.get("feedNode")
);
pipeline.addTask("infoNode", fetchInfo)
.addTask("feedNode", fetchFeed)
.addTask("summaryNode", combine);
pipeline.connect("infoNode", "summaryNode")
.connect("feedNode", "summaryNode");
String result = (String) pipeline.run("user123").join();
System.out.println(result); // "Info for user123 | Feed for user123"
}
JGraphlet提供两种可混合使用的任务类型:
try (TaskPipeline pipeline = new TaskPipeline()) {
Task<String, String> fetchName = (userId, ctx) ->
CompletableFuture.supplyAsync(() -> "John Doe");
SyncTask<String, String> toUpper = (name, ctx) -> name.toUpperCase();
pipeline.add("fetch", fetchName)
.then("transform", toUpper);
String result = (String) pipeline.run("user-42").join();
System.out.println(result); // "JOHN DOE"
}
JGraphlet避免复杂的构建器或魔法配置,API简洁明了:
try (TaskPipeline pipeline = new TaskPipeline()) {
SyncTask<String, Integer> lengthTask = (s, c) -> s.length();
SyncTask<Integer, String> formatTask = (i, c) -> "Length is " + i;
pipeline.addTask("calculateLength", lengthTask);
pipeline.addTask("formatOutput", formatTask);
pipeline.connect("calculateLength", "formatOutput");
String result = (String) pipeline.run("Hello").join();
System.out.println(result); // "Length is 5"
}
扇入任务接收Map<String, Object>,其中键是父任务ID,值是它们的结果。
try (TaskPipeline pipeline = new TaskPipeline()) {
SyncTask<String, String> fetchUser = (id, ctx) -> "User: " + id;
SyncTask<String, String> fetchPerms = (id, ctx) -> "Role: admin";
Task<Map<String, Object>, String> combine = (inputs, ctx) -> CompletableFuture.supplyAsync(() -> {
String userData = (String) inputs.get("userNode");
String permsData = (String) inputs.get("permsNode");
return userData + " (" + permsData + ")";
});
pipeline.addTask("userNode", fetchUser)
.addTask("permsNode", fetchPerms)
.addTask("combiner", combine);
pipeline.connect("userNode", "combiner").connect("permsNode", "combiner");
String result = (String) pipeline.run("user-1").join();
System.out.println(result); // "User: user-1 (Role: admin)"
}
执行流水线很简单:pipeline.run(input)返回最终结果的CompletableFuture。您可以使用.join()阻塞或使用异步链式调用。
String input = "my-data";
// 阻塞方式
try {
String result = (String) pipeline.run(input).join();
System.out.println("Result (blocking): " + result);
} catch (Exception e) {
System.err.println("Pipeline failed: " + e.getMessage());
}
// 非阻塞方式
pipeline.run(input)
.thenAccept(result -> System.out.println("Result (non-blocking): " + result))
.exceptionally(ex -> {
System.err.println("Async pipeline failed: " + ex.getMessage());
return null;
});
JGraphlet实现AutoCloseable。使用try-with-resources保证内部资源的安全关闭。
try (TaskPipeline pipeline = new TaskPipeline()) {
pipeline.add("taskA", new SyncTask<String, String>() {
@Override
public String executeSync(String input, PipelineContext context) {
if (input == null) {
throw new IllegalArgumentException("Input cannot be null");
}
return "Processed: " + input;
}
});
pipeline.run("data").join();
} // pipeline.shutdown()自动调用
System.out.println("Pipeline resources have been released.");
PipelineContext是线程安全的、每次运行的工作空间,用于存储元数据。
SyncTask<String, String> taskA = (input, ctx) -> {
ctx.put("requestID", "xyz-123");
return input;
};
SyncTask<String, String> taskB = (input, ctx) -> {
String reqId = ctx.get("requestID", String.class).orElse("unknown");
return "Processed input " + input + " for request: " + reqId;
};
任务可以选择加入缓存以避免重复计算。
Task<String, String> expensiveApiCall = new Task<>() {
@Override
public CompletableFuture<String> execute(String input, PipelineContext context) {
System.out.println("Performing expensive call for: " + input);
return CompletableFuture.completedFuture("Data for " + input);
}
@Override
public boolean isCacheable() { return true; }
};
try (TaskPipeline pipeline = new TaskPipeline()) {
pipeline.add("expensive", expensiveApiCall);
System.out.println("First call...");
pipeline.run("same-key").join();
System.out.println("Second call...");
pipeline.run("same-key").join(); // 结果来自缓存
}
最终结果是提供了一种清晰、可测试的方式来编排同步或异步任务,用于组合复杂流程,如并行检索、合并、判断和防护栏,而无需引入重量级的工作流引擎。
了解更多或尝试使用:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。