PG中的行锁已在上一篇《Postgresql源码(131)行锁的原理分析》中做了分析,本篇对分布式PG(PGXL)中的行锁做一些分析。(版本:Postgres-XL 10alpha2)
分布式PG中的计划生成有两个入口:
pgxc_planner
result = pgxc_FQS_planner(query, cursorOptions, boundParams);
if (result) return result;
result = standard_planner(query, cursorOptions, boundParams);
return result;
查询首先通过pgxc_FQS_planner看是否适合快速分发。如果不适合,会继续走standard_planner。
XL默认不允许带行锁的SQL走FQS,这里为了简单介绍FQS,先用一个点查的例子。
用例
drop table TBL_33;
create table TBL_33(c3 int);
insert into TBL_33 values(0);
SELECT c3 FROM TBL_33 WHERE c3=0;
分布式执行计划
explain SELECT c3 FROM TBL_33 WHERE c3=0;
QUERY PLAN
---------------------------------------------------------------
Remote Fast Query Execution (cost=0.00..0.00 rows=0 width=0)
Node/s: datanode_2
-> Seq Scan on tbl_33 (cost=0.00..41.88 rows=13 width=4)
Filter: (c3 = 0)
pgxc_FQS_planner
/*
 * pgxc_FQS_planner
 *		Try to plan the query via Fast Query Shipping (FQS).
 *
 * Returns a PlannedStmt whose plan tree is the remote plan produced by
 * pgxc_FQS_create_remote_plan(), or NULL as soon as one of the checks
 * below rejects the query.
 *
 * NOTE: this is an abridged excerpt; the declarations of the locals
 * (exec_nodes, glob, root, top_plan, result) are not shown here.
 */
static PlannedStmt *
pgxc_FQS_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
{
/* FQS is switched off: give up immediately. */
if (!enable_fast_query_shipping)
return NULL;
/* Queries opened with the CURSOR_OPT_SCROLL option are not handled here. */
if (cursorOptions & CURSOR_OPT_SCROLL)
return NULL;
/*
 * A RemoteQuery utility statement whose exec_direct_type is anything other
 * than EXEC_DIRECT_NONE is not handled here either.
 * NOTE(review): presumably this is the EXECUTE DIRECT case -- confirm.
 */
if (query->utilityStmt && IsA(query->utilityStmt, RemoteQuery))
{
RemoteQuery *stmt = (RemoteQuery *) query->utilityStmt;
if (stmt->exec_direct_type != EXEC_DIRECT_NONE)
return NULL;
}
/*
 * Ask the shippability check for the target nodes; a NULL result means
 * the query is rejected and no FQS plan is built.
 */
exec_nodes = pgxc_is_query_shippable(query, 0);
if (exec_nodes == NULL)
return NULL;
/*
 * Build a minimal PlannerGlobal / PlannerInfo pair by hand; root is what
 * set_plan_references() below takes as its planner context.
 */
glob = makeNode(PlannerGlobal);
glob->boundParams = boundParams;
root = makeNode(PlannerInfo);
root->parse = query;
root->glob = glob;
root->query_level = 1;
root->planner_cxt = CurrentMemoryContext;
/* Create the remote plan for the chosen nodes, then fix up its references. */
top_plan = (Plan *)pgxc_FQS_create_remote_plan(query, exec_nodes, false);
top_plan = set_plan_references(root, top_plan);
/* Assemble the PlannedStmt from the Query and the plan built above. */
result = makeNode(PlannedStmt);
result->commandType = query->commandType;
result->canSetTag = query->canSetTag;
result->utilityStmt = query->utilityStmt;
/* Only non-SELECT commands record a result relation (a one-element list). */
if (query->commandType != CMD_SELECT)
result->resultRelations = list_make1_int(query->resultRelation);
result->planTree = top_plan;
result->rtable = query->rtable;
result->queryId = query->queryId;
/*
 * Dependency information is taken from glob.
 * NOTE(review): presumably filled in by set_plan_references() -- confirm.
 */
result->relationOids = glob->relationOids;
result->invalItems = glob->invalItems;
return result;
}
回到行锁用例上:
drop table TBL_33;
create table TBL_33(c33 int);
insert into TBL_33 values(0);
SELECT c33 FROM TBL_33 WHERE c33=0 for update;
分布式执行计划
explain SELECT c33 FROM TBL_33 WHERE c33=0 for update;
QUERY PLAN
-------------------------------------------------------------------------------
Remote Subquery Scan on all (datanode_2) (cost=0.00..42.01 rows=13 width=10)
-> LockRows (cost=0.00..42.01 rows=13 width=10)
-> Seq Scan on tbl_33 (cost=0.00..41.88 rows=13 width=10)
Filter: (c33 = 0)
subquery_planner生成计划:
standard_planner → make_remotesubplan
standard_planner
...
best_path = get_cheapest_fractional_path(final_rel, tuple_fraction);
if (!root->distribution)
root->distribution = best_path->distribution;
top_plan = create_plan(root, best_path);
if (root->distribution)
top_plan = (Plan *) make_remotesubplan(root, top_plan, NULL, root->distribution, root->sort_pathkeys);
explain SELECT c33 FROM TBL_33 WHERE c33=0 for update;
QUERY PLAN
-------------------------------------------------------------------------------
Remote Subquery Scan on all (datanode_2) (cost=0.00..42.01 rows=13 width=10)
-> LockRows (cost=0.00..42.01 rows=13 width=10)
-> Seq Scan on tbl_33 (cost=0.00..41.88 rows=13 width=10)
Filter: (c33 = 0)
SELECT c33 FROM TBL_33 WHERE c33=0 for update;
执行时会生成两个算子(SeqScan、LockRows),对应的路径分别由create_seqscan_path、create_lockrows_path创建:
/*
 * create_seqscan_path
 *		Build and return a Path node of type T_SeqScan for relation "rel".
 *
 * root             - planner state, passed through to the helpers below
 * rel              - relation being scanned; supplies the path target,
 *                    the consider_parallel flag and baserestrictinfo
 * required_outer   - passed to get_baserel_parampathinfo() to obtain
 *                    the path's param_info
 * parallel_workers - stored in the path; a value > 0 marks the path as
 *                    parallel-aware
 *
 * Under XCP the path's distribution is set up before costing.
 */
