文章目录
- 前言
- 一、项目大纲
- 二、Raft模块
- 1.Raft介绍
- 2.大致内容
- Leader与选举
- 日志同步、心跳
- raft日志的两个特点
- 3.主要流程
- 1. raft类的定义
- 关键函数
- m_nextIndex 和 m_matchIndex作用
- 2.启动初始化
- 3.竞选leader
- electionTimeOutTicker:
- doElection
- sendRequestVote
- RequestVote
- 4.日志复制、心跳
- leaderHearBeatTicker
- doHeartBeat
- sendAppendEntries
- AppendEntries
- 5.snapshot快照
- 快照是什么?
- 何时创建快照?
- 快照的传输
- 三.持久化
- 1.持久化介绍
- 1.raft节点的部分信息
- 2.kvDb的快照
- 2.为什么要持久化这些内容
- 3.什么时候持久化
- 4.谁来调用持久化
- 5.具体怎么实现持久化/使用哪个函数持久化
- 四.kvServer
- 1.kvServer介绍
- 2.kvServer怎么和上层kvDB沟通,怎么和下层raft节点沟通
- 3.kvServer怎么处理外部请求
- 接收与响应外部请求
- 待续
前言
构建一种基于Raft一致性算法的分布式键值存储数据库,以确保数据的一致性、可用性和分区容错性
一、项目大纲
- raft节点:raft算法实现的核心层,负责与其他机器的raft节点沟通,达到 分布式共识 的目的。
- raftServer:负责raft节点与k-v数据库中间的协调服务;负责持久化k-v数据库的数据(可选)。
- 上层状态机(k-v数据库):负责数据存储。
- 持久层:负责相关数据的落盘,对于raft节点,根据共识算法要求,必须对一些关键数据进行落盘处理,以保证节点宕机后重启程序可以恢复关键数据;对于raftServer,可能会有一些k-v数据库的东西需要落盘持久化。
- RPC通信:在 领导者选举、日志复制、数据查询、心跳等多个Raft重要过程中提供多节点快速简单的通信能力。
二、Raft模块
1.Raft介绍
参考
文章: 两万字长文解析raft算法原理
视频: 解析分布式共识算法之Raft算法
本项目用Raft解决的问题:
- 一致性: 通过Raft算法确保数据的强一致性,使得系统在正常和异常情况下都能够提供一致的数据视图。
- 可用性: 通过分布式节点的复制和自动故障转移,实现高可用性,即使在部分节点故障的情况下,系统依然能够提供服务。
- 分区容错: 处理网络分区的情况,确保系统在分区恢复后能够自动合并数据一致性。
2.大致内容
详细的看上面参考网站
Leader与选举
Raft是一个强Leader 模型,可以粗暴理解成Leader负责统领follower,如果Leader出现故障,那么整个集群都会对外停止服务,直到选举出下一个Leader。
- 节点之间通过网络通信,其他节点(follower)如何知道leader出现故障?
leader会定时向集群中剩下的节点(follower)发送AppendEntry
(作为心跳,hearbeat )以通知自己仍然存活。
可以推知,如果follower在一段时间内没有接收leader发送的AppendEntry
,那么follower就会认为当前的leader 出现故障,从而发起选举。
判断心跳超时,可以用一个定时器和一个标志位来实现,每到定时时间检查这期间有无AppendEntry
即可。 - AppendEntry作用
- 心跳
- 携带日志entry及其辅助信息,以控制日志的同步和日志向状态机提交
- 通告leader的index和term等关键信息以便follower对比确认follower自己或者leader是否过期
- follower知道leader出现故障后如何选举出leader?
follower认为leader故障后只能通过:term增加,变成candidate,向其他节点发起RequestVoteRPC
申请其他follower的选票,过一段时间之后会发生如下情况:- 赢得选举,马上成为leader (此时term已经增加了)
- 发现有符合要求的leader,自己马上变成follower 了,这个符合要求包括:leader的term≥自己的term
- 一轮选举结束,无人变成leader,那么循环这个过程
- 为了防止在同一时间有太多的follower转变为candidate导致一直无法选出leader, Raft 采用了随机选举超时(randomized election timeouts)的机制, 每一个candidate 在发起选举后,都会随机化一个新的选举超时时间。
- 符合什么条件的节点可以成为leader?
也可以称为“选举限制”,有限制的目的是为了保证选举出的 leader 一定包含了整个集群中目前已 committed 的所有日志。
当 candidate 发送RequestVoteRPC
时,会带上最后一个 entry 的信息。 所有的节点收到该请求后,都会比对自己的日志,如果发现自己的日志更新一些,则会拒绝投票给该 candidate。
需要比较两个东西:最新日志entry的term和对应的index。index为日志entry在整个日志的索引。
if 两个节点最新日志entry的term不同term大的日志更新
else最新日志entry的index大的更新
end
这样的限制可以保证:成为leader的节点,其日志已经是多数节点中最完备的,即包含了整个集群的所有 committed entries。
日志同步、心跳
在RPC中 日志同步 和 心跳 是放在一个RPC函数AppendEntryRPC
中来实现的,原因为:
- 心跳RPC 可以看成是没有携带日志的特殊的日志同步RPC。
- 对于一个follower,如果leader认为其日志已经和自己匹配了,那么在
AppendEntryRPC
中不用携带日志(再携带日志属于无效信息了,但其他信息依然要携带),反之如果follower的日志只有部分匹配,那么就需要在AppendEntryRPC
中携带对应的日志。
- 为什么不直接让follower拷贝leader的日志 或者 leader发送全部的日志给follower?
leader发送日志的目的是让follower同步自己的日志,当然可以让leader发送自己全部的日志给follower,然后follower接收后就覆盖自己原有的日志,但是这样就会携带大量的无效的日志(因为这些日志follower本身就有)。
因此raft的方式是:先找到日志不匹配的那个点,然后只同步那个点之后的日志。 - leader如何知道follower的日志是否与自己完全匹配?
在AppendEntryRPC
中携带上 entry的index和对应的term(日志的term),可以通过比较最后一个日志的index和term来得出某个follower日志是否匹配。 - 如果发现不匹配,那么如何知道哪部分日志是匹配的,哪部分日志是不匹配的呢?
leader每次发送AppendEntryRPC
后,follower都会根据其entry的index和对应的term来判断某一个日志是否匹配。
在leader刚当选,会从最后一个日志开始判断是否匹配,如果匹配,那么后续发送AppendEntryRPC
就不需要携带日志entry了。
如果不匹配,那么下一次就发送 倒数第2个 日志entry的index和其对应的term来判断匹配,
如果还不匹配,那么依旧重复这个过程,直到遇到一个匹配的日志。
raft日志的两个特点
- 两个节点的日志中,有两个 entry 拥有相同的 index 和 term,那么它们一定记录了相同的内容/操作,即两个日志匹配
- 两个节点的日志中,有两个 entry 拥有相同的 index 和 term,那么它们前面的日志entry也相同
- 如何保证?
- 保证第一点:仅有 leader 可以生成 entry,保证一致性
- 保证第二点:leader 在通过
AppendEntriesRPC
和 follower 通讯时,除了带上自己的term等信息外,还会带上entry的index和对应的term等信息,follower在接收到后通过对比就可以知道自己与leader的日志是否匹配,不匹配则拒绝请求。
leader发现follower拒绝后就知道entry不匹配,那么下一次就会尝试匹配前一个entry,直到遇到一个entry匹配,并将不匹配的entry给删除(覆盖)。
3.主要流程
1. raft类的定义
class Raft :
{
private:std::mutex m_mtx;std::vector<std::shared_ptr< RaftRpc >> m_peers; //需要与其他raft节点通信,这里保存与其他结点通信的rpc入口std::shared_ptr<Persister> m_persister; //持久化层,负责raft数据的持久化int m_me; //raft是以集群启动,这个用来标识自己的的编号int m_currentTerm; //记录当前的termint m_votedFor; //记录当前term给谁投票过std::vector<mprrpc:: LogEntry> m_logs; 日志条目数组,包含了状态机要执行的指令集,以及收到领导时的任期号// 这两个状态所有结点都在维护,易失int m_commitIndex;int m_lastApplied; // 已经汇报给状态机(上层应用)的log 的index// 这两个状态是由leader来维护,易失 ,这两个部分在内容补充的部分也会再讲解// 这两个状态的下标1开始,因为通常commitIndex和lastApplied从0开始,应该是一个无效的index,因此下标从1开始std::vector<int> m_nextIndex; //领导者使用 m_nextIndex 来确定需要发送给追随者的下一批日志条目。std::vector<int> m_matchIndex; //追随者使用 m_matchIndex 来记录已经成功复制的日志条目,并向领导者发送确认信息。enum Status{Follower,Candidate,Leader};// 保存当前身份Status m_status;std::shared_ptr<LockQueue<ApplyMsg>> applyChan; // client从这里取日志,client与raft通信的接口// ApplyMsgQueue chan ApplyMsg // raft内部使用的chan,applyChan是用于和服务层交互,最后好像没用上// 选举超时std::chrono::_V2::system_clock::time_point m_lastResetElectionTime;// 心跳超时,用于leaderstd::chrono::_V2::system_clock::time_point m_lastResetHearBeatTime;// 用于传入快照点// 储存了快照中的最后一个日志的Index和Termint m_lastSnapshotIncludeIndex;int m_lastSnapshotIncludeTerm;public:void AppendEntries1(const mprrpc::AppendEntriesArgs *args, mprrpc::AppendEntriesReply *reply); //日志同步 + 心跳 rpc ,重点关注void applierTicker(); //定期向状态机写入日志,非重点函数bool CondInstallSnapshot(int lastIncludedTerm, int lastIncludedIndex, std::string snapshot); //快照相关,非重点void doElection(); //发起选举void doHeartBeat(); //leader定时发起心跳// 每隔一段时间检查睡眠时间内有没有重置定时器,没有则说明超时了
// 如果有则设置合适睡眠时间:睡眠到重置时间+超时时间void electionTimeOutTicker(); //监控是否该发起选举了std::vector<ApplyMsg> getApplyLogs();int getNewCommandIndex();void getPrevLogInfo(int server, int *preIndex, int *preTerm);void GetState(int *term, bool *isLeader); //看当前节点是否是leadervoid InstallSnapshot( const mprrpc::InstallSnapshotRequest *args, mprrpc::InstallSnapshotResponse *reply); void leaderHearBeatTicker(); //检查是否需要发起心跳(leader)void leaderSendSnapShot(int server); void leaderUpdateCommitIndex(); //leader更新commitIndexbool matchLog(int logIndex, int logTerm); //对应Index的日志是否匹配,只需要Index和Term就可以知道是否匹配void persist(); //持久化void RequestVote(const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply); //变成candidate之后需要让其他结点给自己投票bool UpToDate(int index, int term); //判断当前节点是否含有最新的日志int getLastLogIndex();void getLastLogIndexAndTerm(int *lastLogIndex, int *lastLogTerm);int getLogTermFromLogIndex(int logIndex);int GetRaftStateSize();int getSlicesIndexFromLogIndex(int logIndex); //设计快照之后logIndex不能与在日志中的数组下标相等了,根据logIndex找到其在日志数组中的位置bool sendRequestVote(int server , std::shared_ptr<mprrpc::RequestVoteArgs> args , std::shared_ptr<mprrpc::RequestVoteReply> reply, std::shared_ptr<int> votedNum) ; // 请求其他结点的投票bool sendAppendEntries(int server ,std::shared_ptr<mprrpc::AppendEntriesArgs> args , std::shared_ptr<mprrpc::AppendEntriesReply> reply , std::shared_ptr<int> appendNums ) ; //Leader发送心跳后,对心跳的回复进行对应的处理//rf.applyChan <- msg //不拿锁执行 可以单独创建一个线程执行,但是为了同意使用std:thread ,避免使用pthread_create,因此专门写一个函数来执行void pushMsgToKvServer(ApplyMsg msg); //给上层的kvserver层发送消息void readPersist(std::string data); std::string persistData();void Start(Op command,int* newLogIndex,int* newLogTerm,bool* isLeader ) ; // 发布发来一个新日志
// 即kv-server主动发起,请求raft(持久层)保存snapshot里面的数据,index是用来表示snapshot快照执行到了哪条命令void Snapshot(int index , std::string snapshot );public:void init(std::vector<std::shared_ptr< RaftRpc >> peers,int me,std::shared_ptr<Persister> persister,std::shared_ptr<LockQueue<ApplyMsg>> applyCh); //初始化
关键函数
- Raft的主要流程:
- 领导选举(
sendRequestVote
,RequestVote
) - 日志同步、心跳(
sendAppendEntries
,AppendEntries
)
- 定时器的维护:
- Raft向状态机定时写入(
applierTicker
) - 心跳维护定时器(
leaderHearBeatTicker
) - 选举超时定时器(
electionTimeOutTicker
)
- 持久化相关:
- 哪些内容需要持久化,什么时候需要持久化(persist)
m_nextIndex 和 m_matchIndex作用
m_nextIndex
保存leader下一次应该从哪一个日志开始发送给follower;
m_matchIndex
表示follower在哪一个日志是已经匹配了的(由于日志安全性,某一个日志匹配,那么这个日志及其之前的日志都是匹配的)
一个比较容易弄错的问题是:m_nextIndex 与m_matchIndex 是否有冗余,即使用一个m_nextIndex 可以吗?
显然是不行的,m_nextIndex 的作用是用来寻找m_matchIndex ,不能直接取代。我们可以从这两个变量的变化看,在当选leader后,m_nextIndex 初始化为最新日志index,m_matchIndex 初始化为0,如果日志不匹配,那么m_nextIndex 就会不断的缩减,直到遇到匹配的日志,这时候m_nextIndex 应该一直为m_matchIndex+1 。
如果一直不发生故障,那么后期m_nextIndex就没有太大作用了,但是raft考虑需要考虑故障的情况,因此需要使用两个变量。
2.启动初始化
void Raft::init(std::vector<std::shared_ptr<RaftRpc>> peers, int me, std::shared_ptr<Persister> persister, std::shared_ptr<LockQueue<ApplyMsg>> applyCh) {m_peers = peers; //与其他结点沟通的rpc类m_persister = persister; //持久化类m_me = me; //标记自己,毕竟不能给自己发送rpc吧m_mtx.lock();//applierthis->applyChan = applyCh; //与kv-server沟通
// rf.ApplyMsgQueue = make(chan ApplyMsg)m_currentTerm = 0; //初始化term为0m_status = Follower; //初始化身份为followerm_commitIndex = 0; m_lastApplied = 0;m_logs.clear();for (int i =0;i<m_peers.size();i++){m_matchIndex.push_back(0);m_nextIndex.push_back(0);}m_votedFor = -1; //当前term没有给其他人投过票就用-1表示m_lastSnapshotIncludeIndex = 0;m_lastSnapshotIncludeTerm = 0;m_lastResetElectionTime = now();m_lastResetHearBeatTime = now();// initialize from state persisted before a crashreadPersist(m_persister->ReadRaftState());if(m_lastSnapshotIncludeIndex > 0){m_lastApplied = m_lastSnapshotIncludeIndex;//rf.commitIndex = rf.lastSnapshotIncludeIndex 崩溃恢复不能读取commitIndex}m_mtx.unlock();// start ticker 开始三个定时器std::thread t(&Raft::leaderHearBeatTicker, this);t.detach();std::thread t2(&Raft::electionTimeOutTicker, this);t2.detach();std::thread t3(&Raft::applierTicker, this);t3.detach();
}
从上面可以看到一共产生了三个定时器,分别维护:选举、日志同步和心跳、raft节点与kv-server的联系。相互之间是比较隔离的
3.竞选leader
在Raft算法中,每个节点(无论是追随者(follower)还是候选人(candidate))都有一个选举定时器。如果追随者在一定的时间内没有收到任何来自领导者或候选人的消息,它会认为当前没有有效的领导者,然后启动选举定时器。一旦定时器到期,追随者会转换为候选人状态,并开始新一轮的领导者选举。
electionTimeOutTicker
:负责查看是否该发起选举,如果该发起选举就执行doElection
发起选举。doElection
:实际发起选举,构造需要发送的rpc,并多线程调用sendRequestVote
处理rpc及其相应。sendRequestVote
:负责发送选举中的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。RequestVote
:接收别人发来的选举请求,主要检验是否要给对方投票。
electionTimeOutTicker:
选举定时器,负责查看是否该发起选举,如果该发起选举就执行doElection
发起选举。
void Raft::electionTimeOutTicker() {// Check if a Leader election should be started.while (true) {m_mtx.lock();auto nowTime = now(); //睡眠前记录时间auto suitableSleepTime = getRandomizedElectionTimeout() + m_lastResetElectionTime - nowTime;m_mtx.unlock();if (suitableSleepTime.count() > 1) {std::this_thread::sleep_for(suitableSleepTime);}if ((m_lastResetElectionTime - nowTime).count() > 0) { //说明睡眠的这段时间有重置定时器,那么就没有超时,再次睡眠continue;}doElection();}
}
在死循环中,
首先计算距离上次重置选举计时器的时间m_lastResetElectionTime
- nowTime
加上随机化的选举超时时间getRandomizedElectionTimeout
,
计算得到距离下一次超时应该睡眠的时间suitableSleepTime
,然后线程根据这个时间决定是否睡眠。
若超时时间未到,线程进入睡眠状态,若在此期间选举计时器被重置,则继续循环。
若超时时间已到,调用doElection()
函数启动领导者选举过程。
随机化的选举超时时间是为了避免多个追随者几乎同时成为候选人,导致选举失败
doElection
实际发起选举,构造需要发送的rpc,并多线程调用sendRequestVote
处理rpc及其相应。
void Raft::doElection() {lock_guard<mutex> g(m_mtx); //c11新特性,使用raii避免死锁if (m_status != Leader) {DPrintf("[ ticker-func-rf(%d) ] 选举定时器到期且不是leader,开始选举 \n", m_me);//当选举的时候定时器超时就必须重新选举,不然没有选票就会一直卡住//重竞选超时,term也会增加的m_status = Candidate;///开始新一轮的选举m_currentTerm += 1; //无论是刚开始竞选,还是超时重新竞选,term都要增加m_votedFor = m_me; //即是自己给自己投票,也避免candidate给同辈的candidate投persist(); std::shared_ptr<int> votedNum = std::make_shared<int>(1); // 使用 make_shared 函数初始化 // 重新设置定时器m_lastResetElectionTime = now();// 发布RequestVote RPCfor (int i = 0; i < m_peers.size(); i++) {if (i == m_me) {continue;}int lastLogIndex = -1, lastLogTerm = -1;getLastLogIndexAndTerm(&lastLogIndex, &lastLogTerm);//获取最后一个log的term和下标,以添加到RPC的发送//初始化发送参数std::shared_ptr<mprrpc::RequestVoteArgs> requestVoteArgs = std::make_shared<mprrpc::RequestVoteArgs>();requestVoteArgs->set_term(m_currentTerm);requestVoteArgs->set_candidateid(m_me);requestVoteArgs->set_lastlogindex(lastLogIndex);requestVoteArgs->set_lastlogterm(lastLogTerm);std::shared_ptr<mprrpc::RequestVoteReply> requestVoteReply = std::make_shared<mprrpc::RequestVoteReply>();//使用匿名函数执行避免其拿到锁std::thread t(&Raft::sendRequestVote, this, i, requestVoteArgs, requestVoteReply,votedNum); // 创建新线程并执行函数,并传递参数t.detach();}}
}
sendRequestVote
负责发送选举中的RPC,在发送完rpc后还需要根据调用RequestVote
得到的reply响应结果,负责接收并处理对端发送回来的响应,对发起投票的候选者状态进行更新
bool Raft::sendRequestVote(int server, std::shared_ptr<mprrpc::RequestVoteArgs> args, std::shared_ptr<mprrpc::RequestVoteReply> reply,std::shared_ptr<int> votedNum) {bool ok = m_peers[server]->RequestVote(args.get(),reply.get());if (!ok) {return ok;//rpc通信失败就立即返回,避免资源消耗}lock_guard<mutex> lg(m_mtx);if(reply->term() > m_currentTerm){//回复的term比自己大,说明自己落后了,那么就更新自己的状态并且退出m_status = Follower; //三变:身份,term,和投票m_currentTerm = reply->term();m_votedFor = -1; //term更新了,那么这个term自己肯定没投过票,为-1persist(); //持久化return true;} else if ( reply->term() < m_currentTerm ) {//回复的term比自己的term小,不应该出现这种情况return true;}if(!reply->votegranted()){ //这个节点因为某些原因没给自己投票,没啥好说的,结束本函数return true;}//给自己投票了*votedNum = *votedNum + 1; //voteNum多一个if (*votedNum >= m_peers.size()/2+1) {//变成leader*votedNum = 0; //重置voteDNum,如果不重置,那么就会变成leader很多次,是没有必要的,甚至是错误的!!!// 第一次变成leader,初始化状态和nextIndex、matchIndexm_status = Leader;int lastLogIndex = getLastLogIndex();for (int i = 0; i <m_nextIndex.size() ; i++) {m_nextIndex[i] = lastLogIndex + 1 ;//有效下标从1开始,因此要+1m_matchIndex[i] = 0; //每换一个领导都是从0开始,见论文的fig2}std::thread t(&Raft::doHeartBeat, this); //马上向其他节点宣告自己就是leadert.detach();persist(); }return true;
}
RequestVote
得到投票请求后,节点根据传递来的信息进行判断是否对其投票,构造reply返回值
void Raft::RequestVote( const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply) {lock_guard<mutex> lg(m_mtx);Defer ec1([this]() -> void { //应该先持久化,再撤销lock,因此这个写在lock后面this->persist();});//对args的term的三种情况分别进行处理,大于小于等于自己的term都是不同的处理//reason: 出现网络分区,该竞选者已经OutOfDate(过时)if (args->term() < m_currentTerm) {reply->set_term(m_currentTerm);reply->set_votestate(Expire);reply->set_votegranted(false);return;}//论文fig2:右下角,如果任何时候rpc请求或者响应的term大于自己的term,更新term,并变成followerif (args->term() > m_currentTerm) {m_status = Follower;m_currentTerm = args->term();m_votedFor = -1;// 重置定时器:收到leader的ae,开始选举,透出票//这时候更新了term之后,votedFor也要置为-1}// 现在节点任期都是相同的(任期小的也已经更新到新的args的term了)// 要检查log的term和index是不是匹配的了int lastLogTerm = getLastLogIndex();//只有没投票,且candidate的日志的新的程度 ≥ 接受者的日志新的程度 才会授票if (!UpToDate(args->lastlogindex(), args->lastlogterm())) {//日志太旧了reply->set_term(m_currentTerm);reply->set_votestate(Voted);reply->set_votegranted(false);return;}// 当因为网络质量不好导致的请求丢失重发就有可能!!!!
// 因此需要避免重复投票if (m_votedFor != -1 && m_votedFor != args->candidateid()) {reply->set_term(m_currentTerm);reply->set_votestate(Voted);reply->set_votegranted(false);return;} else {//同意投票m_votedFor = args->candidateid();m_lastResetElectionTime = now();//认为必须要在投出票的时候才重置定时器,reply->set_term(m_currentTerm);reply->set_votestate(Normal);reply->set_votegranted(true);return;}
}
4.日志复制、心跳
leaderHearBeatTicker
:负责查看是否该发送心跳了,如果该发起就执行doHeartBeat
。doHeartBeat
:实际发送心跳,判断到底是构造需要发送的rpc,并多线程调用sendRequestVote
处理rpc及其响应。sendAppendEntries
:负责发送日志的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。leaderSendSnapShot
:负责发送快照的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。AppendEntries
:接收leader发来的日志请求,主要检验用于检查当前日志是否匹配并同步leader的日志到本机。InstallSnapshot
:接收leader发来的快照请求,同步快照到本机。
leaderHearBeatTicker
心跳定时器,负责查看是否该发送心跳了,如果该发起就执行doHeartBeat
。
void Raft::leaderHearBeatTicker() {while (true) {auto nowTime = now();m_mtx.lock();auto suitableSleepTime = std::chrono::milliseconds(HeartBeatTimeout) + m_lastResetHearBeatTime - nowTime;m_mtx.unlock();if (suitableSleepTime.count() < 1) {suitableSleepTime = std::chrono::milliseconds(1);}std::this_thread::sleep_for(suitableSleepTime);if ((m_lastResetHearBeatTime - nowTime).count() > 0) { //说明睡眠的这段时间有重置定时器,那么就没有超时,再次睡眠continue;}doHeartBeat();}
}
其基本逻辑和选举定时器electionTimeOutTicker
一模一样
不一样之处在于设置的休眠时间不同,这里是根据HeartBeatTimeout
来设置,固定时间。
而electionTimeOutTicker
中是根据getRandomizedElectionTimeout()
设置,随机一个时间。
doHeartBeat
实际发送心跳,判断是Leader则构造需要发送的rpc,并多线程调用sendRequestVote
处理rpc及其响应。
void Raft::doHeartBeat() {std::lock_guard<mutex> g(m_mtx);if (m_status == Leader) {auto appendNums = std::make_shared<int>(1); //正确返回的节点的数量//对Follower(除了自己外的所有节点发送AE)for (int i = 0; i < m_peers.size(); i++) {if(i == m_me){ //不对自己发送AEcontinue;}//日志压缩加入后要判断是发送快照还是发送AEif (m_nextIndex[i] <= m_lastSnapshotIncludeIndex) {//应该发送的日志已经被压缩成快照,必须发送快照了std::thread t(&Raft::leaderSendSnapShot, this, i); t.detach();continue;}//发送心跳,构造发送值int preLogIndex = -1;int PrevLogTerm = -1;getPrevLogInfo(i, &preLogIndex, &PrevLogTerm); //获取本次发送的一系列日志的上一条日志的信息,以判断是否匹配std::shared_ptr<mprrpc::AppendEntriesArgs> appendEntriesArgs = std::make_shared<mprrpc::AppendEntriesArgs>();appendEntriesArgs->set_term(m_currentTerm);appendEntriesArgs->set_leaderid(m_me);appendEntriesArgs->set_prevlogindex(preLogIndex);appendEntriesArgs->set_prevlogterm(PrevLogTerm);appendEntriesArgs->clear_entries();appendEntriesArgs->set_leadercommit(m_commitIndex);// 作用是携带上prelogIndex的下一条日志及其之后的所有日志//leader对每个节点发送的日志长短不一,但是都保证从prevIndex发送直到最后if (preLogIndex != m_lastSnapshotIncludeIndex) {for (int j = getSlicesIndexFromLogIndex(preLogIndex) + 1; j < m_logs.size(); ++j) {mprrpc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries();*sendEntryPtr = m_logs[j]; }} else {for (const auto& item: m_logs) {mprrpc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries();*sendEntryPtr = item; }}int lastLogIndex = getLastLogIndex();//初始化返回值const std::shared_ptr<mprrpc::AppendEntriesReply> appendEntriesReply = std::make_shared<mprrpc::AppendEntriesReply>();std::thread t(&Raft::sendAppendEntries, this, i, appendEntriesArgs, appendEntriesReply,appendNums); // 创建新线程并执行b函数,并传递参数t.detach();}m_lastResetHearBeatTime = now(); //leader发送心跳,重置心跳时间,}
}
sendAppendEntries
负责发送日志的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。
bool
Raft::sendAppendEntries(int server, std::shared_ptr<mprrpc::AppendEntriesArgs> args, std::shared_ptr<mprrpc::AppendEntriesReply> reply,std::shared_ptr<int> appendNums) {// todo: paper中5.3节第一段末尾提到,如果append失败应该不断的retries ,直到这个log成功的被storebool ok = m_peers[server]->AppendEntries(args.get(), reply.get());if (!ok) {return ok;}lock_guard<mutex> lg1(m_mtx);//对reply进行处理// 对于rpc通信,无论什么时候都要检查termif(reply->term() > m_currentTerm){m_status = Follower;m_currentTerm = reply->term();m_votedFor = -1;return ok;} else if (reply->term() < m_currentTerm) {//正常不会发生return ok;}if (m_status != Leader) { //如果不是leader,那么就不要对返回的情况进行处理了return ok;}//term相等if (!reply->success()){//日志不匹配,正常来说就是index要往前-1,既然能到这里,第一个日志(idnex = 1)发送后肯定是匹配的,因此不用考虑变成负数//因为真正的环境不会知道是服务器宕机还是发生网络分区了if (reply->updatenextindex() != -100) { //-100只是一个特殊标记而已,没有太具体的含义,这里表示任期落后了// 优化日志匹配,让follower决定到底应该下一次从哪一个开始尝试发送m_nextIndex[server] = reply->updatenextindex(); }// 如果感觉rf.nextIndex数组是冗余的,看下论文fig2,其实不是冗余的} else {*appendNums = *appendNums +1; //到这里代表同意接收了本次心跳或者日志m_matchIndex[server] = std::max(m_matchIndex[server],args->prevlogindex()+args->entries_size() ); //同意了日志,就更新对应的m_matchIndex和m_nextIndexm_nextIndex[server] = m_matchIndex[server]+1;int lastLogIndex = getLastLogIndex();if (*appendNums >= 1 + m_peers.size()/2) { //可以commit了//两种方法保证幂等性,1.赋值为0 2.上面≥改为==*appendNums = 0; //置0//日志的安全性保证!!!!! leader只有在当前term有日志提交的时候才更新commitIndex,因为raft无法保证之前term的Index是否提交//只有当前term有日志提交,之前term的log才可以被提交,只有这样才能保证“领导人完备性{当选领导人的节点拥有之前被提交的所有log,当然也可能有一些没有被提交的}”//说白了就是只有当前term有日志提交才会提交if(args->entries_size() >0 && args->entries(args->entries_size()-1).logterm() == m_currentTerm){m_commitIndex = std::max(m_commitIndex,args->prevlogindex() + args->entries_size());}}}return ok;
}
m_nextIndex[server] = reply->updatenextindex(); 中涉及日志寻找匹配加速的优化
对于leader只有在当前term有日志提交的时候才更新commitIndex这个安全性保证,详情看参考公众号文章的:7.6是否一项提议只需要被多数派通过就可以提交?
AppendEntries
接收leader发来的日志请求,主要检验用于检查当前日志是否匹配并同步leader的日志到本机。
void Raft::AppendEntries1(const mprrpc:: AppendEntriesArgs *args, mprrpc::AppendEntriesReply *reply) {std::lock_guard<std::mutex> locker(m_mtx);// 不同的人收到AppendEntries的反应是不同的,要注意无论什么时候收到rpc请求和响应都要检查termif (args->term() < m_currentTerm) {reply->set_success(false);reply->set_term(m_currentTerm);reply->set_updatenextindex(-100); // 论文中:让领导人可以及时更新自己DPrintf("[func-AppendEntries-rf{%d}] 拒绝了 因为Leader{%d}的term{%v}< rf{%d}.term{%d}\n", m_me, args->leaderid(),args->term() , m_me, m_currentTerm) ;return; // 注意从过期的领导人收到消息不要重设定时器}Defer ec1([this]() -> void { this->persist(); });//由于这个局部变量创建在锁之后,因此执行persist的时候应该也是拿到锁的. //本质上就是使用raii的思想让persist()函数执行完之后再执行if (args->term() > m_currentTerm) {// 三变 ,防止遗漏,无论什么时候都是三变m_status = Follower;m_currentTerm = args->term();m_votedFor = -1; // 这里设置成-1有意义,如果突然宕机然后上线理论上是可以投票的// 这里可不返回,应该改成让改节点尝试接收日志// 如果是领导人和candidate突然转到Follower好像也不用其他操作// 如果本来就是Follower,那么其term变化,相当于“不言自明”的换了追随的对象,因为原来的leader的term更小,是不会再接收其消息了}// 如果发生网络分区,那么candidate可能会收到同一个term的leader的消息,要转变为Follower,为了和上面,因此直接写m_status = Follower; // 这里是有必要的,因为如果candidate收到同一个term的leader的AE,需要变成follower// term相等m_lastResetElectionTime = now(); //重置选举超时定时器// 不能无脑的从prevlogIndex开始阶段日志,因为rpc可能会延迟,导致发过来的log是很久之前的// 那么就比较日志,日志有3种情况if (args->prevlogindex() > getLastLogIndex()) { //追随者的日志比领导者的要短。这种情况,追随者需要从领导者那里接收缺失的日志条目。reply->set_success(false);reply->set_term(m_currentTerm);reply->set_updatenextindex(getLastLogIndex() + 1);return;} else if (args->prevlogindex() < m_lastSnapshotIncludeIndex) { // 如果prevlogIndex还没有更上快照//追随者可能已经通过快照机制截断了其日志 或 追随者接受了一个快照,丢弃了快照索引之前的日志 或 领导者的日志落后reply->set_success(false);reply->set_term(m_currentTerm);reply->set_updatenextindex(m_lastSnapshotIncludeIndex + 1); //不会浪费时间重试发送追随者已经用快照截断的日志条目}// 本机日志有那么长,冲突(same index,different term),截断日志// 注意:这里目前当args.PrevLogIndex == rf.lastSnapshotIncludeIndex与不等的时候要分开考虑,可以看看能不能优化这块if (matchLog(args->prevlogindex(), args->prevlogterm())) {//日志匹配,那么就复制日志for (int i = 0; i < args->entries_size(); i++) {auto log = args->entries(i);if (log.logindex() > getLastLogIndex()) { //超过就直接添加日志m_logs.push_back(log);} else { //没超过就比较是否匹配,不匹配再更新,而不是直接截断 检查当前日志条目是否已经存在于追随者的日志中。//判断追随者日志中相应索引位置的日志条目的任期是否与请求中的日志条目的任期相同。如果任期不同,说明日志不匹配。//参考前面公众号文章 4.1 写 case3if (m_logs[getSlicesIndexFromLogIndex(log.logindex())].logterm() != log.logterm()) { //不匹配就更新m_logs[getSlicesIndexFromLogIndex(log.logindex())] = log;}}}if (args->leadercommit() > m_commitIndex) {m_commitIndex = std::min(args->leadercommit(), getLastLogIndex());// 这个地方不能无脑跟上getLastLogIndex(),因为可能存在args->leadercommit()落后于 getLastLogIndex()的情况}// 领导会一次发送完所有的日志reply->set_success(true);reply->set_term(m_currentTerm);return;} else {// 不匹配,不匹配不是一个一个往前,而是有优化加速// PrevLogIndex 长度合适,但是不匹配,因此往前寻找 矛盾的term的第一个元素// 为什么该term的日志都是矛盾的呢?也不一定都是矛盾的,只是这么优化减少rpc而已// ?什么时候term会矛盾呢?很多情况,比如leader接收了日志之后马上就崩溃等等reply->set_updatenextindex(args->prevlogindex());for (int index = args->prevlogindex(); index >= m_lastSnapshotIncludeIndex; --index) {if (getLogTermFromLogIndex(index) != getLogTermFromLogIndex(args->prevlogindex())) {reply->set_updatenextindex(index + 1);break;}}reply->set_success(false);reply->set_term(m_currentTerm);return;}}
日志寻找匹配加速
这部分在AppendEntries
函数最后部分。
// 不匹配,不匹配不是一个一个往前,而是有优化加速
// PrevLogIndex 长度合适,但是不匹配,因此往前寻找 矛盾的term的第一个元素
// 为什么该term的日志都是矛盾的呢?也不一定都是矛盾的,只是这么优化减少rpc而已
// ?什么时候term会矛盾呢?很多情况,比如leader接收了日志之后马上就崩溃等等
reply->set_updatenextindex(args->prevlogindex());for (int index = args->prevlogindex(); index >= m_lastSnapshotIncludeIndex; --index) {if (getLogTermFromLogIndex(index) != getLogTermFromLogIndex(args->prevlogindex())) {reply->set_updatenextindex(index + 1);break;}
}reply->set_success(false);
reply->set_term(m_currentTerm);return;
之前说过,如果日志不匹配的话可以一个一个往前的倒退。但是这样的话可能会设计很多个rpc之后才能找到匹配的日志,那么就一次多倒退几个数。
倒退几个呢?这里认为如果某一个日志不匹配,那么这一个日志所在的term的所有日志大概率都不匹配,那么就倒退到 最后一个日志所在的term的最后那个命令。
5.snapshot快照
快照是什么?
当在Raft协议中的日志变得太大时,为了避免无限制地增长,系统可能会采取快照(snapshot)的方式来压缩日志。快照是系统状态的一种紧凑表示形式,包含在某个特定时间点的所有必要信息,以便在需要时能够还原整个系统状态。
如果你学习过redis,那么快照说白了就是rdb,而raft的日志可以看成是aof日志。rdb的目的只是为了崩溃恢复的加速,如果没有的话也不会影响系统的正确性,这也是为什么选择不详细讲解快照的原因,因为只是日志的压缩而已。
何时创建快照?
快照通常在日志达到一定大小时创建。这有助于限制日志的大小,防止无限制的增长。快照也可以在系统空闲时(没有新的日志条目被追加)创建。
快照的传输
快照的传输主要涉及:kv数据库与raft节点之间;不同raft节点之间。
kv数据库与raft节点之间:因为快照是数据库的压缩表示,因此需要由数据库打包快照,并交给raft节点。当快照生成之后,快照内设计的操作会被raft节点从日志中删除(不删除就相当于有两份数据,冗余了)。
不同raft节点之间:当leader已经把某个日志及其之前的内容变成了快照,那么当涉及这部的同步时,就只能通过快照来发送。
三.持久化
持久化就是把不能丢失的数据保存到磁盘。
1.持久化介绍
持久化的内容为两部分:
1.raft节点的部分信息;2.kvDb的快照
1.raft节点的部分信息
m_currentTerm
:当前节点的Term,避免重复到一个Term,可能会遇到重复投票等问题。
m_votedFor
:当前Term给谁投过票,避免故障后重复投票。
m_logs
:raft节点保存的全部的日志信息。
不妨想一想,其他的信息为什么不用持久化,比如说:身份、commitIndex、applyIndex等等。
applyIndex不持久化是经典raft的实现,在一些工业实现上可能会优化,从而持久化。
即applyIndex不持久化不会影响“共识”。
2.kvDb的快照
m_lastSnapshotIncludeIndex
:快照的信息,快照最新包含哪个日志Index
m_lastSnapshotIncludeTerm
:快照的信息,快照最新包含哪个日志Term,与m_lastSnapshotIncludeIndex
是对应的。
Snapshot是kvDb的快照,也可以看成是日志,因此:全部的日志 = m_logs
+ snapshot
因为Snapshot是kvDB生成的,kvDB肯定不知道raft的存在,而什么term、什么日志Index都是raft才有的概念,因此snapshot中肯定没有term和index信息。所以需要raft自己来保存这些信息。
故,快照与m_logs联合起来理解即可。
2.为什么要持久化这些内容
两部分原因:共识安全、优化。
除了snapshot相关的部分,其他部分都是为了共识安全。
而snapshot是因为日志一个一个的叠加,会导致最后的存储非常大,因此使用snapshot来压缩日志。
为什么snashot可以压缩日志?
日志是追加写的,对于一个变量的重复修改可能会重复保存,理论上对一个变量的反复修改会导致日志不断增大。
而snapshot是原地写,即只保存一个变量最后的值,自然所需要的空间就小了。
3.什么时候持久化
需要持久化的内容发送改变的时候就要注意持久化。
比如term 增加,日志增加等等。
*具体查看代码中的void Raft::persist() 相关内容
4.谁来调用持久化
谁来调用都可以,只要能保证需要持久化的内容能正确持久化。
代码中选择的是raft类自己来完成持久化。因为raft类最方便感知自己的term之类的信息有没有变化。
注意,虽然持久化很耗时,但是持久化这些内容的时候不要放开锁,以防其他线程改变了这些值,导致其它异常。
5.具体怎么实现持久化/使用哪个函数持久化
其实持久化是一个非常难的事情,因为持久化需要考虑:速度、大小、二进制安全。
因此代码中目前采用的是使用boost库中的持久化实现,将需要持久化的数据序列化转成std::string 类型再写入磁盘。
当然其他的序列化方式也少可行的,可以看到这一块还是有优化空间的。
四.kvServer
1.kvServer介绍
图中是raftServer,这里叫成kvServer,是一样的。
kvServer其实是个中间组件,负责沟通kvDB和raft节点。
那么外部请求是Server来负责,加入后变成了:
2.kvServer怎么和上层kvDB沟通,怎么和下层raft节点沟通
std::shared_ptr<LockQueue<ApplyMsg> > applyChan; //kvServer和raft节点的通信管道
std::unordered_map<std::string, std::string> m_kvDB; //kvDB,用unordered_map来替代
kvDB:使用的是unordered_map来代替上层的kvDB,因此没啥好说的。
raft节点:其中LockQueue 是一个并发安全的队列,这种方式其实是模仿的go中的channel机制。
在raft类中这里可以看到,raft类中也拥有一个applyChan,kvSever和raft类都持有同一个applyChan,来完成相互的通信。
3.kvServer怎么处理外部请求
从上面的结构图中可以看到kvServer负责与外部clerk通信。
那么一个外部请求的处理可以简单的看成:
- 接收外部请求。
- 本机内部与raft和kvDB协商如何处理该请求。
- 返回外部响应。
接收与响应外部请求
对于1和3,请求和返回的操作我们可以通过http、自定义协议等等方式实现,但是既然我们已经写出了rpc通信的一个简单的实现(源代码可见:这里),那就使用rpc来实现吧。
而且rpc可以直接完成请求和响应这一步,后面就不用考虑外部通信的问题了,好好处理好本机的流程即可。
相关函数是:
void PutAppend(google::protobuf::RpcController *controller,const ::raftKVRpcProctoc::PutAppendArgs *request,::raftKVRpcProctoc::PutAppendReply *response,::google::protobuf::Closure *done) override;void Get(google::protobuf::RpcController *controller,const ::raftKVRpcProctoc::GetArgs *request,::raftKVRpcProctoc::GetReply *response,::google::protobuf::Closure *done) override;
见名知意,请求分成两种:get和put(也就是set)。
如果是putAppend,clerk中就调用PutAppend 的rpc。
如果是Get,clerk中就调用Get 的rpc。
与raft节点沟通
在正式开始之前我们必须要先了解 线性一致性 的相关概念。
待续
代码如下(示例):