Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Drasi Reactions SDK

Drasi Reactions SDK

作者头像
张善友
发布于 2025-02-16 12:10:51
发布于 2025-02-16 12:10:51
10500
代码可运行
举报
文章被收录于专栏:张善友的专栏张善友的专栏
运行总次数:0
代码可运行

Reaction(反应器)是Drasi系统中的一个重要组件,它能够对数据变化做出响应和处理。简单来说,当你的数据发生变化时,Reaction就会被触发,然后执行你定义的操作。扩展Drasi Reactions的文档参见:https://github.com/drasi-project/docs/blob/main/docs/content/how-to-guides/extend-drasi/implement-a-reaction.md 。

实现自定义Reaction的三个步骤

第一步:创建Docker镜像

你可以使用任何编程语言来编写Reaction,目前SDK 支持三种主流编程语言:JavaScript/TypeScriptPython 和 .NET。只要最终能打包成Docker镜像即可。这个Docker镜像需要:

  1. 能够读取配置信息
  2. 能够接收数据变化的消息
  3. 对数据变化做出响应
第二步:处理查询配置

当Reaction运行时,Drasi会在容器内的 /etc/queries 目录下为每个查询创建配置文件。

实际例子

假设你配置了一个简单的Reaction:

apiVersion: query.reactive-graph.io/v1 kind: Reaction metadata: name: my-reaction spec: reactionImage: my-reaction queries: - queryId: query1 - queryId: query2 - queryId: query3

这个配置会在 /etc/queries 目录下创建三个文件:query1、query2和query3

你也可以为每个查询添加额外的配置信息:

apiVersion: query.reactive-graph.io/v1 kind: Reaction metadata: name: my-reaction spec: reactionImage: my-reaction queries: - queryId: query1 options: > foo - queryId: query2 options: > bar

第三步:处理数据变化

当数据发生变化时,Drasi会通过 Dapr 发送消息到你的Reaction。当数据发生变化时,你会收到JSON格式的消息,包含三种类型的变化:

  • addedResults:新增的数据
  • deletedResults:删除的数据
  • updatedResults:更新的数据

Drasi Reactions SDK 是一个跨语言的开发工具包,用于实现和处理 Drasi 平台的 Reactions(反应器)功能。该 SDK 目前支持三种主流编程语言:JavaScript/TypeScript、Python 和 .NET。

主要功能和特点:

  1. 核心功能:
    • 处理来自 Continuous Query(持续查询)的变更事件(ChangeEvent)
    • 处理控制事件(ControlEvent)
    • 支持查询配置的解析和管理
    • 提供事件订阅和处理机制
  2. 主要事件类型:
    • ChangeEvent(变更事件):包含查询结果的添加、删除和更新信息
    • ControlEvent(控制事件):处理启动、停止等控制信号
  3. 支持的语言实现:

JavaScript/TypeScript 版本:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import { DrasiReaction, ChangeEvent } from '@drasi/reaction-sdk';

let myReaction = new DrasiReaction(async (event: ChangeEvent) => {
    console.log(`Received change sequence: ${event.sequence} for query ${event.queryId}`);
    // 处理添加的结果
    for (let added of event.addedResults) {
        console.log(`Added result: ${JSON.stringify(added)}`);
    }
    // 处理删除的结果
    for (let deleted of event.deletedResults) {
        console.log(`Removed result: ${JSON.stringify(deleted)}`);
    }
    // 处理更新的结果
    for (let updated of event.updatedResults) {
        console.log(`Updated result - before: ${JSON.stringify(updated.before)}, after: ${JSON.stringify(updated.after)}`);
    }
});

myReaction.start();

Python 版本:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from drasi.reaction.models.ChangeEvent import ChangeEvent
from drasi.reaction.sdk import DrasiReaction

async def change_event(event: ChangeEvent, query_configs: dict[str, Any] | None = None):
    print(f"Received change sequence {event.sequence} for query {event.queryId}")
    if event.addedResults:
        print(f"Added result: {event.addedResults}")
    if event.deletedResults:
        print(f"Removed result: {event.deletedResults}")
    if event.updatedResults:
        print(f"Updated result - before: {event.updatedResults[0].before}, after {event.updatedResults[0].after}")

