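0 Summary
AdaptiveExecutor is the entry point of the Citus distributed executor. Its main steps:
- TupleStore initialization: tuplestore_begin_heap
- Decide the transaction properties, e.g. whether 2PC is required: DecideTransactionPropertiesForTaskList
- Create and start the distributed execution, which maintains the task list and the connection pools: CreateDistributedExecution
- Task scheduling and execution:
  - Decide whether tasks run in parallel or sequentially: ShouldRunTasksSequentially
  - Assign tasks: AssignTasksToConnectionsOrWorkerPool
    - Find the node holding the shard placement: LookupTaskPlacementHostAndPort (groupId=6 → 127.0.0.1:3002)
    - Reuse connections: GetConnectionIfPlacementAccessedInXact
    - Enqueue the task: workerPool->readyTaskQueue
  - Manage the connection pool dynamically: ManageWorkerPool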
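Concepts
- shardid: a logical shard; each row is mapped to exactly one shard by hashing its distribution column.
- placementid: a stored replica of a logical shard.
- One shardid can have multiple placementids; the number is determined by shard_replication_factor.
- shard_replication_factor defaults to 1, meaning each logical shard has exactly one stored replica.
- placementid: every placement is stored on exactly one groupid.
- groupid: a logical node group containing either a single node or one node's primary plus its standbys. A group never contains more than one primary.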
postgres=# select * from pg_dist_node;
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
| nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards |
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
| 1 | 0 | 10.0.0.1 | 3002 | default | t | t | primary | default | t | f |
| 7 | 6 | 127.0.0.1 | 3002 | default | t | t | primary | default | t | t |
| 8 | 7 | 127.0.0.1 | 3003 | default | t | t | primary | default | t | t |
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
(3 rows)

postgres=# select * from pg_dist_placement;
+-------------+---------+------------+-------------+---------+
| placementid | shardid | shardstate | shardlength | groupid |
+-------------+---------+------------+-------------+---------+
| 650 | 102651 | 1 | 0 | 6 |
| 651 | 102652 | 1 | 0 | 7 |
| 652 | 102653 | 1 | 0 | 6 |
| 653 | 102654 | 1 | 0 | 7 |
| 654 | 102655 | 1 | 0 | 6 |
| 655 | 102656 | 1 | 0 | 7 |
| 656 | 102657 | 1 | 0 | 6 |
| 657 | 102658 | 1 | 0 | 7 |
...
...
...
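1 Preface
Citus's non-intrusive design is excellent. Because it ships purely as an extension, it can keep tracking the PostgreSQL mainline without forking the core.
This article analyzes the following:
- Where the extension hooks into the core.
- A rough summary of what the main functions in the extension do.
- The execution flow of a simple SELECT (DQL) query on a distributed table.
2 Where the extension hooks into the core
2.1 hook
Like any PG extension, Citus installs its hooks in _PG_init. Citus's _PG_init is shown below, with only the hook-related code kept: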
void
_PG_init(void)
{
	...

	/* intercept planner */
	planner_hook = distributed_planner;

	/* register for planner hook */
	set_rel_pathlist_hook = multi_relation_restriction_hook;
	get_relation_info_hook = multi_get_relation_info_hook;
	set_join_pathlist_hook = multi_join_restriction_hook;
	ExecutorStart_hook = CitusExecutorStart;
	ExecutorRun_hook = CitusExecutorRun;
	ExplainOneQuery_hook = CitusExplainOneQuery;
	prev_ExecutorEnd = ExecutorEnd_hook;
	ExecutorEnd_hook = CitusAttributeToEnd;

	/* register hook for error messages */
	original_emit_log_hook = emit_log_hook;
	emit_log_hook = multi_log_hook;

	/*
	 * Register hook for counting client backends that
	 * are successfully authenticated.
	 */
	original_client_auth_hook = ClientAuthentication_hook;
	ClientAuthentication_hook = CitusAuthHook;

	prev_shmem_request_hook = shmem_request_hook;
	shmem_request_hook = citus_shmem_request;
	...
	PrevObjectAccessHook = object_access_hook;
	object_access_hook = CitusObjectAccessHook;
	...
}
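Planner
- Top-level entry of the distributed planner: distributed_planner
  - Entry points during the query optimization phase:
    - set_rel_pathlist_hook = multi_relation_restriction_hook;
      - Collects information about distributed tables and applies distributed-specific optimization logic.
    - get_relation_info_hook = multi_get_relation_info_hook;
      - Works around index compatibility issues for distributed partitioned tables.
    - set_join_pathlist_hook = multi_join_restriction_hook;
      - Captures metadata about cross-shard joins so that an efficient distributed plan can be generated.
Distributed executor
- ExecutorStart_hook = CitusExecutorStart;
- ExecutorRun_hook = CitusExecutorRun;
  - Core scheduling logic: splits the distributed plan into tasks, sends them to the workers in parallel through an asynchronous connection pool, and merges the results.
- ExecutorEnd_hook = CitusAttributeToEnd;
  - Runs at executor end and cleans up executor resources; as the saved prev_ExecutorEnd above suggests, it hands off to the previously installed hook or the standard implementation.
- ExplainOneQuery_hook = CitusExplainOneQuery;
  - Extends EXPLAIN output with distributed details such as shard routing and task parallelism.
Logging
- emit_log_hook = multi_log_hook;
  - Intercepts PG log output and appends extra information.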
2.2 RegisterCitusCustomScanMethods
The extension also registers its own custom scan types. The three covered here are:
void
RegisterCitusCustomScanMethods(void)
{
	RegisterCustomScanMethods(&AdaptiveExecutorCustomScanMethods);
	RegisterCustomScanMethods(&NonPushableInsertSelectCustomScanMethods);
	RegisterCustomScanMethods(&NonPushableMergeCommandCustomScanMethods);
}
Citus Adaptive
CustomScanMethods AdaptiveExecutorCustomScanMethods = {
	"Citus Adaptive",
	AdaptiveExecutorCreateScan
};

static CustomExecMethods AdaptiveExecutorCustomExecMethods = {
	.CustomName = "AdaptiveExecutorScan",
	.BeginCustomScan = CitusBeginScan,
	.ExecCustomScan = CitusExecScan,
	.EndCustomScan = CitusEndScan,
	.ReScanCustomScan = CitusReScan,
	.ExplainCustomScan = CitusExplainScan
};
Citus INSERT … SELECT
CustomScanMethods NonPushableInsertSelectCustomScanMethods = {
	"Citus INSERT ... SELECT",
	NonPushableInsertSelectCreateScan
};

static CustomExecMethods NonPushableInsertSelectCustomExecMethods = {
	.CustomName = "NonPushableInsertSelectScan",
	.BeginCustomScan = CitusBeginScan,
	.ExecCustomScan = NonPushableInsertSelectExecScan,
	.EndCustomScan = CitusEndScan,
	.ReScanCustomScan = CitusReScan,
	.ExplainCustomScan = NonPushableInsertSelectExplainScan
};
Citus MERGE INTO …
CustomScanMethods NonPushableMergeCommandCustomScanMethods = {
	"Citus MERGE INTO ...",
	NonPushableMergeCommandCreateScan
};

static CustomExecMethods NonPushableMergeCommandCustomExecMethods = {
	.CustomName = "NonPushableMergeCommandScan",
	.BeginCustomScan = CitusBeginScan,
	.ExecCustomScan = NonPushableMergeCommandExecScan,
	.EndCustomScan = CitusEndScan,
	.ReScanCustomScan = CitusReScan,
	.ExplainCustomScan = NonPushableMergeCommandExplainScan
};
3 CitusExecutorRun: a distributed point query
drop table if exists events;
CREATE TABLE events (
    device_id bigint,
    event_id bigserial,
    event_time timestamptz default now(),
    data jsonb not null,
    PRIMARY KEY (device_id, event_id)
);
SELECT create_distributed_table('events', 'device_id');
INSERT INTO events (device_id, data)
SELECT s % 100, ('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,100000) s;
explain analyze select * from events where device_id in (11,22,33);
Plan
postgres=# explain analyze select * from events where device_id in (11,22,33);
+----------------------------------------------------------------------------------------------------------------------------------+
| QUERY PLAN |
+----------------------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=56) (actual time=20.573..20.967 rows=3000 loops=1) |
| Task Count: 3 |
| Tuple data received from nodes: 177 kB |
| Tasks Shown: One of 3 |
| -> Task |
| Tuple data received from node: 59 kB |
| Node: host=127.0.0.1 port=3003 dbname=postgres |
| -> Seq Scan on events_102680 events (cost=0.00..76.25 rows=1000 width=63) (actual time=0.020..0.977 rows=1000 loops=1) |
| Filter: (device_id = ANY ('{11,22,33}'::bigint[])) |
| Rows Removed by Filter: 2000 |
| Planning Time: 0.081 ms |
| Execution Time: 1.161 ms |
| Planning Time: 0.517 ms |
| Execution Time: 21.575 ms |
+----------------------------------------------------------------------------------------------------------------------------------+
(14 rows)
3.1 The plan generated by distributed_planner
3.2 Execution flow
InitPlan phase → CitusBeginScan
CitusBeginScan
	......
	ExecInitResultSlot(&scanState->customScanState.ss.ps, &TTSOpsMinimalTuple);
	ExecInitScanTupleSlot(node->ss.ps.state, &node->ss, node->ss.ps.scandesc,
						  &TTSOpsMinimalTuple);
	ExecAssignScanProjectionInfoWithVarno(&node->ss, INDEX_VAR);
	......
The slots were originally TTSOpsVirtual; here they are re-initialized as TTSOpsMinimalTuple (minimal tuple slots).
ExecutorRun phase → CitusExecScan
- CitusPreExecScan
  - ExecuteSubPlans
- standard_ExecutorRun
  - ExecCustomScan
    - CitusExecScan
      - AdaptiveExecutor: Citus's adaptive executor
        - tuplestore_begin_heap: creates the tuplestore that buffers the result tuples
        - DecideTransactionPropertiesForTaskList: decides the transaction requirements of the current statement
          - xactProperties = {errorOnAnyFailure = false, useRemoteTransactionBlocks = TRANSACTION_BLOCKS_ALLOWED, requires2PC = false}
        - CreateDistributedExecution: creates the distributed execution
        - StartDistributedExecution: initializes the distributed execution
        - RecordParallelRelationAccessForTaskList: records the relations accessed by the distributed tasks
        - EnsureTaskExecutionAllowed
        - ShouldRunTasksSequentially: decides whether the tasks run sequentially or in parallel
        - RunDistributedExecution: runs the execution
          - AssignTasksToConnectionsOrWorkerPool: the task-scheduling hub of the Citus distributed execution engine
            - foreach_ptr(task, taskList): 3 tasks to assign; for each task:
              - initialize a ShardCommandExecution
              - taskPlacement: where the task will run, taskPlacement =
                {type = {extensible = {type = T_ExtensibleNode, extnodename = 0x7f2a86ca5eca "ShardPlacement"}, citus_tag = T_ShardPlacement}, placementId = 680, shardId = 102681, shardLength = 0, groupId = 6, nodeName = 0x1ebfb38 "127.0.0.1", nodePort = 3002, nodeId = 7, partitionMethod = 104 'h', colocationGroupId = 13, representativeValue = 1879048192}
              - LookupTaskPlacementHostAndPort: resolves the host and port of the placement; internally LookupNodeForGroup uses the groupId to find the worker node (analyzed in the next article)
              - initialize a TaskPlacementExecution
              - append the task to workerPool->readyTaskQueue
          - ManageWorkerPool: dynamically adjusts the connection pool size according to the task load (to be analyzed in detail)
          - ProcessWaitEvents: waits for the tasks to run and processes them as they complete (to be analyzed in detail)
        - FinishDistributedExecution: finishes the execution
      - ReturnTupleFromTuplestore
Contents of taskList: the Task struct defined by Citus
(gdb) tr Task 0x1eb8b98
$41 = {type = {extensible = {type = T_ExtensibleNode,extnodename = 0x7f2a86ca5ea1 "Task"},citus_tag = T_Task},taskType = READ_TASK,jobId = 27142733692931,taskId = 1,taskQuery = {queryType = TASK_QUERY_TEXT,data = {jobQueryReferenceForLazyDeparsing = 0x1e787e8,queryStringLazy = 0x1e787e8 "SELECT device_id, event_id, event_time, data FROM public.events_102681 events WHERE (device_id OPERATOR(pg_catalog.=) ANY ('{11,22,33}'::bigint[]))",queryStringList = 0x1e787e8}},queryCount = 1,anchorDistributedTableId = 0,anchorShardId = 102681,taskPlacementList = 0x1ec04b8,dependentTaskList = 0x0,partitionId = 0,upstreamTaskId = 0,shardInterval = 0x0,assignmentConstrained = false,mapJobTargetList = 0x0,replicationModel = 105 'i',relationRowLockList = 0x0,modifyWithSubquery = false,relationShardList = 0x1eb8cf8,rowValuesLists = 0x0,partiallyLocalOrRemote = false,parametersInQueryStringResolved = true,tupleDest = 0x0,totalReceivedTupleData = 0,fetchedExplainAnalyzePlan = 0x0,fetchedExplainAnalyzePlacementIndex = 0,fetchedExplainAnalyzeExecutionDuration = 0,isLocalTableModification = false,cannotBeExecutedInTransaction = false,partitionKeyValue = 0x0,colocationId = 0
}
(gdb) tr Task 0x1ebd788
$42 = {type = {extensible = {type = T_ExtensibleNode,extnodename = 0x7f2a86ca5ea1 "Task"},citus_tag = T_Task},taskType = READ_TASK,jobId = 27142733692931,taskId = 3,taskQuery = {queryType = TASK_QUERY_TEXT,data = {jobQueryReferenceForLazyDeparsing = 0x1ebc6b0,queryStringLazy = 0x1ebc6b0 "SELECT device_id, event_id, event_time, data FROM public.events_102654 events WHERE (device_id OPERATOR(pg_catalog.=) ANY ('{11,22,33}'::bigint[]))",queryStringList = 0x1ebc6b0}},queryCount = 1,anchorDistributedTableId = 0,anchorShardId = 102654,taskPlacementList = 0x1ec0558,dependentTaskList = 0x0,partitionId = 0,upstreamTaskId = 0,shardInterval = 0x0,assignmentConstrained = false,mapJobTargetList = 0x0,replicationModel = 105 'i',relationRowLockList = 0x0,modifyWithSubquery = false,relationShardList = 0x1ebd8e8,rowValuesLists = 0x0,partiallyLocalOrRemote = false,parametersInQueryStringResolved = true,tupleDest = 0x0,totalReceivedTupleData = 0,fetchedExplainAnalyzePlan = 0x0,fetchedExplainAnalyzePlacementIndex = 0,fetchedExplainAnalyzeExecutionDuration = 0,isLocalTableModification = false,cannotBeExecutedInTransaction = false,partitionKeyValue = 0x0,colocationId = 0
}
(gdb) tr Task 0x1ebb1b8
$43 = {type = {extensible = {type = T_ExtensibleNode,extnodename = 0x7f2a86ca5ea1 "Task"},citus_tag = T_Task},taskType = READ_TASK,jobId = 27142733692931,taskId = 2,taskQuery = {queryType = TASK_QUERY_TEXT,data = {jobQueryReferenceForLazyDeparsing = 0x1eba0e0,queryStringLazy = 0x1eba0e0 "SELECT device_id, event_id, event_time, data FROM public.events_102680 events WHERE (device_id OPERATOR(pg_catalog.=) ANY ('{11,22,33}'::bigint[]))",queryStringList = 0x1eba0e0}},queryCount = 1,anchorDistributedTableId = 0,anchorShardId = 102680,taskPlacementList = 0x1ec05a8,dependentTaskList = 0x0,partitionId = 0,upstreamTaskId = 0,shardInterval = 0x0,assignmentConstrained = false,mapJobTargetList = 0x0,replicationModel = 105 'i',relationRowLockList = 0x0,modifyWithSubquery = false,relationShardList = 0x1ebb318,rowValuesLists = 0x0,partiallyLocalOrRemote = false,parametersInQueryStringResolved = true,tupleDest = 0x0,totalReceivedTupleData = 0,fetchedExplainAnalyzePlan = 0x0,fetchedExplainAnalyzePlacementIndex = 0,fetchedExplainAnalyzeExecutionDuration = 0,isLocalTableModification = false,cannotBeExecutedInTransaction = false,partitionKeyValue = 0x0,colocationId = 0
}