Path *
create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
Relids required_outer, int parallel_workers)
{
Path *pathnode = makeNode(Path);
pathnode->pathtype = T_SeqScan;
pathnode->parent = rel;
pathnode->pathtarget = rel->reltarget;
pathnode->param_info = get_baserel_parampathinfo(root, rel,
required_outer);
/* The path is parallel-aware exactly when workers were requested. */
pathnode->parallel_aware = parallel_workers > 0 ? true : false;
pathnode->parallel_safe = rel->consider_parallel;
pathnode->parallel_workers = parallel_workers;
pathnode->pathkeys = NIL; /* seqscan has unordered result */
#ifdef XCP
/* Attach the distribution information for this scan to the path. */
set_scanpath_distribution(root, rel, pathnode);
/*
 * Apply every base restriction clause of the relation to the path's
 * distribution, one RestrictInfo at a time.
 * NOTE(review): presumably this is what narrows
 * distribution->restrictNodes for a qual on the distribution key -- confirm.
 */
if (rel->baserestrictinfo)
{
ListCell *lc;
foreach (lc, rel->baserestrictinfo)
{
RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
restrict_distribution(root, ri, pathnode);
}
}
#endif
/* Costing runs last, after the distribution has been set up. */
cost_seqscan(pathnode, root, rel, pathnode->param_info);
return pathnode;
}
pathnode->distribution->nodes
标记了dn0、dn1(p/t按二进制输出):p/t pathnode->distribution->nodes->words[0] = 11
pathnode->distribution->restrictNodes
只标记了dn1(二进制10,即只有bit 1置位):p/t pathnode->distribution->restrictNodes->words[0] = 10
p *pathnode->distribution
$27 = {type = T_Distribution, distributionType = 72 'H', distributionExpr = 0x135fea8, nodes = 0x1360650, restrictNodes = 0x1360898}
(gdb) p/t pathnode->distribution->nodes->words[0]
$31 = 11
(gdb) p/t pathnode->distribution->restrictNodes->words[0]
$30 = 10
/*
 * create_lockrows_path
 *		Build and return a LockRowsPath (path type T_LockRows) on top of
 *		"subpath".
 *
 * NOTE: this is an abridged excerpt; the "..." lines stand for code that
 * has been left out, so the uses of rel, rowMarks and epqParam are not
 * visible here.
 */
LockRowsPath *
create_lockrows_path(PlannerInfo *root, RelOptInfo *rel,
Path *subpath, List *rowMarks, int epqParam)
{
LockRowsPath *pathnode = makeNode(LockRowsPath);
pathnode->path.pathtype = T_LockRows;
...
...
/*
 * The LockRows path takes its own copy of the input path's distribution,
 * so it carries the same distribution as the path it sits on.
 */
pathnode->path.distribution = copyObject(subpath->distribution);
...
...
return pathnode;
}