近些年,随着微服务的广泛使用,业务对系统的分布式事务处理能力的要求越来越高。
早期的基于XA协议的二阶段提交方案,将分布式事务的处理放在数据库驱动层,实现了对业务的无侵入,但是对数据的锁定时间很长,性能较低。
现在主流的TCC事务方案和SAGA事务方案,都是基于业务补偿机制,虽然没有全局锁,性能很高,但是一定程度上入侵了业务逻辑,增加了业务开发人员的开发时间和系统维护成本。
新兴的AT事务解决方案,例如Seata和Seata-golang,通过数据源代理层的资源管理器RM记录SQL回滚日志,跟随本地事务一起提交,大幅减少了数据的锁定时间,性能好且对业务几乎没有侵入。其缺点是支持的语言比较单一,例如Seata只支持Java语言类型的微服务,Seata-golang只支持Go语言类型的微服务。
为了突破AT事务对业务编程语言的限制,现在业界正在往DB Mesh的方向发展,通过将事务中间件部署在SideCar的方式,达到任何编程语言都能使用分布式事务中间件的效果。
DBPack是一个处理分布式事务的数据库代理,其能够拦截MySQL流量,生成对应的事务回滚镜像,通过与ETCD协调完成分布式事务,性能很高,且对业务没有入侵,能够自动补偿SQL操作,支持接入任何编程语言。DBPack还支持TCC事务模式,能够自动补偿HTTP请求。目前其demo已经有Java、Go、Python和PHP,TCC的sample也已经在路上了,demo示例可以关注dbpack-samples。
最新版DBPack不仅支持预处理的sql语句,还支持text类型的sql。DBPack最新版还兼容了php8的pdo_mysql扩展。Mysql 客户端在给用户发送 sql 执行结果时,如果执行没有异常,发送的第一个包为 OKPacket,该包中有一个标志位可以标识 sql 请求是否在一个事务中。如下图所示
这个包的内容为:
07 00 00 // 前 3 个字节表示 payload 的长度为 7 个字节
01 // sequence 响应的序号,前 4 个字节一起构成了 OKPacket 的 heade
00 // 标识 payload 为 OKPacket
00 // affected row
00 // last insert id
03 00 // 状态标志位
00 00 // warning 数量
dbpack 之前的版本将标志位设置为 0,java、golang、.net core、php 8.0 之前的 mysql driver 都能正确协调事务,php 8.0 的 pdo driver 会对标志位进行校验,所以 php 8.0 以上版本在使用 dbpack 协调分布式事务时,会抛出 transaction not active
异常。最新版本已经修复了这个问题。
下图是具体的DBPack事务流程图。
其事务流程简要描述如下:
本文将以PHP语言为例,详细介绍如何使用PHP对接DBPack完成分布式事务。实际使用其他语言时,对接过程也是类似的。
ETCD\_VER=v3.5.3
# choose either URL
GOOGLE\_URL=https://storage.googleapis.com/etcd
GITHUB\_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD\_URL=${GOOGLE\_URL}
rm -f /tmp/etcd-${ETCD\_VER}-linux-amd64.tar.gz
rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test
curl -L ${DOWNLOAD\_URL}/${ETCD\_VER}/etcd-${ETCD\_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD\_VER}-linux-amd64.tar.gz
tar xzvf /tmp/etcd-${ETCD\_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1
rm -f /tmp/etcd-${ETCD\_VER}-linux-amd64.tar.gz
/tmp/etcd-download-test/etcd --version
/tmp/etcd-download-test/etcdctl version
/tmp/etcd-download-test/etcdutl version
undo_log表用于存储本地事务的回滚镜像。
-- ----------------------------
-- Table structure for undo\_log
-- ----------------------------
DROP TABLE IF EXISTS `undo\_log`;
CREATE TABLE `undo\_log` (
`id` bigint(20) NOT NULL AUTO\_INCREMENT,
`branch\_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback\_info` longblob NOT NULL,
`log\_status` int(11) NOT NULL,
`log\_created` datetime NOT NULL,
`log\_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx\_unionkey` (`xid`,`branch\_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
# 更新distributed\_transaction.etcd\_config.endpoints
# 更新listeners配置项,调整为实际聚合层服务的地址和端口
# 更新filters配置项,配置聚合层服务的API endpoint
vim /path/to/your/aggregation-service/config-aggregation.yaml
# 更新distributed\_transaction.etcd\_config.endpoints
# 更新listeners配置项,配置业务数据库信息,包括dbpack代理的端口
# 更新data\_source\_cluster.dsn
vim /path/to/your/business-service/config-service.yaml
git clone git@github.com:cectc/dbpack.git
cd dbpack
# build on local env
make build-local
# build on production env
make build
./dist/dbpack start --config /path/to/your/config-aggregation.yaml
./dist/dbpack start --config /path/to/your/config-service.yaml
以Nginx为例,配置如下
server {
listen 3001; # 暴露的服务端口
index index.php index.html;
root /var/www/code/; # 业务代码根目录
location / {
try\_files $uri /index.php?$args;
}
location ~ \.php$ {
fastcgi\_split\_path\_info ^(.+\.php)(/.+)$;
fastcgi\_pass order-svc-app:9000; # php-fpm 端口
fastcgi\_index index.php;
include fastcgi\_params;
fastcgi\_param SCRIPT\_FILENAME $document\_root$fastcgi\_script\_name;
fastcgi\_param PATH\_INFO $fastcgi\_path\_info;
}
}
class AggregationSvc
{
public function CreateSo(string $xid, bool $rollback): bool
{
$createSoSuccess = $this->createSoRequest($xid);
if (!$createSoSuccess) {
return false;
}
$allocateInventorySuccess = $this->allocateInventoryRequest($xid);
if (!$allocateInventorySuccess) {
return false;
}
if ($rollback) {
return false;
}
return true;
}
// private function createSoRequest(string $xid) ...
// private function allocateInventoryRequest(string $xid) ...
}
$reqPath = strtok($\_SERVER["REQUEST\_URI"], '?');
$reaHeaders = getallheaders();
$xid = $reaHeaders['X-Dbpack-Xid'] ?? '';
if (empty($xid)) {
die('xid is not provided!');
}
$aggregationSvc = new AggregationSvc();
if ($\_SERVER['REQUEST\_METHOD'] === 'POST') {
switch ($reqPath) {
case '/v1/order/create':
if ($aggregationSvc->CreateOrder($xid, false)) {
responseOK();
} else {
responseError();
}
case '/v1/order/create2':
if ($aggregationSvc->CreateSo($xid, true)) {
responseOK();
} else {
responseError();
}
break;
default:
die('api not found');
}
}
function responseOK() {
http\_response\_code(200);
echo json\_encode([
'success' => true,
'message' => 'success',
]);
}
function responseError() {
http\_response\_code(400);
echo json\_encode([
'success' => false,
'message' => 'fail',
]);
}
class OrderDB
{
private PDO $\_connection;
private static OrderDB $\_instance;
private string $\_host = 'dbpack-order';
private int $\_port = 13308;
private string $\_username = 'dksl';
private string $\_password = '123456';
private string $\_database = 'order';
const insertSoMaster = "INSERT /\*+ XID('%s') \*/ INTO order.so\_master (sysno, so\_id, buyer\_user\_sysno, seller\_company\_code,
receive\_division\_sysno, receive\_address, receive\_zip, receive\_contact, receive\_contact\_phone, stock\_sysno,
payment\_type, so\_amt, status, order\_date, appid, memo) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,now(),?,?)";
const insertSoItem = "INSERT /\*+ XID('%s') \*/ INTO order.so\_item(sysno, so\_sysno, product\_sysno, product\_name, cost\_price,
original\_price, deal\_price, quantity) VALUES (?,?,?,?,?,?,?,?)";
public static function getInstance(): OrderDB
{
if (empty(self::$\_instance)) {
self::$\_instance = new self();
}
return self::$\_instance;
}
private function \_\_construct()
{
try {
$this->\_connection = new PDO(
"mysql:host=$this->\_host;port=$this->\_port;dbname=$this->\_database;charset=utf8",
$this->\_username,
$this->\_password,
[
PDO::ATTR\_PERSISTENT => true,
PDO::ATTR\_EMULATE\_PREPARES => false, // to let DBPack handle prepread sql
]
);
} catch (PDOException $e) {
die($e->getMessage());
}
}
private function \_\_clone()
{
}
public function getConnection(): PDO
{
return $this->\_connection;
}
public function createSo(string $xid, array $soMasters): bool
{
$this->getConnection()->setAttribute(PDO::ATTR\_ERRMODE, PDO::ERRMODE\_EXCEPTION);
try {
$this->getConnection()->beginTransaction();
foreach ($soMasters as $master) {
if (!$this->insertSo($xid, $master)) {
throw new PDOException("failed to insert soMaster");
}
}
$this->getConnection()->commit();
} catch (PDOException $e) {
$this->getConnection()->rollBack();
return false;
}
return true;
}
private function insertSo(string $xid, array $soMaster): bool
{
// insert into so\_master, so\_item ...
}
}
$reqPath = strtok($\_SERVER["REQUEST\_URI"], '?');
$reqHeaders = getallheaders();
$xid = $reqHeaders['Xid'] ?? '';
if (empty($xid)) {
die('xid is not provided!');
}
if ($\_SERVER['REQUEST\_METHOD'] === 'POST') {
if ($reqPath === '/createSo') {
$reqBody = file\_get\_contents('php://input');
$soMasters = json\_decode($reqBody, true);
$orderDB = OrderDB::getInstance();
$result = $orderDB->createSo($xid, $soMasters);
if ($result) {
responseOK();
} else {
responseError();
}
}
}
function responseOK() {
http\_response\_code(200);
echo json\_encode([
'success' => true,
'message' => 'success',
]);
}
function responseError() {
http\_response\_code(400);
echo json\_encode([
'success' => false,
'message' => 'fail',
]);
}
curl -X{HTTP Method} http://localhost:{DBPack监听的聚合层服务端口}/{聚合层服务的API endpoint}
mysql\_connect()
连接数据库(<=php5.4),在start transaction;
开始之后,后续的业务操作必须在同一个数据库连接上进行。insert /\*+ XID('%s') \*/ into xx ...;
卜贺贺。就职于日本楽天 Rakuten CNTD,任 Application Engineer,熟悉 AT 事务、Seata-golang 和 DBPack。GitHub:https://github.com/bohehe
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。