目录
- 前言
- 核心概念
- 路由&扩容原理
- 资源倾斜问题
- 代码实现
- 基础实现
- 资源倾斜问题
- 整合Spring使用(以分库分表为例)
- 一致性hash工具
- hash路由配置
- 测试
- 数据迁移方案
前言
一致性哈希(Consistent Hashing)是分布式系统中常用的数据分片技术,将数据分布到一个哈希环上,,它能在节点增减时最小化数据迁移量,哈希环的大小为2的32次方-1;
通过这种一致性哈希的分库分表方案,可以实现数据的均匀分布和平滑扩容,对于水平扩展的业务场景;在实现扩容,会伴随数据的迁移,通过一致性Hash,环实现对旧数据影响最小。
核心概念
- 哈希环:将哈希空间组织成一个环形结构。
- 虚拟节点:每个物理节点对应多个虚拟节点,实现更均匀的分布。
- 数据路由:数据键的哈希值在环上顺时针找到的第一个节点即为存储节点。
- 扩容处理:新增节点时只影响相邻节点的数据,相比于取模方式,影响范围小。
路由&扩容原理
1、计算节点的hash值,比如存在node1、node2、node3,分别在服务器A/B/C上,并且每个节点都有自己的hash值,如下图分布
2、 数据在进行存储时,计算数据key的hash值,然后顺时针找到的第一节点所属的hash值,那么数据就存储到该节点上,如下图:
3、数据的扩容,影响范围最小化,比如新增了一个nodeX,并且nodeX节点的Hash值比nodeC的hash值,此时有一个数据C顺时针的第一个节点就是nodeX了,但是数据A、和数据B并不会收到影响;对于扩容后我们只需要将nodeC的数据迁移到nodeX即可,其他节点不用动
资源倾斜问题
比如:
我们现在有三个节点(nodeA、nodeB、nodeC),这三个节点分别在Hash环的不同位置,并且数据Key分别为:a、b、c、d、e、f;
在进行hash计算时,可能出现 a、b、c、d可能在nodeA中,e在nodeB中、f在nodeC中,这就出现了资源倾斜;为了尽可能的实现平均分别,引入了虚拟节点;
设定每个节点都有100个虚拟接口,虚拟节点分别为:nodeA001…nodeA002…nodeA100、nodeB001…nodeB002…nodeB100、nodeC001…nodeC002…nodeC100;引入虚拟节点后此时三个真实节点实际在hash环上就有300个节点了,提高了节点的平均分布率,可以更好的实现数据均分到不同的节点上。
如图所示,节点A在Hash环上有A1和A2等虚拟节点,只要在A1和A2后面的数据key,都会存入到服务器A中;
代码实现
基础实现
引入依赖:
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>33.0.0-jre</version>
</dependency>
hash计算工具类:
import com.google.common.hash.Hashing;import java.nio.charset.StandardCharsets;
import java.util.*;
public class ShardingRouterComponent {// 虚拟节点到物理节点的映射private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();public ShardingRouterComponent(List<String> physicalShards) {initVirtualNodes(physicalShards);}private void initVirtualNodes(List<String> physicalShards) {for (String shard : physicalShards) {// addShard(shard);long hash = hash(shard);VIRTUAL_NODE_MAP.put(hash, shard);}}public String routeShard(Long key) {if (VIRTUAL_NODE_MAP.isEmpty()) {throw new RuntimeException("No shards available");}long hash = hash(key.toString());// 找到第一个大于等于该哈希值的节点SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);if (tail.isEmpty()) {return VIRTUAL_NODE_MAP.firstEntry().getValue();}return tail.get(tail.firstKey());}public long hash(String key) {return Hashing.murmur3_128().hashString(key, StandardCharsets.UTF_8).asLong();}
}
验证
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class MainTest {public static void main(String[] args) {// 创建4个节点List<String> list = new ArrayList<>();for (int i = 0; i <= 4; i++) {String s = "node_" + i;list.add(s);}ShardingRouterComponent routerComponent = new ShardingRouterComponent(list);System.out.println(routerComponent.getAllPhysicalShards());Map<String, List<Long>> m1 = new HashMap<>();for (long a = 1; a <= 30; a++) {String shard = routerComponent.routeShard(a);System.out.println("key=" + a + " -> shard=" + shard);List<Long> longList = m1.get(shard);if (longList == null) {longList = new ArrayList<>();longList.add(a);m1.put(shard, longList);} else {longList.add(a);}}System.out.println(m1);}
}
运行上面代码后,可以看到不同的key进入了不同的node节点中,但是存在一个问题:有点节点数据较少,有的节点数据较多,这就是资源倾斜了;要处理资源倾斜,我们需要增加一个虚拟节点
来处理;
资源倾斜问题
增加虚拟节点:
修改ShardingRouterComponent
类来实现虚拟节点的功能,所谓虚拟节点
就是我们虚构的节点,不存在真实的服务器,我们将使用真实节点+'#VN'+虚拟数
的方式拼接一个key,并且将该key的hash值与真实节点
进行绑定;
完整代码如下:
import com.google.common.hash.Hashing;import java.nio.charset.StandardCharsets;
import java.util.*;public class ShardingRouterComponent {// 虚拟节点到物理节点的映射private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();// 每个物理节点对应的虚拟节点数private final int VIRTUAL_NODE_SHARD_NUM;public ShardingRouterComponent(List<String> physicalShards, int virtualNodePerShard) {this.VIRTUAL_NODE_SHARD_NUM = virtualNodePerShard;initVirtualNodes(physicalShards);}private void initVirtualNodes(List<String> physicalShards) {for (String shard : physicalShards) {addShard(shard);}}public String routeShard(Long key) {if (VIRTUAL_NODE_MAP.isEmpty()) {throw new RuntimeException("No shards available");}long hash = hash(key.toString());// 找到第一个大于等于该哈希值的节点SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);if (tail.isEmpty()) {return VIRTUAL_NODE_MAP.firstEntry().getValue();}return tail.get(tail.firstKey());}public long hash(String key) {return Hashing.murmur3_128().hashString(key, StandardCharsets.UTF_8).asLong();}public void addShard(String shard) {for (int i = 0; i < VIRTUAL_NODE_SHARD_NUM; i++) {long hash = hash(shard + "#VN#" + i);VIRTUAL_NODE_MAP.put(hash, shard);}}}
验证
我们现在有4个物理节点(node0...node3)
,并且为每个节点,都设置100个虚拟节点,验证代码如下:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class MainTest {public static void main(String[] args) {List<String> list = new ArrayList<>();for (int i = 0; i <= 4; i++) {String s = "node_" + i;list.add(s);}ShardingRouterComponent routerComponent = new ShardingRouterComponent(list, 10);System.out.println(routerComponent.getAllPhysicalShards());Map<String, List<Long>> m1 = new HashMap<>();for (long a = 1; a <= 30; a++) {String shard = routerComponent.routeShard(a);System.out.println("key=" + a + " -> shard=" + shard);List<Long> longList = m1.get(shard);if (longList == null) {longList = new ArrayList<>();longList.add(a);m1.put(shard, longList);} else {longList.add(a);}}System.out.println(m1);}
}
运行上面代码后,可以看到数据相对较均匀
整合Spring使用(以分库分表为例)
一致性hash工具
import com.google.common.hash.Hashing;import java.nio.charset.StandardCharsets;
import java.util.*;public class ShardingRouterComponent {// 虚拟节点到物理节点的映射private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();// 每个物理节点对应的虚拟节点数private final int VIRTUAL_NODE_SHARD_NUM;public ShardingRouterComponent(List<String> physicalShards, int virtualNodePerShard) {this.VIRTUAL_NODE_SHARD_NUM = virtualNodePerShard;initVirtualNodes(physicalShards);}private void initVirtualNodes(List<String> physicalShards) {for (String shard : physicalShards) {addShard(shard);}}public String routeShard(Long key) {if (VIRTUAL_NODE_MAP.isEmpty()) {throw new RuntimeException("No shards available");}long hash = hash(key.toString());// 找到第一个大于等于该哈希值的节点SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);if (tail.isEmpty()) {return VIRTUAL_NODE_MAP.firstEntry().getValue();}return tail.get(tail.firstKey());}public long hash(String key) {return Hashing.murmur3_128().hashString(key, StandardCharsets.UTF_8).asLong();}public void addShard(String shard) {for (int i = 0; i < VIRTUAL_NODE_SHARD_NUM; i++) {long hash = hash(shard + "#VN#" + i);VIRTUAL_NODE_MAP.put(hash, shard);}}}
hash路由配置
模拟场景:对数据进行分库分表操作,定义了4个库和9个表,并且每个节点进行200个虚拟节点;
@Component
public class ShardingRouterConfig {@Beanpublic ShardingRouterComponent databaseSharding() {// 可以写到yml中定义int dbNum = 4;int tableNum = 9;// 可以写到yml中定义int virtualNodePerShard = 200;List<String> list = new ArrayList<>();for (int i = 0; i <= dbNum; i++) {list.add("db_" + i);}return new ShardingRouterComponent(list, virtualNodePerShard);}@Beanpublic ShardingRouterComponent tableSharding() {List<String> list = new ArrayList<>();for (int i = 0; i <= tableNum; i++) {list.add("table_" + i);}return new ShardingRouterComponent(list, virtualNodePerShard);}
}
测试
为了演示demo,直接在controller中进行使用,实际可以单独创建一个bean进行key路由的计算;
@RestController
@Slf4j
public class TestController {@Resourceprivate ShardingRouterComponent databaseSharding;@Resourceprivate ShardingRouterComponent tableSharding;@GetMapping("/test")public void test() throws Exception {// 随机一个keylong key = System.currentTimeMillis();// 计算所属库log.info(databaseSharding.routeShard(key)); // 计算所属表log.info(tableSharding.routeShard(key));}
}
数据迁移方案
一致性hash的一个优点就是:数据的扩容,影响范围最小化
比如新增了一个nodeX,并且nodeX节点的Hash值比nodeC的hash值,此时有一个数据C顺时针的第一个节点就是nodeX了,但是数据A、和数据B并不会收到影响;对于扩容后我们只需要将nodeC的数据迁移到nodeX即可,其他节点不用动
数据迁移方案
第一步:单独一个应用,应用会做一下操作
- 该应用中的hash节点数是扩容后节点数据,应用通过多线程的方式对所有的表按照
id升序
进行查询,将查询后的数据重新计算分片键的hash值,并且将数据insert到新hash值所映射的库/表
中;PS:在主体应用中 如果不做新节点的配置,是不会有数据被使用到的 - 该应用通过
Canal组件
监听扩容前的所有表,如何是旧的节点表
出现insert、delete、update
,则需要判断分片健的hash是否匹配当前节点表,如果不匹配 就需要更新或者新增
到新表中;
第二步:主体应用发布,主体应用发布后,进行以下操作
-
主体应用成功发布后,数据的新增和修改都是在新表进行的,持续观察一段时间;
-
独立应用再进行一个新的任务,该任务扫描旧表的所有数据,通过id进行倒叙查询
- 如果旧数据的hash值属于新表,并且新表中不存在新表则进行新增;
- 旧数据已经存在新表时 ,则比较旧表数据的修改时间与新表的修改时间。如果旧表数据修改时间大于新表修改时间,则更新新表数据+删除旧表数据(一个事务中);如果旧表数据更新时间小于等于新表数据, 则直接删除旧表;