reaction = DrasiReaction(on_change_event=change_event)
reaction.start()

.NET 版本:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
var reaction = new ReactionBuilder()
    .UseChangeEventHandler(async (evt, queryConfig) => {
        Console.WriteLine($"Received change event from query {evt.QueryId} sequence {evt.Sequence}");
        
        foreach (var item in evt.AddedResults)
            Console.WriteLine($"Added result: {item}");

        foreach (var item in evt.UpdatedResults)
            Console.WriteLine($"Updated result, before {item.Before}, after {item.After}");

        foreach (var item in evt.DeletedResults)
            Console.WriteLine($"Deleted result: {item}");
    })
    .Build();

await reaction.StartAsync();

上面是基础示例,接下来我们来看一个高级示例

  • 自定义配置类
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class MyQueryConfig
{
    [JsonPropertyName("greeting")]
    public string? Greeting { get; set; }
}
  • 自定义服务实现
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class MyService
{
    private readonly string _connectionString;

    public MyService(IConfiguration configuration)
    {
        _connectionString = configuration["MyConnectionString"];
    }

    public void DoSomething()
    {
        Console.WriteLine("Doing something");
    }
}
  • 事件处理器实现
  • 变更事件处理器(MyChangeHandler
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class MyChangeHandler : IChangeEventHandler<MyQueryConfig>
{   
    private readonly MyService _service;
    private readonly ILogger<MyChangeHandler> _logger;

    public MyChangeHandler(MyService service, ILogger<MyChangeHandler> logger)
    {
        _service = service;
        _logger = logger;
    }

    public async Task HandleChange(ChangeEvent evt, MyQueryConfig? queryConfig)
    {
        _logger.LogInformation($"Received change event from query {evt.QueryId} sequence {evt.Sequence}. Query greeting is {queryConfig?.Greeting}");
        _logger.LogInformation($"Full event: {evt.ToJson()}");
        _service.DoSomething();
    }
}

// 控制事件处理器(MyControlSignalHandler) 
class MyControlSignalHandler : IControlEventHandler<MyQueryConfig>
{
    private readonly ILogger<MyControlSignalHandler> _logger;

    public MyControlSignalHandler(ILogger<MyControlSignalHandler> logger)
    {
        _logger = logger;
    }

    public async Task HandleControlSignal(ControlEvent evt, MyQueryConfig? queryConfig)
    {
        _logger.LogWarning($"Received control signal: {evt.ControlSignal?.Kind} for query {evt.QueryId}. Query greeting is {queryConfig?.Greeting}");
    }
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
var reaction = new ReactionBuilder<MyQueryConfig>()
    .UseChangeEventHandler<MyChangeHandler>()               // Use your custom change handler
    .UseControlEventHandler<MyControlSignalHandler>()       // Use your custom control signal handler
    .UseYamlQueryConfig()                                   // Parse the per query configuration from Yaml
    .ConfigureServices((services) =>                        // Register your own services
    {
        services.AddSingleton<MyService>();
    })
    .Build();

// Start the reaction
await reaction.StartAsync();
注册你的ReactionProvider创建好Reaction后,需要通过ReactionProvider将它注册到Drasi系统中。 下面是一个ReactionProvider配置:apiVersion: v1

kind: ReactionProvider

name: Advanced

spec:
   services:
     reaction:
       image: reaction-advanced
       externalImage: true
   config_schema:
     type: object
     properties: 
       MyConnectionString:
         type: string
     required:
       - MyConnectionString
Reaction使用示例

kind: Reaction

apiVersion: v1

name: test-advanced

spec:
   kind: Advanced
   properties:
     MyConnectionString: "some connection string"
   queries:
     query1: |
       greeting: "Hello, World!"
     query2: |
       greeting: "Howdy!"

如何部署

使用Drasi命令行工具部署你的Reaction:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 先部署ReactionProvider
drasi apply -f reaction-provider.yaml

# 然后部署Reaction
drasi apply -f reaction.yaml

调试技巧

  1. 使用环境变量来配置你的Reaction
  2. 查看Docker容器日志来排查问题
  3. 利用Drasi的VSCode插件来验证配置文件
  4. 确保你的Docker镜像能够正确处理Dapr消息

常见问题

  1. Q: 为什么我的Reaction没有收到数据变化? A: 检查查询ID是否配置正确,以及Dapr订阅是否成功。
  2. Q: 如何测试我的Reaction? A: 可以先使用调试模式运行,打印收到的所有消息。
  3. Q: 配置文件验证失败怎么办? A: 使用Drasi的VSCode插件或CLI工具验证配置文件格式是否正确。

SDK 的设计理念是提供一个简单但强大的接口,让开发者能够方便地实现和管理 Drasi 平台的反应器功能。无论使用哪种编程语言,都可以通过相似的 API 设计模式来处理事件和管理配置。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-02-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
深入解析C#委托:从基础原理到实战精要
我仍清晰记得初次邂逅C#委托的那个深夜。当时正在调试一段事件驱动代码,突然遇到了这个神秘的delegate关键字。乍看之下,它仿佛魔法般难以捉摸,又像是冗余的复杂设计。
郑子铭
2025/06/19
480
深入解析C#委托:从基础原理到实战精要
基于.NET 6.0 自研轻量级ORM框架
项目:https://gitee.com/China-Mr-zhong/Fast.Framework
郑子铭
2022/03/22
1K0
.NET 中的设计模式应用
设计模式是软件设计与开发过程中常见问题的可复用解决方案。它们是通用模板或最佳实践,用于指导开发人员创建结构良好、可维护且高效的代码。
郑子铭
2025/02/18
540
.NET 中的设计模式应用
.Net Core中使用DiagnosticSource进行日志记录
System.Diagnostics.DiagnosticSource 可以丰富地记录程序中地日志,包括不可序列化的类型(例如 HttpResponseMessage 或 HttpContext)。
Chester Chen
2024/03/13
1770
.Net Core中使用DiagnosticSource进行日志记录
国内开源社区巨作AspectCore-Framework入门
在软件业,AOP为Aspect Oriented Programming的缩写,意为:面向切面编程,通过预编译方式和运行期动态代理实现程序功能的统一维护的一种技术。
码农阿宇
2018/11/04
1.3K0
大升级!支持CQRS|异步订阅发布-CodeWF.EventBus
事件总线,即EventBus,是一种解耦模块间通讯的强大工具。在CodeWF.EventBus库中,我们得以轻松实现CQRS模式,并通过清晰、简洁的接口进行事件订阅与发布。接下来,我们将详细探讨如何使用这个库来处理事件。
沙漠尽头的狼
2024/06/18
2080
大升级!支持CQRS|异步订阅发布-CodeWF.EventBus
Orleans 知多少 | 3. Hello Orleans
是的,Orleans v3.0.0 已经发布了,并已经完全支持 .NET Core 3.0。 所以,Orleans 系列是时候继续了,抱歉,让大家久等了。 万丈高楼平地起,这一节我们就先来了解下Orleans的基本使用。
圣杰
2020/06/19
8540
Orleans 知多少 | 3. Hello Orleans
.NET 6新东西--PeriodicTimer
在.NET 6中引入了新Timer:System.Threading.PeriodicTimer,它和之前的Timer相比,最大的区别就是新的PeriodicTimer事件处理可以方便地使用异步,消除使用callback机制减少使用复杂度。讲解PeriodicTimer之前我们先来看以下该怎么使用它:
喵叔
2021/12/06
4890
MediatR: .NET 平台上的命令查询职责分离 (CQRS) 库
MediatR 是一个轻量级的库,用于实现应用程序中的中介者模式【实现命令查询职责分离 (CQRS) 模式和面向消息的架构】。通过将请求(命令或查询)与处理程序解耦,简化了应用程序的逻辑分层和扩展。
郑子铭
2025/04/04
710
MediatR: .NET 平台上的命令查询职责分离 (CQRS) 库
C#3.0新增功能01 自动实现的属性
在 C# 3.0 及更高版本,当属性访问器中不需要任何其他逻辑时,自动实现的属性会使属性声明更加简洁。 它们还允许客户端代码创建对象。 当你声明以下示例中所示的属性时,编译器将创建仅可以通过该属性的 get 和 set 访问器访问的专用、匿名支持字段。
张传宁IT讲堂
2019/09/17
6220
.NET 中50种常见错误使用方法及推荐用法
下面是一个经过改进和扩展的列表,其中包含破坏 .NET 应用程序的 50 种方法,并解释了每种做法不佳的原因,以及演示如何解决每个问题的更正代码示例。这个全面的指南将有助于识别不良做法,并说明如何编写干净、可维护的代码。
郑子铭
2024/11/29
2950
.NET 中50种常见错误使用方法及推荐用法
【愚公系列】2023年03月 .NET CORE工具案例-基于CacheManager缓存中间件
CacheManager 是用 C# 编写的 .NET 开源缓存抽象层。它支持各种缓存提供程序并实现许多高级功能。
愚公搬代码
2023/03/16
3740
【愚公系列】2023年03月 .NET CORE工具案例-基于CacheManager缓存中间件
零基础写框架(3): Serilog.NET 中的日志使用技巧
Serilog 是 .NET 社区中使用最广泛的日志框架,所以笔者使用一个小节单独讲解使用方法。
痴者工良
2024/06/19
5110
零基础写框架(3): Serilog.NET 中的日志使用技巧
一个可用于生产项目 基于 .NET 6 自研ORM
Fast Framework 基于NET6.0 封装的轻量级 ORM 框架 支持多种数据库 SqlServer Oracle MySql PostgreSql Sqlite
郑子铭
2023/08/29
3220
一个可用于生产项目 基于 .NET 6 自研ORM
Metalama简介3.自定义.NET项目中的代码分析
这里所说的代码分析,是可以通过一些自定义的方法,在使用不符合条件的代码时产生错误或警告。
重典
2022/04/13
5290
Metalama简介3.自定义.NET项目中的代码分析
一步一步学Linq to sql(八):继承与关系
1.首先定义的是Topic实体基类,然后两个子类的继承,NewTopic--主题帖,Reply--回复帖。 2.Topic类上的特性,下面先来看一下特性类
aehyok
2018/09/11
4080
一步一步学Linq to sql(八):继承与关系
深入浅出理解Continuous Queries和Cypher Query Language
连续查询是 Drasi 最重要的组件。它们是您告诉 Drasi 要在源系统中检测哪些更改以及检测到更改时要分发的数据的机制。源为订阅的 Continuous Queries 提供源更改,然后为订阅的 Reactions 提供查询结果更改。
张善友
2025/02/18
640
深入浅出理解Continuous Queries和Cypher Query Language
在ASP.NET Core微服务架构下使用RabbitMQ如何实现CQRS模式
在现代软件开发中,微服务架构和CQRS模式都是备受关注的技术趋势。微服务架构通过将应用程序拆分为一系列小型、自治的服务,提供了更好的可伸缩性和灵活性。而CQRS模式则通过将读操作和写操作分离,优化了系统的性能和可维护性。本文小编将为大家介绍如何在ASP.NET Core微服务架构下使用RabbitMQ来实现CQRS模式。
葡萄城控件
2024/01/10
3460
在ASP.NET Core微服务架构下使用RabbitMQ如何实现CQRS模式
基于winserver部署Apollo初次体验(附.net客户端demo)
配置中心伴随着这几年分布式系统演变和微服务架构的兴起,已经成为必不可少的需求之一。试下一下如果哪天公司的所有应用服务,从公司服务器迁移到云服务,成千上万的配置,修改起来是多么耗时费劲的事(我们公司就是……)。
陈珙
2018/09/12
1.4K0
基于winserver部署Apollo初次体验(附.net客户端demo)
CodeWF.EventBus:轻量级事件总线,让通信更流畅
EventBus(事件总线),用于解耦模块之间的通讯。本库(CodeWF.EventBus)适用于进程内消息传递(无其他外部依赖),与大家普遍使用的MediatR部分类似,但MediatR库侧重于ASP.NET Core设计使用,而本库也有点点优势:
沙漠尽头的狼
2024/06/10
3120
CodeWF.EventBus:轻量级事件总线,让通信更流畅
相关推荐
深入解析C#委托:从基础原理到实战精要
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验