Source提供了与系统的连接,Drasi 可以将这些系统视为变化源。source 在 Drasi 中执行三个重要功能:
Drasi Sources SDK 是一个用于实现 Drasi 数据源的多语言开发工具包,目前支持 Java、.NET、Rust 等编程语言。这个 SDK 的主要目的是帮助开发者创建和管理 Drasi 平台的数据源。扩展Drasi Sources的文档参见 https://github.com/drasi-project/docs/blob/main/docs/content/how-to-guides/extend-drasi/implement-a-source.md
每个数据源由两个核心部分组成:
b) Source Proxy(数据源代理):负责初始数据的获取和加载 :
SDK 的设计理念是提供一个统一的接口来实现各种数据源的接入,同时保持足够的灵活性以适应不同的使用场景。无论是简单的数据源还是复杂的数据处理系统,都可以通过这个 SDK 来实现与 Drasi 平台的集成。
Source Proxy主要负责在查询部署时获取初始数据。它需要提供一个HTTP服务器,并实现/acquire
接口来处理初始数据的加载。
using System.Runtime.CompilerServices; using System.Text.Json.Nodes; using Drasi.Source.SDK; using Drasi.Source.SDK.Models; using Microsoft.Extensions.Configuration;
var proxy = new SourceProxyBuilder() .UseBootstrapHandler<BootstrapHandler>() .Build();
await proxy.StartAsync();
class BootstrapHandler : IBootstrapHandler { public BootstrapHandler(IConfiguration configuration) { Console.WriteLine($"Connection string: {configuration["connectionString"]}"); }
public async IAsyncEnumerable<SourceElement> Bootstrap(BootstrapRequest request, [EnumeratorCancellation]CancellationToken cancellationToken = default) { if (request.NodeLabels.Contains("Person")) {
yield return new SourceElement("person-1", ["Person"], new JsonObject { { "name", "Alice" }, { "age", 30 } });
yield return new SourceElement("person-2", ["Person"], new JsonObject { { "name", "Bob" }, { "age", 40 } }); }
if (request.RelationLabels.Contains("Knows")) { yield return new SourceElement("1-2", ["Knows"], new JsonObject { { "since", 2010 } }, "person-1", "person-2"); } } }
Source Reactivator负责监控数据变化并通过Dapr的pub/sub(发布/订阅)功能将变化事件发送给其他组件。
所有的数据变化事件都需要包含三个必须字段:
op
:操作类型 payload
:数据负载 ts_ms
:时间戳(毫秒){
"op": "i", // i 表示 insert(插入)
"payload": {
"after": {
"id": "001",
"labels": ["用户", "VIP"],
"properties": {
"name": "张三",
"age": 30
}
},
"before": {}, // 新增时before为空
"source": {
"table": "node", // node表示节点,relation表示关系
"ts_ms": "1676908799000"
}
},
"ts_ms": 1676908799000
}
{
"op": "u", // u 表示 update(更新)
"payload": {
"after": {
"id": "001",
"labels": ["用户", "VIP"],
"properties": {
"name": "张三",
"age": 31
}
},
"before": {
"id": "001",
"labels": ["用户", "VIP"],
"properties": {
"name": "张三",
"age": 30
}
},
"source": {
"table": "node",
"ts_ms": "1676908799000"
}
},
"ts_ms": 1676908799000
}
{
"op": "d", // d 表示 delete(删除)
"payload": {
"after": {}, // 删除时after为空
"before": {
"id": "001",
"labels": ["用户", "VIP"],
"properties": {
"name": "张三",
"age": 31
}
},
"source": {
"table": "node",
"ts_ms": "1676908799000"
}
},
"ts_ms": 1676908799000
}
要注册新的数据源类型,你需要创建一个SourceProvider配置文件。这个配置描述了数据源的组件和配置选项。
apiVersion: v1 kind: SourceProvider name: MySource spec: services: proxy: image: my-proxy externalImage: true dapr: app-port: "80" reactivator: image: my-reactivator externalImage: true deprovisionHandler: true dapr: app-port: "80" config_schema: type: object properties: connectionString: # sample config property type: string
创建Source配置文件来使用已注册的数据源:
apiVersion: v1 kind: Source name: test-source spec: kind: MySource properties: connectionString: "my-connection-string"
# 注册数据源提供者
drasi apply -f source-provider.yaml
# 查看所有可用的数据源类型
drasi list sourceprovider
# 部署具体的数据源实例
drasi apply -f source.yaml
drasi apply --dry-run -f your-source.yaml