前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >Drasi Reactions SDK

Drasi Reactions SDK

作者头像
张善友
发布2025-02-16 20:10:51
发布2025-02-16 20:10:51
5800
代码可运行
举报
文章被收录于专栏:张善友的专栏张善友的专栏
运行总次数:0
代码可运行

Reaction(反应器)是Drasi系统中的一个重要组件,它能够对数据变化做出响应和处理。简单来说,当你的数据发生变化时,Reaction就会被触发,然后执行你定义的操作。扩展Drasi Reactions的文档参见:https://github.com/drasi-project/docs/blob/main/docs/content/how-to-guides/extend-drasi/implement-a-reaction.md 。

实现自定义Reaction的三个步骤

第一步:创建Docker镜像

你可以使用任何编程语言来编写Reaction,目前SDK 支持三种主流编程语言:JavaScript/TypeScript、Python 和 .NET。只要最终能打包成Docker镜像即可。这个Docker镜像需要:

  1. 能够读取配置信息
  2. 能够接收数据变化的消息
  3. 对数据变化做出响应
第二步:处理查询配置

当Reaction运行时,Drasi会在容器内的 /etc/queries 目录下为每个查询创建配置文件。

实际例子

假设你配置了一个简单的Reaction:

apiVersion: query.reactive-graph.io/v1 kind: Reaction metadata: name: my-reaction spec: reactionImage: my-reaction queries: - queryId: query1 - queryId: query2 - queryId: query3

这个配置会在 /etc/queries 目录下创建三个文件:query1、query2和query3

你也可以为每个查询添加额外的配置信息:

apiVersion: query.reactive-graph.io/v1 kind: Reaction metadata: name: my-reaction spec: reactionImage: my-reaction queries: - queryId: query1 options: > foo - queryId: query2 options: > bar

第三步:处理数据变化

当数据发生变化时,Drasi会通过 Dapr 发送消息到你的Reaction。当数据发生变化时,你会收到JSON格式的消息,包含三种类型的变化:

  • addedResults:新增的数据
  • deletedResults:删除的数据
  • updatedResults:更新的数据

Drasi Reactions SDK 是一个跨语言的开发工具包,用于实现和处理 Drasi 平台的 Reactions(反应器)功能。该 SDK 目前支持三种主流编程语言:JavaScript/TypeScript、Python 和 .NET。

主要功能和特点:

  1. 核心功能:
    • 处理来自 Continuous Query(持续查询)的变更事件(ChangeEvent)
    • 处理控制事件(ControlEvent)
    • 支持查询配置的解析和管理
    • 提供事件订阅和处理机制
  2. 主要事件类型:
    • ChangeEvent(变更事件):包含查询结果的添加、删除和更新信息
    • ControlEvent(控制事件):处理启动、停止等控制信号
  3. 支持的语言实现:

JavaScript/TypeScript 版本:

代码语言:javascript
代码运行次数:0
复制
import { DrasiReaction, ChangeEvent } from '@drasi/reaction-sdk';

let myReaction = new DrasiReaction(async (event: ChangeEvent) => {
    console.log(`Received change sequence: ${event.sequence} for query ${event.queryId}`);
    // 处理添加的结果
    for (let added of event.addedResults) {
        console.log(`Added result: ${JSON.stringify(added)}`);
    }
    // 处理删除的结果
    for (let deleted of event.deletedResults) {
        console.log(`Removed result: ${JSON.stringify(deleted)}`);
    }
    // 处理更新的结果
    for (let updated of event.updatedResults) {
        console.log(`Updated result - before: ${JSON.stringify(updated.before)}, after: ${JSON.stringify(updated.after)}`);
    }
});

myReaction.start();

Python 版本:

代码语言:javascript
代码运行次数:0
复制
from drasi.reaction.models.ChangeEvent import ChangeEvent
from drasi.reaction.sdk import DrasiReaction

async def change_event(event: ChangeEvent, query_configs: dict[str, Any] | None = None):
    print(f"Received change sequence {event.sequence} for query {event.queryId}")
    if event.addedResults:
        print(f"Added result: {event.addedResults}")
    if event.deletedResults:
        print(f"Removed result: {event.deletedResults}")
    if event.updatedResults:
        print(f"Updated result - before: {event.updatedResults[0].before}, after {event.updatedResults[0].after}")

reaction = DrasiReaction(on_change_event=change_event)
reaction.start()

.NET 版本:

代码语言:javascript
代码运行次数:0
复制
var reaction = new ReactionBuilder()
    .UseChangeEventHandler(async (evt, queryConfig) => {
        Console.WriteLine($"Received change event from query {evt.QueryId} sequence {evt.Sequence}");
        
        foreach (var item in evt.AddedResults)
            Console.WriteLine($"Added result: {item}");

        foreach (var item in evt.UpdatedResults)
            Console.WriteLine($"Updated result, before {item.Before}, after {item.After}");

        foreach (var item in evt.DeletedResults)
            Console.WriteLine($"Deleted result: {item}");
    })
    .Build();

