MIT6.5840-2023-Lab2C: Raft-Persistence

发布时间:2023年12月17日

前置知识

见上一篇 Lab2A。

实验内容

实现 RAFT,分为四个 part:leader election、log、persistence、log compaction。

实验环境

OS:WSL-Ubuntu-18.04
golang:go1.17.6 linux/amd64

Part 2C: persistence

大部分的bug都与这张图有关。如果前两次lab通过了千次以上测试,这边应该问题不大。注意rpc前后的状态判断。
在这里插入图片描述

实现持久化,重启后能快速恢复。真正的实现将在每次更改时在磁盘写下 raft 的持久状态,并在重新启动后从磁盘中读取状态。lab 实现时在 Persister 中存储和恢复。currentTerm、votedFor、log[]
persist 和 readPersist:通过 labgob实现。

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
// before you've implemented snapshots, you should pass nil as the
// second argument to persister.Save().
// after you've implemented snapshots, pass the current snapshot
// (or nil if there's not yet a snapshot).
func (rf *Raft) persist() {
	// Your code here (2C).
	// Example:
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.log)
	raftstate := w.Bytes()
	rf.persister.Save(raftstate, nil)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	// Your code here (2C).
	// Example:
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	var currentTerm, votedFor int
	var logs []Entry
	if d.Decode(&currentTerm) != nil ||
		d.Decode(&votedFor) != nil ||
		d.Decode(&logs) != nil {
		log.Println("decode fail")
	} else {
		rf.currentTerm = currentTerm
		rf.votedFor = votedFor
		rf.log = logs
		log.Println("restore success")
	}
}

优化:避免 leader 每次只往前移动一位;若日志很长的话在一段时间内无法达到冲突位置。若 leader 发送心跳时接收到的回复是 false,leader 会减小发送的 AppendEntriesArgs 中的 rf.nextIndex,但是这种每次仅减小 1 的方案无法在有限的时间内确定跟其他服务器发生冲突的下标位置。

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if args.Term > rf.currentTerm { // all server
		rf.state = Follower
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.persist()
	}

	if args.Term < rf.currentTerm {
		reply.Success = false
		reply.Term = rf.currentTerm
	} else if len(rf.log)-1 < args.PrevLogIndex ||
		rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
		log.Println(rf.me, "mismatch", "len(rf.log)", len(rf.log), "args.PrevLogIndex", args.PrevLogIndex)
		reply.Success = false
		reply.Term = rf.currentTerm

		timeout := time.Duration(250+rand.Intn(300)) * time.Millisecond
		rf.expiryTime = time.Now().Add(timeout)

		if args.PrevLogIndex > len(rf.log)-1 {
			reply.ConflictIndex = len(rf.log)
			reply.ConflictTerm = -1
		} else {
			reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
			reply.ConflictIndex = 1
			for i := args.PrevLogIndex - 1; i >= 0; i-- {
				if rf.log[i].Term != reply.ConflictTerm {
					reply.ConflictIndex += i
					break
				}
			}

		}
	} else {
		reply.Success = true
		reply.Term = rf.currentTerm

		timeout := time.Duration(250+rand.Intn(300)) * time.Millisecond
		rf.expiryTime = time.Now().Add(timeout)

		// // delete
		// rf.log = rf.log[:args.PrevLogIndex+1]
		// // append
		// rf.log = append(rf.log, args.Entries...)

		insertIndex := args.PrevLogIndex + 1
		argsLogIndex := 0
		for {
			if insertIndex >= len(rf.log) ||
				argsLogIndex >= len(args.Entries) ||
				rf.log[insertIndex].Term != args.Entries[argsLogIndex].Term {
				break
			}
			insertIndex++
			argsLogIndex++
		}
		// for _, e := range rf.log {
		// 	log.Print(e)
		// }
		// for _, e := range args.Entries {
		// 	log.Print(e)
		// }
		if argsLogIndex < len(args.Entries) {
			rf.log = append(rf.log[:insertIndex], args.Entries[argsLogIndex:]...)
			rf.persist()
		}

		if args.LeaderCommit > rf.commitIndex {
			rf.commitIndex = int(math.Min(float64(args.LeaderCommit), float64(len(rf.log)-1)))
			log.Println(rf.me, "follower update commitIndex", "args.LeaderCommit", args.LeaderCommit, "len(rf.log)", len(rf.log), "rf.commitIndex", rf.commitIndex)
		}
	}

}
// 修改前
// if args.PrevLogIndex > 0 {
// 	rf.nextIndex[i] = args.PrevLogIndex
// }
// 修改后
						if reply.ConflictTerm >= 0 {
							lastIndex := -1
							for i := len(rf.log) - 1; i >= 0; i-- {
								if rf.log[i].Term == reply.ConflictTerm {
									lastIndex = i
									break
								}
							}
							if lastIndex >= 0 {
								rf.nextIndex[i] = lastIndex + 1
							} else {
								rf.nextIndex[i] = reply.ConflictIndex
							}
						} else {
							rf.nextIndex[i] = reply.ConflictIndex
						}

实验结果

在这里插入图片描述

文章来源:https://blog.csdn.net/qq_43680965/article/details/134973681
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。