Citus Source Code (2): Distributed Read Path Analysis and Core Concepts (shardid, placementid, groupid)

2025/4/7 7:44:39  Source: https://blog.csdn.net/jackgo73/article/details/146840978

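0 Summary

AdaptiveExecutor is the entry point of the Citus distributed executor.

  1. Initialize the tuple store: tuplestore_begin_heap
  2. Decide the transaction properties, including whether 2PC is required: DecideTransactionPropertiesForTaskList
  3. Create and start the distributed execution: CreateDistributedExecution initializes it and maintains the task list and connection pools
  4. Task scheduling and execution:
    • Decide between parallel and sequential execution: ShouldRunTasksSequentially
    • Assign tasks: AssignTasksToConnectionsOrWorkerPool
      • Locate the placement: LookupTaskPlacementHostAndPort (groupId=6 → 127.0.0.1:3002)
      • Reuse connections: GetConnectionIfPlacementAccessedInXact
      • Enqueue the task: workerPool->readyTaskQueue
  5. Manage the connection pool dynamically: ManageWorkerPool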

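Concepts

  1. shardid: a logical shard. Each row is mapped to one shard by hashing its distribution column.
  2. placementid: a stored replica of a logical shard.
    • One shardid may map to several placementids (determined by the replication factor shard_replication_factor).
    • shard_replication_factor defaults to 1, meaning one logical shard has one stored replica.
  3. placementid → groupid: every placementid is stored on exactly one groupid.
  4. groupid: a logical node group containing either a single node or one node's primary and standbys. Note that a group never contains more than one primary.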
postgres=# select * from pg_dist_node;
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
| nodeid | groupid | nodename  | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards |
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
|      1 |       0 | 10.0.0.1  |     3002 | default  | t           | t        | primary  | default     | t              | f                |
|      7 |       6 | 127.0.0.1 |     3002 | default  | t           | t        | primary  | default     | t              | t                |
|      8 |       7 | 127.0.0.1 |     3003 | default  | t           | t        | primary  | default     | t              | t                |
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
(3 rows)

postgres=# select * from pg_dist_placement;
+-------------+---------+------------+-------------+---------+
| placementid | shardid | shardstate | shardlength | groupid |
+-------------+---------+------------+-------------+---------+
|         650 |  102651 |          1 |           0 |       6 |
|         651 |  102652 |          1 |           0 |       7 |
|         652 |  102653 |          1 |           0 |       6 |
|         653 |  102654 |          1 |           0 |       7 |
|         654 |  102655 |          1 |           0 |       6 |
|         655 |  102656 |          1 |           0 |       7 |
|         656 |  102657 |          1 |           0 |       6 |
|         657 |  102658 |          1 |           0 |       7 |
...
...
...
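
The chain shardid → placementid → groupid → node can be traced by hand through the two catalog listings above. The following is a minimal sketch of that lookup in plain C over a few rows copied from the output. The struct names and the helper find_primary_for_shard are hypothetical and exist only for illustration; they are not Citus APIs, and Citus itself resolves this through its metadata cache.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Simplified rows mirroring a few columns of pg_dist_placement (illustrative only). */
typedef struct PlacementRow {
    int64_t placementid;
    int64_t shardid;
    int32_t groupid;
} PlacementRow;

/* Simplified rows mirroring a few columns of pg_dist_node (illustrative only). */
typedef struct NodeRow {
    int32_t nodeid;
    int32_t groupid;
    const char *nodename;
    int nodeport;
    const char *noderole;
} NodeRow;

/* Sample data copied from the listings above. */
static const PlacementRow placements[] = {
    {650, 102651, 6}, {651, 102652, 7}, {652, 102653, 6}, {653, 102654, 7},
};

static const NodeRow nodes[] = {
    {1, 0, "10.0.0.1", 3002, "primary"},
    {7, 6, "127.0.0.1", 3002, "primary"},
    {8, 7, "127.0.0.1", 3003, "primary"},
};

/*
 * Hypothetical helper: shardid -> placement -> groupid -> primary node.
 * Returns NULL when the shard has no placement in the sample data.
 */
const NodeRow *find_primary_for_shard(int64_t shardid)
{
    assert(shardid > 0); /* shard ids in the listing are positive */

    for (size_t i = 0; i < sizeof(placements) / sizeof(placements[0]); i++) {
        if (placements[i].shardid != shardid)
            continue;
        for (size_t j = 0; j < sizeof(nodes) / sizeof(nodes[0]); j++) {
            if (nodes[j].groupid == placements[i].groupid &&
                strcmp(nodes[j].noderole, "primary") == 0)
                return &nodes[j];
        }
    }
    return NULL;
}
```

With this sample data, find_primary_for_shard(102651) yields the row for 127.0.0.1:3002 (groupid 6) and find_primary_for_shard(102652) yields 127.0.0.1:3003 (groupid 7).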

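1 Preface

Citus does a great job as a non-intrusive extension: because it ships as a plugin, it can keep following the PostgreSQL mainline without the problems of maintaining a fork.

This post tries to cover the following:

  1. Where the plugin hooks into the kernel.
  2. A high-level summary of what the main functions in the plugin do.
  3. The execution flow of a simple SELECT (DQL) query against a distributed table.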

2 Where the plugin hooks in

2.1 hook

For a PG plugin, the hooks are naturally installed in _PG_init. Citus's _PG_init looks like this:

(only the hook-related parts are kept)

void
_PG_init(void)
{
    ...
    ...

    /* intercept planner */
    planner_hook = distributed_planner;

    /* register for planner hook */
    set_rel_pathlist_hook = multi_relation_restriction_hook;
    get_relation_info_hook = multi_get_relation_info_hook;
    set_join_pathlist_hook = multi_join_restriction_hook;
    ExecutorStart_hook = CitusExecutorStart;
    ExecutorRun_hook = CitusExecutorRun;
    ExplainOneQuery_hook = CitusExplainOneQuery;
    prev_ExecutorEnd = ExecutorEnd_hook;
    ExecutorEnd_hook = CitusAttributeToEnd;

    /* register hook for error messages */
    original_emit_log_hook = emit_log_hook;
    emit_log_hook = multi_log_hook;

    /*
     * Register hook for counting client backends that
     * are successfully authenticated.
     */
    original_client_auth_hook = ClientAuthentication_hook;
    ClientAuthentication_hook = CitusAuthHook;

    prev_shmem_request_hook = shmem_request_hook;
    shmem_request_hook = citus_shmem_request;
    ...
    ...
    PrevObjectAccessHook = object_access_hook;
    object_access_hook = CitusObjectAccessHook;
    ...
    ...
}
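
Several of the assignments above follow the usual save-then-replace pattern (for example prev_ExecutorEnd = ExecutorEnd_hook followed by ExecutorEnd_hook = CitusAttributeToEnd). The following is a minimal, self-contained simulation of that pattern. Every name in it (ExecutorEnd_hook_sim, extension_init, run_executor_end and so on) is a stand-in invented for this sketch; it does not link against PostgreSQL and makes no claim about what the Citus hook body does.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for a hook type; real PostgreSQL hooks take executor structures. */
typedef int (*executor_end_hook_type)(int queryId);

/* Stand-in for the core's global hook variable. NULL means "no extension installed". */
executor_end_hook_type ExecutorEnd_hook_sim = NULL;

/* The previously installed hook, saved by the extension so it can chain to it. */
static executor_end_hook_type prev_ExecutorEnd_sim = NULL;

/* Counts how often the extension's hook ran (for demonstration only). */
int extension_calls_seen = 0;

/* Stand-in for the core's default behaviour. */
int standard_executor_end(int queryId)
{
    return queryId;
}

/* The extension's hook: do its own work, then chain to whoever came before. */
int extension_executor_end(int queryId)
{
    extension_calls_seen++;
    if (prev_ExecutorEnd_sim != NULL)
        return prev_ExecutorEnd_sim(queryId);
    return standard_executor_end(queryId);
}

/* Mirrors the two-line pattern in _PG_init: save the old value, install the new one. */
void extension_init(void)
{
    prev_ExecutorEnd_sim = ExecutorEnd_hook_sim;
    ExecutorEnd_hook_sim = extension_executor_end;
}

/* Stand-in for the core call site: use the hook if one is set, else the default. */
int run_executor_end(int queryId)
{
    assert(queryId >= 0);
    if (ExecutorEnd_hook_sim != NULL)
        return ExecutorEnd_hook_sim(queryId);
    return standard_executor_end(queryId);
}
```

Saving the previous pointer is what lets several extensions share one hook: each one calls the hook it displaced, so the chain ends at the core's default implementation.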

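Planner

  • Main entry point of the distributed planner: distributed_planner
    • Entry points during query optimization:
      • set_rel_pathlist_hook = multi_relation_restriction_hook;
        • Collects information about distributed tables and applies distributed-specific optimization logic.
      • get_relation_info_hook = multi_get_relation_info_hook;
        • Resolves index compatibility issues for distributed partitioned tables.
      • set_join_pathlist_hook = multi_join_restriction_hook;
        • Captures metadata about cross-shard joins in order to generate an efficient distributed execution plan.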

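Distributed executor

  • ExecutorStart_hook = CitusExecutorStart;
  • ExecutorRun_hook = CitusExecutorRun;
    • Core scheduling logic: splits the distributed plan into sub-tasks, sends them to the workers in parallel through an asynchronous connection pool, and manages merging of the results.
  • ExecutorEnd_hook = CitusAttributeToEnd;
    • Cleans up executor resources.
  • ExplainOneQuery_hook = CitusExplainOneQuery;
    • Extends EXPLAIN output with details of the distributed plan such as physical shard routing and task parallelism.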

Logging

  • emit_log_hook = multi_log_hook;
    • Overrides PG log output to append extra information.

2.2 RegisterCitusCustomScanMethods

The plugin also registers four kinds of custom scan; three of them are shown below:

void
RegisterCitusCustomScanMethods(void)
{
    RegisterCustomScanMethods(&AdaptiveExecutorCustomScanMethods);
    RegisterCustomScanMethods(&NonPushableInsertSelectCustomScanMethods);
    RegisterCustomScanMethods(&NonPushableMergeCommandCustomScanMethods);
}

Citus Adaptive

CustomScanMethods AdaptiveExecutorCustomScanMethods = {
    "Citus Adaptive",
    AdaptiveExecutorCreateScan
};

static CustomExecMethods AdaptiveExecutorCustomExecMethods = {
    .CustomName = "AdaptiveExecutorScan",
    .BeginCustomScan = CitusBeginScan,
    .ExecCustomScan = CitusExecScan,
    .EndCustomScan = CitusEndScan,
    .ReScanCustomScan = CitusReScan,
    .ExplainCustomScan = CitusExplainScan
};

Citus INSERT … SELECT

CustomScanMethods NonPushableInsertSelectCustomScanMethods = {
    "Citus INSERT ... SELECT",
    NonPushableInsertSelectCreateScan
};

static CustomExecMethods NonPushableInsertSelectCustomExecMethods = {
    .CustomName = "NonPushableInsertSelectScan",
    .BeginCustomScan = CitusBeginScan,
    .ExecCustomScan = NonPushableInsertSelectExecScan,
    .EndCustomScan = CitusEndScan,
    .ReScanCustomScan = CitusReScan,
    .ExplainCustomScan = NonPushableInsertSelectExplainScan
};

Citus MERGE INTO …

CustomScanMethods NonPushableMergeCommandCustomScanMethods = {
    "Citus MERGE INTO ...",
    NonPushableMergeCommandCreateScan
};

static CustomExecMethods NonPushableMergeCommandCustomExecMethods = {
    .CustomName = "NonPushableMergeCommandScan",
    .BeginCustomScan = CitusBeginScan,
    .ExecCustomScan = NonPushableMergeCommandExecScan,
    .EndCustomScan = CitusEndScan,
    .ReScanCustomScan = CitusReScan,
    .ExplainCustomScan = NonPushableMergeCommandExplainScan
};
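
Each registration above pairs a display name (such as "Citus Adaptive") with a create-state callback. As a rough mental model only, the registry can be pictured as a name-keyed table of method structs. The sketch below simulates that idea in standalone C. All Sim* types and sim_* functions are hypothetical and are not the PostgreSQL or Citus definitions; the integer tags returned by the callbacks are invented so the dispatch can be observed.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SIM_MAX_METHODS 8

/* Simplified stand-in for a custom scan method table: a name plus a factory callback. */
typedef struct SimCustomScanMethods {
    const char *CustomName;
    int (*CreateState)(void); /* returns an invented tag identifying the state kind */
} SimCustomScanMethods;

static const SimCustomScanMethods *sim_registry[SIM_MAX_METHODS];
static size_t sim_registry_len = 0;

/* Adds one method table to the simulated registry. */
void sim_register(const SimCustomScanMethods *methods)
{
    assert(sim_registry_len < SIM_MAX_METHODS);
    sim_registry[sim_registry_len++] = methods;
}

/* Finds a registered method table by its name, or returns NULL. */
const SimCustomScanMethods *sim_lookup(const char *name)
{
    for (size_t i = 0; i < sim_registry_len; i++) {
        if (strcmp(sim_registry[i]->CustomName, name) == 0)
            return sim_registry[i];
    }
    return NULL;
}

/* Invented factories; real ones would build a scan state node. */
int sim_create_adaptive(void) { return 1; }
int sim_create_insert_select(void) { return 2; }

const SimCustomScanMethods sim_adaptive = {"Citus Adaptive", sim_create_adaptive};
const SimCustomScanMethods sim_insert_select = {"Citus INSERT ... SELECT", sim_create_insert_select};

/* Mirrors the shape of RegisterCitusCustomScanMethods for two of the entries. */
void sim_register_all(void)
{
    sim_register(&sim_adaptive);
    sim_register(&sim_insert_select);
}
```

After sim_register_all(), looking up "Citus Adaptive" returns the table whose factory produces tag 1, which is the same name that later shows up in the EXPLAIN output as Custom Scan (Citus Adaptive).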

3 CitusExecutorRun: a distributed point query

drop table events;
CREATE TABLE events (
    device_id bigint,
    event_id bigserial,
    event_time timestamptz default now(),
    data jsonb not null,
    PRIMARY KEY (device_id, event_id)
);

SELECT create_distributed_table('events', 'device_id');

INSERT INTO events (device_id, data)
SELECT s % 100, ('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,100000) s;

explain analyze select * from events where device_id in (11,22,33);

Plan

postgres=# explain analyze select * from events where device_id in (11,22,33);
+----------------------------------------------------------------------------------------------------------------------------------+
|                                                            QUERY PLAN                                                            |
+----------------------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=56) (actual time=20.573..20.967 rows=3000 loops=1)              |
|   Task Count: 3                                                                                                                  |
|   Tuple data received from nodes: 177 kB                                                                                         |
|   Tasks Shown: One of 3                                                                                                          |
|   ->  Task                                                                                                                       |
|         Tuple data received from node: 59 kB                                                                                     |
|         Node: host=127.0.0.1 port=3003 dbname=postgres                                                                           |
|         ->  Seq Scan on events_102680 events  (cost=0.00..76.25 rows=1000 width=63) (actual time=0.020..0.977 rows=1000 loops=1) |
|               Filter: (device_id = ANY ('{11,22,33}'::bigint[]))                                                                 |
|               Rows Removed by Filter: 2000                                                                                       |
|             Planning Time: 0.081 ms                                                                                              |
|             Execution Time: 1.161 ms                                                                                             |
| Planning Time: 0.517 ms                                                                                                          |
| Execution Time: 21.575 ms                                                                                                        |
+----------------------------------------------------------------------------------------------------------------------------------+
(14 rows)
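
The row counts in this plan can be cross-checked against the INSERT statement: generate_series(1,100000) with s % 100 gives each device_id 1000 rows, so three device ids return 3000 rows in total, and the one task shown returns 1000 of them. The small sketch below reproduces that arithmetic. The function names are made up for this check and nothing here talks to a database.

```c
#include <assert.h>

/*
 * Counts the values s in 1..total for which s % modulus == device,
 * mirroring "SELECT s % 100 ... FROM generate_series(1,100000) s".
 */
int rows_for_device(int total, int modulus, int device)
{
    assert(modulus > 0);

    int count = 0;
    for (int s = 1; s <= total; s++) {
        if (s % modulus == device)
            count++;
    }
    return count;
}

/* Sums the per-device row counts for a list of device ids. */
int rows_for_devices(int total, int modulus, const int *devices, int ndevices)
{
    int sum = 0;
    for (int i = 0; i < ndevices; i++)
        sum += rows_for_device(total, modulus, devices[i]);
    return sum;
}
```

For devices 11, 22 and 33 this gives 1000 rows each and 3000 overall, matching rows=3000 on the Custom Scan node and rows=1000 on the Seq Scan of events_102680. The "Rows Removed by Filter: 2000" line then suggests that shard holds 3000 rows, that is, three device ids, only one of which is requested.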

3.1 The plan generated by distributed_planner

(Figure: the plan generated by distributed_planner; image not available.)

3.2 Execution flow

InitPlan stage → CitusBeginScan

CitusBeginScan
    ...
    ...
    ExecInitResultSlot(&scanState->customScanState.ss.ps, &TTSOpsMinimalTuple);
    ExecInitScanTupleSlot(node->ss.ps.state, &node->ss, node->ss.ps.scandesc,
                          &TTSOpsMinimalTuple);
    ExecAssignScanProjectionInfoWithVarno(&node->ss, INDEX_VAR);
    ...
    ...

The slots were originally TTSOpsVirtual; here they are re-initialized as TTSOpsMinimalTuple.

ExecutorRun stage → CitusExecScan

  • CitusPreExecScan
    • ExecuteSubPlans
    • standard_ExecutorRun
      • ExecCustomScan
        • CitusExecScan
          • AdaptiveExecutor: Citus's adaptive executor
            • tuplestore_begin_heap: creates the tuplestore
            • DecideTransactionPropertiesForTaskList: determines the transaction requirements of the SQL being executed
              • xactProperties= {errorOnAnyFailure = false, useRemoteTransactionBlocks = TRANSACTION_BLOCKS_ALLOWED, requires2PC = false}
            • CreateDistributedExecution: creates the distributed execution
            • StartDistributedExecution: initializes the distributed execution
              • RecordParallelRelationAccessForTaskList: records the tables accessed by the distributed execution
              • EnsureTaskExecutionAllowed
            • ShouldRunTasksSequentially: decides whether tasks run sequentially or in parallel
            • RunDistributedExecution: starts running
              • AssignTasksToConnectionsOrWorkerPool: the task-scheduling hub of the Citus distributed execution engine
                • foreach_ptr(task, taskList): 3 tasks need to be assigned
                  • Initialize the ShardCommandExecution structure
                  • taskPlacement says where the task runs: taskPlacement = {type = {extensible = {type = T_ExtensibleNode, extnodename = 0x7f2a86ca5eca "ShardPlacement"}, citus_tag = T_ShardPlacement}, placementId = 680, shardId = 102681, shardLength = 0, groupId = 6, nodeName = 0x1ebfb38 "127.0.0.1", nodePort = 3002, nodeId = 7, partitionMethod = 104 'h', colocationGroupId = 13, representativeValue = 1879048192}
                  • LookupTaskPlacementHostAndPort: resolves the host and port to connect to; LookupNodeForGroup uses the groupId to find a worker node (analyzed in the next post)
                  • Initialize TaskPlacementExecution
                  • Add the task to workerPool->readyTaskQueue
              • ManageWorkerPool: adjusts the size of the connection pool dynamically according to task load (needs deeper analysis)
              • ProcessWaitEvents: waits for events and processes them until the tasks have finished (needs deeper analysis)
            • FinishDistributedExecution: ends the execution
          • ReturnTupleFromTuplestore
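
The assignment loop in the outline above (resolve where each task's placement lives, then push the task onto that worker pool's ready queue) can be sketched as a toy model. The code below is a simplification written for this post and not the Citus implementation: the Sim* structs, port_for_group, find_or_create_pool and assign_tasks are hypothetical names, pools are keyed by node port only, and the group ids for the tasks on shards 102680 and 102654 are inferred from the EXPLAIN output and the pg_dist_placement listing rather than observed in gdb.

```c
#include <assert.h>
#include <stddef.h>

#define SIM_MAX_POOLS 4
#define SIM_MAX_READY 8

/* A task reduced to the three fields this sketch needs. */
typedef struct SimTask {
    int taskId;
    long shardId;
    int groupId; /* group of the task's placement */
} SimTask;

/* One pool per worker node, holding the ids of tasks ready to run there. */
typedef struct SimWorkerPool {
    int nodePort;
    int readyTaskQueue[SIM_MAX_READY];
    size_t readyCount;
} SimWorkerPool;

typedef struct SimExecution {
    SimWorkerPool pools[SIM_MAX_POOLS];
    size_t poolCount;
} SimExecution;

/* groupid -> port, hard-coded from the pg_dist_node listing (6 -> 3002, 7 -> 3003). */
int port_for_group(int groupId)
{
    switch (groupId) {
        case 6: return 3002;
        case 7: return 3003;
        default: return -1;
    }
}

/* Returns the pool for a port, creating an empty one on first use. */
SimWorkerPool *find_or_create_pool(SimExecution *execution, int nodePort)
{
    for (size_t i = 0; i < execution->poolCount; i++) {
        if (execution->pools[i].nodePort == nodePort)
            return &execution->pools[i];
    }
    assert(execution->poolCount < SIM_MAX_POOLS);
    SimWorkerPool *pool = &execution->pools[execution->poolCount++];
    pool->nodePort = nodePort;
    pool->readyCount = 0;
    return pool;
}

/* For each task: resolve the node, then enqueue the task on that node's ready queue. */
void assign_tasks(SimExecution *execution, const SimTask *tasks, size_t taskCount)
{
    for (size_t i = 0; i < taskCount; i++) {
        int nodePort = port_for_group(tasks[i].groupId);
        assert(nodePort > 0);
        SimWorkerPool *pool = find_or_create_pool(execution, nodePort);
        assert(pool->readyCount < SIM_MAX_READY);
        pool->readyTaskQueue[pool->readyCount++] = tasks[i].taskId;
    }
}

/* The three tasks of the example query, in the order they appear in the task list. */
const SimTask sim_sample_tasks[3] = {
    {1, 102681, 6},
    {3, 102654, 7},
    {2, 102680, 7},
};
```

Running assign_tasks over sim_sample_tasks with an empty SimExecution produces two pools: port 3002 with task 1 queued, and port 3003 with tasks 3 and 2 queued.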

Contents of taskList: the Task structure defined by Citus

(gdb) tr Task 0x1eb8b98
$41 = {
  type = {
    extensible = {type = T_ExtensibleNode, extnodename = 0x7f2a86ca5ea1 "Task"},
    citus_tag = T_Task
  },
  taskType = READ_TASK,
  jobId = 27142733692931,
  taskId = 1,
  taskQuery = {
    queryType = TASK_QUERY_TEXT,
    data = {
      jobQueryReferenceForLazyDeparsing = 0x1e787e8,
      queryStringLazy = 0x1e787e8 "SELECT device_id, event_id, event_time, data FROM public.events_102681 events WHERE (device_id OPERATOR(pg_catalog.=) ANY ('{11,22,33}'::bigint[]))",
      queryStringList = 0x1e787e8
    }
  },
  queryCount = 1,
  anchorDistributedTableId = 0,
  anchorShardId = 102681,
  taskPlacementList = 0x1ec04b8,
  dependentTaskList = 0x0,
  partitionId = 0,
  upstreamTaskId = 0,
  shardInterval = 0x0,
  assignmentConstrained = false,
  mapJobTargetList = 0x0,
  replicationModel = 105 'i',
  relationRowLockList = 0x0,
  modifyWithSubquery = false,
  relationShardList = 0x1eb8cf8,
  rowValuesLists = 0x0,
  partiallyLocalOrRemote = false,
  parametersInQueryStringResolved = true,
  tupleDest = 0x0,
  totalReceivedTupleData = 0,
  fetchedExplainAnalyzePlan = 0x0,
  fetchedExplainAnalyzePlacementIndex = 0,
  fetchedExplainAnalyzeExecutionDuration = 0,
  isLocalTableModification = false,
  cannotBeExecutedInTransaction = false,
  partitionKeyValue = 0x0,
  colocationId = 0
}

(gdb) tr Task 0x1ebd788
$42 = {
  type = {
    extensible = {type = T_ExtensibleNode, extnodename = 0x7f2a86ca5ea1 "Task"},
    citus_tag = T_Task
  },
  taskType = READ_TASK,
  jobId = 27142733692931,
  taskId = 3,
  taskQuery = {
    queryType = TASK_QUERY_TEXT,
    data = {
      jobQueryReferenceForLazyDeparsing = 0x1ebc6b0,
      queryStringLazy = 0x1ebc6b0 "SELECT device_id, event_id, event_time, data FROM public.events_102654 events WHERE (device_id OPERATOR(pg_catalog.=) ANY ('{11,22,33}'::bigint[]))",
      queryStringList = 0x1ebc6b0
    }
  },
  queryCount = 1,
  anchorDistributedTableId = 0,
  anchorShardId = 102654,
  taskPlacementList = 0x1ec0558,
  dependentTaskList = 0x0,
  partitionId = 0,
  upstreamTaskId = 0,
  shardInterval = 0x0,
  assignmentConstrained = false,
  mapJobTargetList = 0x0,
  replicationModel = 105 'i',
  relationRowLockList = 0x0,
  modifyWithSubquery = false,
  relationShardList = 0x1ebd8e8,
  rowValuesLists = 0x0,
  partiallyLocalOrRemote = false,
  parametersInQueryStringResolved = true,
  tupleDest = 0x0,
  totalReceivedTupleData = 0,
  fetchedExplainAnalyzePlan = 0x0,
  fetchedExplainAnalyzePlacementIndex = 0,
  fetchedExplainAnalyzeExecutionDuration = 0,
  isLocalTableModification = false,
  cannotBeExecutedInTransaction = false,
  partitionKeyValue = 0x0,
  colocationId = 0
}

(gdb) tr Task 0x1ebb1b8
$43 = {
  type = {
    extensible = {type = T_ExtensibleNode, extnodename = 0x7f2a86ca5ea1 "Task"},
    citus_tag = T_Task
  },
  taskType = READ_TASK,
  jobId = 27142733692931,
  taskId = 2,
  taskQuery = {
    queryType = TASK_QUERY_TEXT,
    data = {
      jobQueryReferenceForLazyDeparsing = 0x1eba0e0,
      queryStringLazy = 0x1eba0e0 "SELECT device_id, event_id, event_time, data FROM public.events_102680 events WHERE (device_id OPERATOR(pg_catalog.=) ANY ('{11,22,33}'::bigint[]))",
      queryStringList = 0x1eba0e0
    }
  },
  queryCount = 1,
  anchorDistributedTableId = 0,
  anchorShardId = 102680,
  taskPlacementList = 0x1ec05a8,
  dependentTaskList = 0x0,
  partitionId = 0,
  upstreamTaskId = 0,
  shardInterval = 0x0,
  assignmentConstrained = false,
  mapJobTargetList = 0x0,
  replicationModel = 105 'i',
  relationRowLockList = 0x0,
  modifyWithSubquery = false,
  relationShardList = 0x1ebb318,
  rowValuesLists = 0x0,
  partiallyLocalOrRemote = false,
  parametersInQueryStringResolved = true,
  tupleDest = 0x0,
  totalReceivedTupleData = 0,
  fetchedExplainAnalyzePlan = 0x0,
  fetchedExplainAnalyzePlacementIndex = 0,
  fetchedExplainAnalyzeExecutionDuration = 0,
  isLocalTableModification = false,
  cannotBeExecutedInTransaction = false,
  partitionKeyValue = 0x0,
  colocationId = 0
}
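
In the three dumps, each task's query text targets a table named after the distributed table plus the shard id (public.events_102681, public.events_102654, public.events_102680), and that suffix equals the task's anchorShardId. The sketch below shows that naming convention as observed in the dumps. The helper shard_relation_name is hypothetical and written for illustration; it is not the function Citus uses, and it ignores identifier quoting and length limits.

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: builds "<schema>.<relation>_<shardId>" into out.
 * Returns the number of characters that the full name needs (as snprintf does),
 * so a return value >= outlen means the name was truncated.
 */
int shard_relation_name(char *out, size_t outlen,
                        const char *schema, const char *relation,
                        uint64_t shardId)
{
    assert(out != NULL && outlen > 0);
    return snprintf(out, outlen, "%s.%s_%" PRIu64, schema, relation, shardId);
}
```

For schema "public", relation "events" and shard id 102681 this yields "public.events_102681", the table referenced in the first task's queryStringLazy.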
