欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > IT业 > 一致性hash应用-分库分表

一致性hash应用-分库分表

2025/4/2 16:50:30 来源:https://blog.csdn.net/zhuocailing3390/article/details/146886777  浏览:    关键词:一致性hash应用-分库分表
😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: XXXXXXX
⏱️ @ 创作时间: 2025年03月31日

目录

    • 前言
    • 核心概念
    • 路由&扩容原理
    • 资源倾斜问题
    • 代码实现
      • 基础实现
      • 资源倾斜问题
    • 整合Spring使用(以分库分表为例)
      • 一致性hash工具
      • hash路由配置
      • 测试
    • 数据迁移方案

前言

一致性哈希(Consistent Hashing)是分布式系统中常用的数据分片技术,将数据分布到一个哈希环上,,它能在节点增减时最小化数据迁移量,哈希环的大小为2的32次方-1;

通过这种一致性哈希的分库分表方案,可以实现数据的均匀分布和平滑扩容,对于水平扩展的业务场景;在实现扩容,会伴随数据的迁移,通过一致性Hash,环实现对旧数据影响最小。

核心概念

  1. 哈希环:将哈希空间组织成一个环形结构。
  2. 虚拟节点:每个物理节点对应多个虚拟节点,实现更均匀的分布。
  3. 数据路由:数据键的哈希值在环上顺时针找到的第一个节点即为存储节点。
  4. 扩容处理:新增节点时只影响相邻节点的数据,相比于取模方式,影响范围小。

路由&扩容原理

1、计算节点的hash值,比如存在node1、node2、node3,分别在服务器A/B/C上,并且每个节点都有自己的hash值,如下图分布
在这里插入图片描述

2、 数据在进行存储时,计算数据key的hash值,然后顺时针找到的第一节点所属的hash值,那么数据就存储到该节点上,如下图:
在这里插入图片描述

3、数据的扩容,影响范围最小化,比如新增了一个nodeX,并且nodeX节点的Hash值比nodeC的hash值,此时有一个数据C顺时针的第一个节点就是nodeX了,但是数据A、和数据B并不会收到影响;对于扩容后我们只需要将nodeC的数据迁移到nodeX即可,其他节点不用动
在这里插入图片描述

资源倾斜问题

比如:

我们现在有三个节点(nodeA、nodeB、nodeC),这三个节点分别在Hash环的不同位置,并且数据Key分别为:a、b、c、d、e、f;

在进行hash计算时,可能出现 a、b、c、d可能在nodeA中,e在nodeB中、f在nodeC中,这就出现了资源倾斜;为了尽可能的实现平均分别,引入了虚拟节点;

设定每个节点都有100个虚拟接口,虚拟节点分别为:nodeA001…nodeA002…nodeA100、nodeB001…nodeB002…nodeB100、nodeC001…nodeC002…nodeC100;引入虚拟节点后此时三个真实节点实际在hash环上就有300个节点了,提高了节点的平均分布率,可以更好的实现数据均分到不同的节点上。

如图所示,节点A在Hash环上有A1和A2等虚拟节点,只要在A1和A2后面的数据key,都会存入到服务器A中;
在这里插入图片描述

代码实现

基础实现

引入依赖:

<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>33.0.0-jre</version>
</dependency>

hash计算工具类:

import com.google.common.hash.Hashing;import java.nio.charset.StandardCharsets;
import java.util.*;
public class ShardingRouterComponent {// 虚拟节点到物理节点的映射private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();public ShardingRouterComponent(List<String> physicalShards) {initVirtualNodes(physicalShards);}private void initVirtualNodes(List<String> physicalShards) {for (String shard : physicalShards) {// addShard(shard);long hash = hash(shard);VIRTUAL_NODE_MAP.put(hash, shard);}}public String routeShard(Long key) {if (VIRTUAL_NODE_MAP.isEmpty()) {throw new RuntimeException("No shards available");}long hash = hash(key.toString());// 找到第一个大于等于该哈希值的节点SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);if (tail.isEmpty()) {return VIRTUAL_NODE_MAP.firstEntry().getValue();}return tail.get(tail.firstKey());}public long hash(String key) {return Hashing.murmur3_128().hashString(key, StandardCharsets.UTF_8).asLong();}
}