await reaction.StartAsync();

上面是基础示例,接下来我们来看一个高级示例

  • 自定义配置类
代码语言:javascript
代码运行次数:0
复制
class MyQueryConfig
{
    [JsonPropertyName("greeting")]
    public string? Greeting { get; set; }
}
  • 自定义服务实现
代码语言:javascript
代码运行次数:0
复制
class MyService
{
    private readonly string _connectionString;

    public MyService(IConfiguration configuration)
    {
        _connectionString = configuration["MyConnectionString"];
    }

    public void DoSomething()
    {
        Console.WriteLine("Doing something");
    }
}
  • 事件处理器实现
  • 变更事件处理器(MyChangeHandler
代码语言:javascript
代码运行次数:0
复制
class MyChangeHandler : IChangeEventHandler<MyQueryConfig>
{   
    private readonly MyService _service;
    private readonly ILogger<MyChangeHandler> _logger;

    public MyChangeHandler(MyService service, ILogger<MyChangeHandler> logger)
    {
        _service = service;
        _logger = logger;
    }

    public async Task HandleChange(ChangeEvent evt, MyQueryConfig? queryConfig)
    {
        _logger.LogInformation($"Received change event from query {evt.QueryId} sequence {evt.Sequence}. Query greeting is {queryConfig?.Greeting}");
        _logger.LogInformation($"Full event: {evt.ToJson()}");
        _service.DoSomething();
    }
}

// 控制事件处理器(MyControlSignalHandler) 
class MyControlSignalHandler : IControlEventHandler<MyQueryConfig>
{
    private readonly ILogger<MyControlSignalHandler> _logger;

    public MyControlSignalHandler(ILogger<MyControlSignalHandler> logger)
    {
        _logger = logger;
    }

    public async Task HandleControlSignal(ControlEvent evt, MyQueryConfig? queryConfig)
    {
        _logger.LogWarning($"Received control signal: {evt.ControlSignal?.Kind} for query {evt.QueryId}. Query greeting is {queryConfig?.Greeting}");
    }
}
代码语言:javascript
代码运行次数:0
复制
var reaction = new ReactionBuilder<MyQueryConfig>()
    .UseChangeEventHandler<MyChangeHandler>()               // Use your custom change handler
    .UseControlEventHandler<MyControlSignalHandler>()       // Use your custom control signal handler
    .UseYamlQueryConfig()                                   // Parse the per query configuration from Yaml
    .ConfigureServices((services) =>                        // Register your own services
    {
        services.AddSingleton<MyService>();
    })
    .Build();

// Start the reaction
await reaction.StartAsync();
注册你的ReactionProvider创建好Reaction后,需要通过ReactionProvider将它注册到Drasi系统中。 下面是一个ReactionProvider配置:apiVersion: v1

kind: ReactionProvider

name: Advanced

spec:
   services:
     reaction:
       image: reaction-advanced
       externalImage: true
   config_schema:
     type: object
     properties: 
       MyConnectionString:
         type: string
     required:
       - MyConnectionString
Reaction使用示例

kind: Reaction

apiVersion: v1

name: test-advanced

spec:
   kind: Advanced
   properties:
     MyConnectionString: "some connection string"
   queries:
     query1: |
       greeting: "Hello, World!"
     query2: |
       greeting: "Howdy!"

如何部署

使用Drasi命令行工具部署你的Reaction:

代码语言:javascript
代码运行次数:0
复制
# 先部署ReactionProvider
drasi apply -f reaction-provider.yaml

# 然后部署Reaction
drasi apply -f reaction.yaml

调试技巧

  1. 使用环境变量来配置你的Reaction
  2. 查看Docker容器日志来排查问题
  3. 利用Drasi的VSCode插件来验证配置文件
  4. 确保你的Docker镜像能够正确处理Dapr消息

常见问题

  1. Q: 为什么我的Reaction没有收到数据变化? A: 检查查询ID是否配置正确,以及Dapr订阅是否成功。
  2. Q: 如何测试我的Reaction? A: 可以先使用调试模式运行,打印收到的所有消息。
  3. Q: 配置文件验证失败怎么办? A: 使用Drasi的VSCode插件或CLI工具验证配置文件格式是否正确。

SDK 的设计理念是提供一个简单但强大的接口,让开发者能够方便地实现和管理 Drasi 平台的反应器功能。无论使用哪种编程语言,都可以通过相似的 API 设计模式来处理事件和管理配置。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-02-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 实现自定义Reaction的三个步骤
  • 如何部署
  • 调试技巧
  • 常见问题
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档