Reaction(反应器)是Drasi系统中的一个重要组件,它能够对数据变化做出响应和处理。简单来说,当你的数据发生变化时,Reaction就会被触发,然后执行你定义的操作。扩展Drasi Reactions的文档参见:https://github.com/drasi-project/docs/blob/main/docs/content/how-to-guides/extend-drasi/implement-a-reaction.md 。
你可以使用任何编程语言来编写Reaction,目前SDK 支持三种主流编程语言:JavaScript/TypeScript、Python 和 .NET。只要最终能打包成Docker镜像即可。这个Docker镜像需要:
当Reaction运行时,Drasi会在容器内的 /etc/queries
目录下为每个查询创建配置文件。
假设你配置了一个简单的Reaction:
apiVersion: query.reactive-graph.io/v1 kind: Reaction metadata: name: my-reaction spec: reactionImage: my-reaction queries: - queryId: query1 - queryId: query2 - queryId: query3
这个配置会在 /etc/queries
目录下创建三个文件:query1
、query2
和query3
。
你也可以为每个查询添加额外的配置信息:
apiVersion: query.reactive-graph.io/v1 kind: Reaction metadata: name: my-reaction spec: reactionImage: my-reaction queries: - queryId: query1 options: > foo - queryId: query2 options: > bar
当数据发生变化时,Drasi会通过 Dapr 发送消息到你的Reaction。当数据发生变化时,你会收到JSON格式的消息,包含三种类型的变化:
addedResults
:新增的数据 deletedResults
:删除的数据 updatedResults
:更新的数据Drasi Reactions SDK 是一个跨语言的开发工具包,用于实现和处理 Drasi 平台的 Reactions(反应器)功能。该 SDK 目前支持三种主流编程语言:JavaScript/TypeScript、Python 和 .NET。
主要功能和特点:
JavaScript/TypeScript 版本:
import { DrasiReaction, ChangeEvent } from '@drasi/reaction-sdk';
let myReaction = new DrasiReaction(async (event: ChangeEvent) => {
console.log(`Received change sequence: ${event.sequence} for query ${event.queryId}`);
// 处理添加的结果
for (let added of event.addedResults) {
console.log(`Added result: ${JSON.stringify(added)}`);
}
// 处理删除的结果
for (let deleted of event.deletedResults) {
console.log(`Removed result: ${JSON.stringify(deleted)}`);
}
// 处理更新的结果
for (let updated of event.updatedResults) {
console.log(`Updated result - before: ${JSON.stringify(updated.before)}, after: ${JSON.stringify(updated.after)}`);
}
});
myReaction.start();
Python 版本:
from drasi.reaction.models.ChangeEvent import ChangeEvent
from drasi.reaction.sdk import DrasiReaction
async def change_event(event: ChangeEvent, query_configs: dict[str, Any] | None = None):
print(f"Received change sequence {event.sequence} for query {event.queryId}")
if event.addedResults:
print(f"Added result: {event.addedResults}")
if event.deletedResults:
print(f"Removed result: {event.deletedResults}")
if event.updatedResults:
print(f"Updated result - before: {event.updatedResults[0].before}, after {event.updatedResults[0].after}")
reaction = DrasiReaction(on_change_event=change_event)
reaction.start()
.NET 版本:
var reaction = new ReactionBuilder()
.UseChangeEventHandler(async (evt, queryConfig) => {
Console.WriteLine($"Received change event from query {evt.QueryId} sequence {evt.Sequence}");
foreach (var item in evt.AddedResults)
Console.WriteLine($"Added result: {item}");
foreach (var item in evt.UpdatedResults)
Console.WriteLine($"Updated result, before {item.Before}, after {item.After}");
foreach (var item in evt.DeletedResults)
Console.WriteLine($"Deleted result: {item}");
})
.Build();
await reaction.StartAsync();
上面是基础示例,接下来我们来看一个高级示例
class MyQueryConfig
{
[JsonPropertyName("greeting")]
public string? Greeting { get; set; }
}
class MyService
{
private readonly string _connectionString;
public MyService(IConfiguration configuration)
{
_connectionString = configuration["MyConnectionString"];
}
public void DoSomething()
{
Console.WriteLine("Doing something");
}
}
MyChangeHandler
) class MyChangeHandler : IChangeEventHandler<MyQueryConfig>
{
private readonly MyService _service;
private readonly ILogger<MyChangeHandler> _logger;
public MyChangeHandler(MyService service, ILogger<MyChangeHandler> logger)
{
_service = service;
_logger = logger;
}
public async Task HandleChange(ChangeEvent evt, MyQueryConfig? queryConfig)
{
_logger.LogInformation($"Received change event from query {evt.QueryId} sequence {evt.Sequence}. Query greeting is {queryConfig?.Greeting}");
_logger.LogInformation($"Full event: {evt.ToJson()}");
_service.DoSomething();
}
}
// 控制事件处理器(MyControlSignalHandler)
class MyControlSignalHandler : IControlEventHandler<MyQueryConfig>
{
private readonly ILogger<MyControlSignalHandler> _logger;
public MyControlSignalHandler(ILogger<MyControlSignalHandler> logger)
{
_logger = logger;
}
public async Task HandleControlSignal(ControlEvent evt, MyQueryConfig? queryConfig)
{
_logger.LogWarning($"Received control signal: {evt.ControlSignal?.Kind} for query {evt.QueryId}. Query greeting is {queryConfig?.Greeting}");
}
}
var reaction = new ReactionBuilder<MyQueryConfig>()
.UseChangeEventHandler<MyChangeHandler>() // Use your custom change handler
.UseControlEventHandler<MyControlSignalHandler>() // Use your custom control signal handler
.UseYamlQueryConfig() // Parse the per query configuration from Yaml
.ConfigureServices((services) => // Register your own services
{
services.AddSingleton<MyService>();
})
.Build();
// Start the reaction
await reaction.StartAsync();
注册你的ReactionProvider创建好Reaction后,需要通过ReactionProvider将它注册到Drasi系统中。 下面是一个ReactionProvider配置:apiVersion: v1
kind: ReactionProvider
name: Advanced
spec:
services:
reaction:
image: reaction-advanced
externalImage: true
config_schema:
type: object
properties:
MyConnectionString:
type: string
required:
- MyConnectionString
Reaction使用示例
kind: Reaction
apiVersion: v1
name: test-advanced
spec:
kind: Advanced
properties:
MyConnectionString: "some connection string"
queries:
query1: |
greeting: "Hello, World!"
query2: |
greeting: "Howdy!"
使用Drasi命令行工具部署你的Reaction:
# 先部署ReactionProvider
drasi apply -f reaction-provider.yaml
# 然后部署Reaction
drasi apply -f reaction.yaml
SDK 的设计理念是提供一个简单但强大的接口,让开发者能够方便地实现和管理 Drasi 平台的反应器功能。无论使用哪种编程语言,都可以通过相似的 API 设计模式来处理事件和管理配置。