本节将通过几个典型的场景案例说明 Dataway 在腾讯云数据连接器平台中应用。
案例一:使用 Dataway 获取 token
腾讯云数据连接器中封装企业微信连接器,可以完成成员管理、消息推送等多种操作,每次操作本质上都是调用 企业微信 API 完成的。在调用 API 前,需要获取 access_token。相当于创建一个登录凭证,其他的业务接口都依赖 access_token 鉴权调用者身份。
本示例将模拟该调用过程,并说明 Dataway 脚本在其中起到的作用。
上图为整个流的配置图,其中 Transform 组件使用 Dataway 表达式设置 corpId 和 corpSecret 两个 variable 参数。
HTTP Request 组件向企业微信 API 地址发起 get-token 请求调用,请求参数为 msg.vars 中的 corpId 和 corpSecret。在 HTTP Request 组件的请求参数中,使用 Dataway 表达式进行动态设置。
Set-Variable 组件将获取到 HTTP response 解析,获取 access_token 后存到 msg.vars 中,使用 Dataway 动态赋值。
Cache 组件通过键值对保存 access_token,以方便下次无需请求直接从缓存中获取。Cache 的数据使用 Dataway 表达式获取。
实际上企业微信连接器使用 XML 语言完成 get-token,本示例为了方便查看在流中复现该操作。Dataway 表达式在其中主要的作用是通过操作流转消息 Message,实现动态赋值,并交给上层调用组件使用。
案例二:将 JSON 类型输入转换成 XML 类型输入
使用 Dataway 表达式可以完成数据类型的转换,例如:JSON 转换成 XML、XML 转换成 CSV 等。本示例将通过一个将 JSON 类型的输入转换成 XML 的示例,说明 Dataway 的数据转换作用。
输入
我们在流中预先使用 Set Payload 组件对 Message 的 payload 属性设置成封装了 JSON 格式的 Entity 数据。
Dataway 表达式
在 Set Payload 组件后新增 Transform 组件,使用 Dataway 表达式重新设置 Message 消息的 payload。
def dw_process(msg):input_param = msg.payload['^value']return Entity.from_value({'root': {'name': input_param['name'],'age': input_param['age'],'male': input_param['male'],'brothers': input_param['brothers'],"@id": "haha"}}, mime_type="application/xml")
输出
Transform 组件将 Message 的 payload 设置为 Entity 类型,使用 Entity['^blob'] 获取二进制数据并解码,可以查看输出的 XML 结构字符串,如下所示:
<?xml version="1.0" encoding="utf-8"?><root id="haha"><name>zhangsan</name><age>12</age><male>true</male><brothers>lisi</brothers><brothers>wangwu</brothers></root>
案例三:Dataway 处理流式数据
流式数据指数据从流的开始到结束,过程是流式的,数据不需要全读入内存。这样的好处是能支持大文件且可以做到实时处理,因此在腾讯云数据连接器中常用于实时、格式化的处理大数据。
本示例将说明如何读取流式的输入和产生流式的输出。
产生流式输出
使用 Dataway 表达式可以产生流式的输出。在 Set Payload 组件中使用如下 Dataway 表达式:
Dataway 表达式
def dw_process(msg):for x in range(10):yield x
输出
Dataway 表达式将产生一个流式对象,并设置到 payload 中,供下游使用。
读取流式输入
使用 Dataway 表达式可以读取上一步产生的流式输入。msg.payload 为一个流式对象,可以使用 for 循环读取该流式对象,并进行数据处理。
Dataway 表达式
def dw_process(msg):sum = 0for x in msg.payload:sum += xreturn sum
输出
Dataway 表达式使用流式处理输入数据,并最终返回
45
。