欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 金融 > 从etcd学习raft

从etcd学习raft

2024/10/24 17:24:53 来源:https://blog.csdn.net/okm6666/article/details/141574327  浏览:    关键词:从etcd学习raft

在etcd的项目下有一个使用raft的示例,在之前读etcd代码的时候会比较难理解raft相关的代码。因此通过这个示例会更容易的了解raft相关的实现细节。

我将这部分代码推送到了我的git仓库:https://github.com/yugu2day/raftexample

在示例中,主要是构建了一个基于map的k-v存储服务, 支持PUTGET 对键值内容的存取,通过POSTDELETE 来添加/删除raft集群中的节点。

如何使用示例

可以按照代码中的README在本地启动这个简单的示例, 这里也大概描述一下过程。在构建出可执行文件并通过启动参数设置节点ID,服务端口之后,就可以通过HTTP请求进行键值对的更新查询。可以从下面的输出看到在启动之后会有一些日志打印出来, 我们启动的节点id是1,在日志中就包括member1 在不同的term中的角色切换。

go build -o raftexample
./raftexample --id 1 --cluster http://127.0.0.1:12379 --port 123802024/08/26 16:07:28 replaying WAL of member 1
2024/08/26 16:07:28 loading WAL at term 0 and index 0
raft2024/08/26 16:07:28 INFO: 1 switched to configuration voters=()
raft2024/08/26 16:07:28 INFO: 1 became follower at term 0
raft2024/08/26 16:07:28 INFO: newRaft 1 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0]
raft2024/08/26 16:07:28 INFO: 1 became follower at term 1
raft2024/08/26 16:07:28 INFO: 1 switched to configuration voters=(1)
raft2024/08/26 16:07:28 INFO: 1 switched to configuration voters=(1)
raft2024/08/26 16:07:29 INFO: 1 is starting a new election at term 1
raft2024/08/26 16:07:29 INFO: 1 became candidate at term 2
raft2024/08/26 16:07:29 INFO: 1 received MsgVoteResp from 1 at term 2
raft2024/08/26 16:07:29 INFO: 1 became leader at term 2
raft2024/08/26 16:07:29 INFO: raft.node: 1 elected leader 1 at term 2

通过http请求启动的服务, 可以看到现在GETPUT 请求是能够正常的获取和更新键值对的内容的。

curl -L http://127.0.0.1:12380/my-key -XPUT -d hello
curl -L http://127.0.0.1:12380/my-key
hello%
curl -L http://127.0.0.1:12380/my-key -XPUT -d hello11
hello11%

raftnode 的启动

在main函数中,会根据我们的启动参数构建对应的raftnode,后续raft相关消息的传递都是由raftnode完成。

func main() {...getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)...
}func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {commitC := make(chan *commit)errorC := make(chan error)rc := &raftNode{proposeC:    proposeC,confChangeC: confChangeC,commitC:     commitC,errorC:      errorC,id:          id,peers:       peers,join:        join,waldir:      fmt.Sprintf("raftexample-%d", id),snapdir:     fmt.Sprintf("raftexample-%d-snap", id),getSnapshot: getSnapshot,snapCount:   defaultSnapshotCount,stopc:       make(chan struct{}),httpstopc:   make(chan struct{}),httpdonec:   make(chan struct{}),logger: zap.NewExample(),snapshotterReady: make(chan *snap.Snapshotter, 1),// rest of structure populated after WAL replay}go rc.startRaft()return commitC, errorC, rc.snapshotterReady
}

在初始化raftnode实例后, 会开启一个goroutine启动该节点。可以注意一下waldirsnapDir, 在启动示例之后我们本地也会多出两个文件夹。这两个文件夹与我们存储内容的持久化相关, 因此在启动node之后首先会加载这两个文件的内容,将快照内容和WAL的内容加载到内存中。(这里的概念和redolog类似)加载结束后会写rc.snapshotterReady channel, main函数中就会初始化kv实例并启动web服务。

func main(){...kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)// the key-value http handler will propose updates to raftserveHttpKVAPI(kvs, *kvport, confChangeC, errorC)...

在replay结束后,会创建对应的raft通信相关的对象,并添加其他节点到本节点的关联关系。

	rc.transport = &rafthttp.Transport{Logger:      rc.logger,ID:          types.ID(rc.id),ClusterID:   0x1000,Raft:        rc,ServerStats: stats.NewServerStats("", ""),LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)),ErrorC:      make(chan error),}rc.transport.Start()for i := range rc.peers {if i+1 != rc.id {rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})}}

最后起两个goroutine, 分别处理raft相关的网络请求和相关channel的消息。

go rc.serveRaft()
go rc.serveChannels()

PUT请求的处理

这部分API的代码比较简单,将k-v解析出来之后会序列化并写入此前创建node时返回的proposeCchannel 中。在案例中并没有等待raft处理的结果,而是直接返回了状态码。因此我们在PUT之后马上GET可能不能马上得到最新的结果。那么消息写入proposeC中之后, 就会在前面提到的serveChannels方法中处理。 最终这个消息会和前面文章提到PUT请求的处理类似将消息加载到ready实例上。

在示例中, 会将ready实例的内容写WAL,并写入commitC

		// store raft entries to wal, then publish over commit channelcase rd := <-rc.node.Ready():rc.wal.Save(rd.HardState, rd.Entries)if !raft.IsEmptySnap(rd.Snapshot) {rc.saveSnap(rd.Snapshot)rc.raftStorage.ApplySnapshot(rd.Snapshot)rc.publishSnapshot(rd.Snapshot)}rc.raftStorage.Append(rd.Entries)rc.transport.Send(rd.Messages)applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))if !ok {rc.stop()return}rc.maybeTriggerSnapshot(applyDoneC)rc.node.Advance()

在初始化的kvStore里, 会处理commitC里的消息。并将其应用到map中

func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {for commit := range commitC {if commit == nil {// signaled to load snapshotsnapshot, err := s.loadSnapshot()if err != nil {log.Panic(err)}if snapshot != nil {log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)if err := s.recoverFromSnapshot(snapshot.Data); err != nil {log.Panic(err)}}continue}for _, data := range commit.data {var dataKv kvdec := gob.NewDecoder(bytes.NewBufferString(data))if err := dec.Decode(&dataKv); err != nil {log.Fatalf("raftexample: could not decode message (%v)", err)}s.mu.Lock()s.kvStore[dataKv.Key] = dataKv.Vals.mu.Unlock()}close(commit.applyDoneC)}if err, ok := <-errorC; ok {log.Fatal(err)}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com