验证

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class MainTest {public static void main(String[] args) {// 创建4个节点List<String> list = new ArrayList<>();for (int i = 0; i <= 4; i++) {String s = "node_" + i;list.add(s);}ShardingRouterComponent routerComponent = new ShardingRouterComponent(list);System.out.println(routerComponent.getAllPhysicalShards());Map<String, List<Long>> m1 = new HashMap<>();for (long a = 1; a <= 30; a++) {String shard = routerComponent.routeShard(a);System.out.println("key=" + a + " -> shard=" + shard);List<Long> longList = m1.get(shard);if (longList == null) {longList = new ArrayList<>();longList.add(a);m1.put(shard, longList);} else {longList.add(a);}}System.out.println(m1);}
}

运行上面代码后,可以看到不同的key进入了不同的node节点中,但是存在一个问题:有点节点数据较少,有的节点数据较多,这就是资源倾斜了;要处理资源倾斜,我们需要增加一个虚拟节点来处理;
在这里插入图片描述

资源倾斜问题

增加虚拟节点:

修改ShardingRouterComponent类来实现虚拟节点的功能,所谓虚拟节点就是我们虚构的节点,不存在真实的服务器,我们将使用真实节点+'#VN'+虚拟数的方式拼接一个key,并且将该key的hash值与真实节点进行绑定;

完整代码如下:

import com.google.common.hash.Hashing;import java.nio.charset.StandardCharsets;
import java.util.*;public class ShardingRouterComponent {// 虚拟节点到物理节点的映射private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();// 每个物理节点对应的虚拟节点数private final int VIRTUAL_NODE_SHARD_NUM;public ShardingRouterComponent(List<String> physicalShards, int virtualNodePerShard) {this.VIRTUAL_NODE_SHARD_NUM = virtualNodePerShard;initVirtualNodes(physicalShards);}private void initVirtualNodes(List<String> physicalShards) {for (String shard : physicalShards) {addShard(shard);}}public String routeShard(Long key) {if (VIRTUAL_NODE_MAP.isEmpty()) {throw new RuntimeException("No shards available");}long hash = hash(key.toString());// 找到第一个大于等于该哈希值的节点SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);if (tail.isEmpty()) {return VIRTUAL_NODE_MAP.firstEntry().getValue();}return tail.get(tail.firstKey());}public long hash(String key) {return Hashing.murmur3_128().hashString(key, StandardCharsets.UTF_8).asLong();}public void addShard(String shard) {for (int i = 0; i < VIRTUAL_NODE_SHARD_NUM; i++) {long hash = hash(shard + "#VN#" + i);VIRTUAL_NODE_MAP.put(hash, shard);}}}

验证

我们现在有4个物理节点(node0...node3),并且为每个节点,都设置100个虚拟节点,验证代码如下:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class MainTest {public static void main(String[] args) {List<String> list = new ArrayList<>();for (int i = 0; i <= 4; i++) {String s = "node_" + i;list.add(s);}ShardingRouterComponent routerComponent = new ShardingRouterComponent(list, 10);System.out.println(routerComponent.getAllPhysicalShards());Map<String, List<Long>> m1 = new HashMap<>();for (long a = 1; a <= 30; a++) {String shard = routerComponent.routeShard(a);System.out.println("key=" + a + " -> shard=" + shard);List<Long> longList = m1.get(shard);if (longList == null) {longList = new ArrayList<>();longList.add(a);m1.put(shard, longList);} else {longList.add(a);}}System.out.println(m1);}
}

运行上面代码后,可以看到数据相对较均匀
在这里插入图片描述

整合Spring使用(以分库分表为例)

一致性hash工具

import com.google.common.hash.Hashing;import java.nio.charset.StandardCharsets;
import java.util.*;public class ShardingRouterComponent {// 虚拟节点到物理节点的映射private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();// 每个物理节点对应的虚拟节点数private final int VIRTUAL_NODE_SHARD_NUM;public ShardingRouterComponent(List<String> physicalShards, int virtualNodePerShard) {this.VIRTUAL_NODE_SHARD_NUM = virtualNodePerShard;initVirtualNodes(physicalShards);}private void initVirtualNodes(List<String> physicalShards) {for (String shard : physicalShards) {addShard(shard);}}public String routeShard(Long key) {if (VIRTUAL_NODE_MAP.isEmpty()) {throw new RuntimeException("No shards available");}long hash = hash(key.toString());// 找到第一个大于等于该哈希值的节点SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);if (tail.isEmpty()) {return VIRTUAL_NODE_MAP.firstEntry().getValue();}return tail.get(tail.firstKey());}public long hash(String key) {return Hashing.murmur3_128().hashString(key, StandardCharsets.UTF_8).asLong();}public void addShard(String shard) {for (int i = 0; i < VIRTUAL_NODE_SHARD_NUM; i++) {long hash = hash(shard + "#VN#" + i);VIRTUAL_NODE_MAP.put(hash, shard);}}}

hash路由配置

模拟场景:对数据进行分库分表操作,定义了4个库和9个表,并且每个节点进行200个虚拟节点;

@Component
public class ShardingRouterConfig {@Beanpublic ShardingRouterComponent databaseSharding() {// 可以写到yml中定义int dbNum = 4;int tableNum = 9;// 可以写到yml中定义int virtualNodePerShard = 200;List<String> list = new ArrayList<>();for (int i = 0; i <= dbNum; i++) {list.add("db_" + i);}return new ShardingRouterComponent(list, virtualNodePerShard);}@Beanpublic ShardingRouterComponent tableSharding() {List<String> list = new ArrayList<>();for (int i = 0; i <= tableNum; i++) {list.add("table_" + i);}return new ShardingRouterComponent(list, virtualNodePerShard);}
}

测试

为了演示demo,直接在controller中进行使用,实际可以单独创建一个bean进行key路由的计算;

@RestController
@Slf4j
public class TestController {@Resourceprivate ShardingRouterComponent databaseSharding;@Resourceprivate ShardingRouterComponent tableSharding;@GetMapping("/test")public void test() throws Exception {// 随机一个keylong key = System.currentTimeMillis();// 计算所属库log.info(databaseSharding.routeShard(key)); // 计算所属表log.info(tableSharding.routeShard(key));}
}

数据迁移方案

一致性hash的一个优点就是:数据的扩容,影响范围最小化

比如新增了一个nodeX,并且nodeX节点的Hash值比nodeC的hash值,此时有一个数据C顺时针的第一个节点就是nodeX了,但是数据A、和数据B并不会收到影响;对于扩容后我们只需要将nodeC的数据迁移到nodeX即可,其他节点不用动

在这里插入图片描述

数据迁移方案

第一步:单独一个应用,应用会做一下操作

  • 该应用中的hash节点数是扩容后节点数据,应用通过多线程的方式对所有的表按照id升序进行查询,将查询后的数据重新计算分片键的hash值,并且将数据insert到新hash值所映射的库/表中;PS:在主体应用中 如果不做新节点的配置,是不会有数据被使用到的
  • 该应用通过Canal组件监听扩容前的所有表,如何是旧的节点表出现insert、delete、update,则需要判断分片健的hash是否匹配当前节点表,如果不匹配 就需要更新或者新增到新表中;

第二步:主体应用发布,主体应用发布后,进行以下操作

  • 主体应用成功发布后,数据的新增和修改都是在新表进行的,持续观察一段时间;

  • 独立应用再进行一个新的任务,该任务扫描旧表的所有数据,通过id进行倒叙查询

    • 如果旧数据的hash值属于新表,并且新表中不存在新表则进行新增;
    • 旧数据已经存在新表时 ,则比较旧表数据的修改时间与新表的修改时间。如果旧表数据修改时间大于新表修改时间,则更新新表数据+删除旧表数据(一个事务中);如果旧表数据更新时间小于等于新表数据, 则直接删除旧表;

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词