领导人选举 Raft 将所有节点分为三个身份:
Leader
:集群内最多只会有一个 leader,负责发起心跳,响应客户端,创建日志,同步日志。
Candidate
:leader 选举过程中的临时角色,由 follower 转化而来,发起投票参与竞选。
Follower
:接受 leader 的心跳和日志同步数据,投票给 candidate。
Raft使用心跳来维持leader身份。每个节点都以follower的身份启动。leader定期发送心跳给所有follower以维持自身的身份。每当follower收到了leader的合法心跳,就刷新自己的超时选举时间。每个节点在初始化时启动一个定时器协程节点选举超时后转变为candidate,提高任期,并发起这轮任期内的选举。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func (rf *Raft) startElection() { args := rf.genRequestVoteArgs() DPrintf("[Node %v] starts election with RequestVoteArgs: %v" , rf.me, args) rf.votedFor = rf.me rf.voteCnt = 1 rf.persist() for peer := range rf.peers { if peer == rf.me { continue } go func (peer int ) { reply := new (RequestVoteReply) if rf.sendRequestVote(peer, args, reply) { rf.mu.Lock() defer rf.mu.Unlock() rf.handleRequestVoteReply(peer, args, reply) } }(peer) } }
其他节点收到candidate的投票请求时,根据投票请求中所携带的candidate的任期,比较candidate和本身所存放日志的新旧程度判断是否投票
在candidate任期满足条件的情况下,只有当candidate的最后一条日志log-1与当前节点的最后一条日志log-2相比,log-1拥有更高的Term,或者log-1与log-2的Term相同,但log-1拥有更大的index时,才给这个candidate投票,否则拒绝投票。
需要注意的是,为了防止活锁现象,TA在guide中指出:
you should only restart your election timer if a) you get an AppendEntries
RPC from the current leader (i.e., if the term in the AppendEntries
arguments is outdated, you should not reset your timer); b) you are starting an election; or c) you grant a vote to another peer
尤其注意第三点,不能一收到更高任期的请求就重置自己的选举计时器,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { rf.mu.Lock() defer rf.mu.Unlock() defer rf.persist() if args.Term < rf.currentTerm { reply.VoteGranted = false reply.Term = rf.currentTerm return } if args.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != args.CandidateId { reply.VoteGranted = false reply.Term = rf.currentTerm return } if args.Term > rf.currentTerm { state := rf.state rf.changeState(Follower) if state != Follower { rf.reInitFollowTimer() } rf.currentTerm = args.Term rf.votedFor = -1 } if args.LastLogTerm < rf.getLastLog().Term || (args.LastLogTerm == rf.getLastLog().Term && args.LastLogIndex < rf.getLastLog().Index) { reply.VoteGranted = false reply.Term = rf.currentTerm return } rf.votedFor = args.CandidateId rf.reInitFollowTimer() reply.Term = rf.currentTerm reply.VoteGranted = true }
当收到半数以上的票数时成功当选leader,此时需要立即发送心跳以维持身份
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (rf *Raft) handleRequestVoteReply(peer int , args *RequestVoteArgs, reply *RequestVoteReply) { if rf.state == Candidate && rf.currentTerm == args.Term { if reply.Term > rf.currentTerm { rf.changeState(Follower) rf.electionTimer.Reset(RandomElectionTimeout()) rf.currentTerm = reply.Term rf.votedFor = -1 rf.voteCnt = 0 rf.persist() return } if reply.VoteGranted { rf.voteCnt += 1 if rf.voteCnt > len (rf.peers)/2 { DPrintf("[Node %v] receives majority votes in term %v" , rf.me, rf.currentTerm) rf.changeState(Leader) rf.reInitLeaderState() rf.broadcastHeartbeat(true ) } } } }
日志复制
快速恢复
场景1:S1没有任期6的任何Log,因此我们需要回退一整个任期的Log CASE 1 s1 4 5 5 s2 4 6 6 6
场景2:S1收到了任期4的旧Leader的多条Log,但是作为新Leader,S2只收到了一条任期4的Log。所以这里,我们需要覆盖S1中有关旧Leader的一些Log CASE 2 s1 4 4 4 s2 4 6 6 6
场景3:S1与S2的Log不冲突,但是S1缺失了部分S2中的Log CASE 3 s1 4 s2 4 6 6 6
为了区分上述三种情况,在AppendEntriesReply中添加下面三个字段:
XTerm:表示Follower中与Leader冲突的Log对应的任期号。Leader会在prevLogTerm中带上本地Log记录中,前一条Log的任期号。如果Follower在对应位置的任期号不匹配,它会拒绝Leader的AppendEntries消息,并将自己的任期号放在XTerm中。如果Follower在对应位置没有Log,那么这里会返回 -1。
XIndex:Follower中s对应任期号为XTerm的第一条Log条目的槽位号。
XLen:如果Follower在对应位置没有Log,那么XTerm会返回-1,此时XLen表示空白的Log槽位数。
因此根据上述三种场景,我们可以得到这三个字段的计算规则:
场景1。Follower(S1)会返回XTerm=5,XIndex=2。Leader(S2)发现自己没有任期5的日志,它会将自己本地记录的,S1的nextIndex设置到XIndex,也就是S1中,任期5的第一条Log对应的槽位号。所以,如果Leader完全没有XTerm的任何Log,那么它应该回退到XIndex对应的位置(这样,Leader发出的下一条AppendEntries就可以一次覆盖S1中所有XTerm对应的Log)。
场景2。Follower(S1)会返回XTerm=4,XIndex=1。Leader(S2)发现自己其实有任期4的日志,它会将自己本地记录的S1的nextIndex设置到本地在XTerm位置的Log条目后面,也就是槽位2。下一次Leader发出下一条AppendEntries时,就可以一次覆盖S1中槽位2和槽位3对应的Log。
场景3。Follower(S1)会返回XTerm=-1,XLen=2。这表示S1中日志太短了,以至于在冲突的位置没有Log条目,Leader应该回退到Follower最后一条Log条目的下一条,也就是槽位2,并从这开始发送AppendEntries消息。槽位2可以从XLen中的数值计算得到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() defer rf.persist() if args.Term < rf.currentTerm { reply.Term = rf.currentTerm reply.Success = false return } if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.votedFor = -1 } rf.changeState(Follower) rf.reInitFollowTimer() if args.PreLogIndex < rf.getFirstLog().Index { reply.Term = rf.currentTerm reply.Success = false return } lastLogIndex := rf.getLastLog().Index firstLogIndex := rf.getFirstLog().Index if args.PreLogIndex > lastLogIndex || rf.logs[args.PreLogIndex-firstLogIndex].Term != args.PreLogTerm { reply.Success = false reply.Term = rf.currentTerm if args.PreLogIndex > lastLogIndex { reply.XTerm = -1 reply.XLen = args.PreLogIndex - rf.getLastLog().Index } else { reply.XTerm = rf.logs[args.PreLogIndex-firstLogIndex].Term i := 0 for i = args.PreLogIndex; i > firstLogIndex; i-- { if rf.logs[i-firstLogIndex].Term != reply.XTerm { break } } reply.XIndex = i + 1 } return } for i, entry := range args.Entries { if entry.Index > lastLogIndex || rf.logs[entry.Index-firstLogIndex].Term != entry.Term { rf.logs = append (rf.logs[:entry.Index-firstLogIndex], args.Entries[i:]...) } } rf.followerCommit(args.LeaderCommit) reply.Success = true reply.Term = rf.currentTerm } func (rf *Raft) handleAppendEntriesReply(peer int , args *AppendEntriesArgs, reply *AppendEntriesReply) { if rf.state == Leader && rf.currentTerm == args.Term { if reply.Term > rf.currentTerm { rf.changeState(Follower) rf.reInitFollowTimer() rf.currentTerm = reply.Term rf.votedFor = -1 rf.persist() return } if reply.Success { rf.matchIndex[peer] = args.PreLogIndex + len (args.Entries) rf.nextIndex[peer] = rf.matchIndex[peer] + 1 rf.leaderCommit() } else { if reply.XTerm == -1 { rf.nextIndex[peer] -= reply.XLen } else { firstLogIndex := rf.getFirstLog().Index find := false for i := args.PreLogIndex; i >= firstLogIndex; i-- { if rf.logs[i-firstLogIndex].Term == reply.XTerm { find = true rf.nextIndex[peer] = reply.XIndex + 1 break } } if !find { rf.nextIndex[peer] = reply.XIndex } } } } }
持久化 根据论文所述,需要持久化的数据有:term,votedFor,log。持久化term与votedFor的原因在于确保一个任期内一个follower只能给一个candidate投票。而日志就不必多说了,既代表了上层状态机所需的command,也是维护raft集群在异常情况下重新选举的重要依据(只能给日志至少不比自己旧的候选人投票,参考RequestVote的实现)。
试想当一个follower节点S1给一个candidate节点S2投票后宕机并立即重启了,此时有另一个节点S3选举计时器超时,成为了candidate并向S1发送投票请求,假如S1没有持久化数据,那么它也会给S3投票,于是它在同一个任期内投了两次票,因此很有可能产生两个leader,由此发生脑裂。
所以当上述变量发生了修改,就需要进行持久化。
快照 raft节点之间、上层service与raft节点的通信过程如下图所示:
日志不能无限增长,需要定时进行快照,所以当上层service发现持久化文件的大小增长到一定程度或者时,便可执行生成快照的操作。
与快照相关的三个api是:
Snapshot
:由上层service主动调用,作用是通知下层的raft节点生成快照,快照点为最后一条被apply到状态机的日志的index,如下图所示:
InstallSnapshot
:用于leader向log过于落后的follower节点发送最新快照,当follower收到leader发送的快照副本,需要判断快照的时效性,再将快照信息通过ApplyMsg发送给上层service
CondInstallSnapshot
:判断下层raft节点在将applyMsg放入applyCh,到上层service收到applyMsg期间有没有apply log,如果apply了新log,则这个snapshot可能是过时的,不能应用到本地。
其他 脑裂问题 脑裂的概念:由于网络分区,出现了多个可以读写的leader
raft如何防止脑裂:raft通过限制在一个任期内只能给一个候选人投票来保证的。
假设这样一种场景:三个raft节点,选举产生了leader,随后由于网络分区,leader无法向follower发送心跳,因此剩余的两个follwer会发起一轮新的选举,会产生一个具有更高任期的新leader,于是这个新leader会正常的完成日志的同步和之后的应用到状态机的步骤,因为三个节点的集群,只要有半数以上,即两个节点拥有同一条日志,这条日志就可以视作commit了。老的leader由于网络分区,它还不知道已经产生了新的leader,于是它会继续发送心跳并且试图同步日志,但由于收不到超过半数的票,因此它写在本地的日志就无法完成提交;假设网络分区现象消失了,老的leader重新加入到了集群,它会继续尝试发送心跳,然后不管是新的leader还是另一个follower,它们收到的心跳的任期号都是比自己小的,因此它们就会拒绝这个老leader的日志,然后发送包含它们当前的任期号的响应,老leader收到这个响应之后,发现是一个更高的任期号,因此它就会降级为follower,然后接受新leader的日志。
网络分区问题