Contents
前言
k8s使用 etcd 作存储,存放了集群内的所有内容,之前遇到过几个etcd进程crash,排查过程有些艰难,于是找找书系统性的补补课,京东上专门讲etcd书就2本,一个《etcd 技术内幕》,一个《云原生分布式存储基石-etcd 深入解析》,名字又臭又长。。。找了第一个的 pdf,买了第二个的纸质书。
etcd 使用了raft协议做一致性,和 zk 的 ZAB(类paxos)相比更易于理解和工程化实现。
- 第2章主要阐述了 raft 原理
- 第3章是etcd-raft的工程化实现,因为etcd 代码库更新较快,书上的部分代码已经过时,不过数据结构没什么变化,走读的时候加一些注释
第2章和第3章算是理解协议实现的核心章节,后面章节的内容对比看了《云原生分布式存储基石-etcd 深入解析》,偏实践运维,后面的结合工作内容再补吧
Etcd-概述
选型
1.consul-vs-etcd-vs-zookeeper对比图:链接
2.服务发现: Consul还是Zookeeper还是etcd:链接
etcd的优势
- 一致性算法:etcd使用raft协议,zookeeper使用的是paxos协议,raft相对更容易理解,工程化也更加容易
- 接口:etcd支持http和grpc(v3版本),zookeeper使用的是sdk客户端
- 性能:1万+的每秒写入,优于zookeeper
- 安全性:etcd支持TLS访问,zookeeper安全方面相对不完整
数据模型
- 键值对存储、watch机制
- 定期压缩、清理旧数据
- B树索引
- V2版本:存储全内存实现
- V3版本底层存储用的BoltDB
Etcd-Raft协议
一.概述
作用:一致性协议来保证集群中大部分节点可用时,集群可以给出一个正确的结果,服务不受影响
分类:Paxos、Raft
paxos比较早,但是缺点为:理解困难、工程化困难2点
二.Leader选举
模式:leader-follower模式
1.工作原理:
每个节点都会有一个状态机,保持为三种状态中的一种:Leader、Follower、Candidate
* Leader节点负责所有的请求的处理,并向Follower发送心跳检测
* Follower只响应Leader或Candidate的请求,不处理用户请求,收到请求会转发给leader
* Candidate是一个中间状态,当Follower收不到leader的检测消息,等到选举计时器(election timer)过期,就会把自己的状态设置为Candidate,触发新的选举
a.一般情况下的选举
A、B、C三个节点,A先超时,变为Candidate状态
* 1.集群初始化时,所有节点状态均为Follower,没有Leader,因此一段时间后收不到Leader的心跳election timer会超时,任期过时+1,Follower会变成Candidate,开始选举
* 2.A机器的election timer首先超时,变为Candidate,发起选举,此时A的election timer重置。
* 3.A投票给自己,并向B和C发起选举请求,B和C还处在任期0中,且为Follower状态,因此投票给A,A得到半数以上投票,成为Leader
* 4.投票后,B和C会将自己的任期+1,election timer重置,以免出现多个Candidate。
* 5.任期1内,A为leader,开始工作。各节点会记录期数,以及当期的投票结果。
* 6.A开始向B和C发送心跳检测,原则:心跳超时时间(hearthbeat timeout) << 选举超时时间(election timer),远小于。
Candidate发起发起选举前,会先尝试连接集群中的其他节点。如果连接不上,放弃本次选举。这个状态称之为:Prevote
b.多个Candidate的选举
A、B、C、D四个节点,A和B同时变为Candidate状态
* 1.第4期,A拿到了C的选票,B拿到了D的选票,
* 2.各自都没有超过一半,因此本次选举失败,无效,均变成Follower状态
* 3.重新等待,因为第4期A和B在发起选举时重置计时,C和D在投票后重置计时
* 4.election timer是一个随机数,因此这种情况一般不会连续出现
* 5.A再次变为Candidate,第5期,向BCD发起选举
* 6.A拿到3票,成为Leader
c.Leader宕机后的选举
A、B、C、D四个节点,原来的Leader节点A宕机了(任期5
* 1.leader宕机,不再有心跳发出,D首先超时,成为Candidate,发起选举
* 2.B和C收到投票,投给D,A宕机,不参与投票,过了半数,D成为Leader
* 3.A节点恢复后,收到了D的心跳(任期为6,大于A的任期5),A自动变为Follower,任期设置为6,重置计时器
d.思考
- 1.如果Leader节点频繁宕机,或者选举反复进行,怎么办?
要求:广播时间 << 选举超时时间 << 平均故障间隔时间
* 广播时间:节点直接发送心跳信息的完整返回时间 hearthbeat timeout:0.5ms~50ms
* 选举超时时间:election timer:200ms~1s
* 故障间隔时间:两次故障的平均时间:1个月或更多
这样可以最大程度保证:不会频繁选举(广播时间 << 选举超时时间),故障时间最多为200ms~1s(选举超时时间 << 平均故障间隔时间)
2.是不是谁先发起了选举请求,谁就得到了Leader?
不是,除了看先后顺序,还要取决于Candidate节点的日志是不是最新最全的日志,否则拒绝投票,防止出现日志(即数据)丢失的情况。
具体情况参考下面内容
2.日志复制
a.一般请求过程
A,B,C三个节点,A为Leader节点
* 1.Leader节点接到请求后(如 set a=10),将本请求计入本地log
* 2.A节点向B和C发送Append Entries消息(set a=10)
* 3.B和C将此信息计入本地log,并返回给A,已ok
* 4.A将日志信息置为 已提交(Commited),然后状态机处理,返回
* 5.向B和C发送消息,该信息已提交
* 6.B和C接到消息后,交给自己的状态机处理
b.某个Follower节点宕机
A,B,C三个节点,A为Leader节点,C宕机了
* 1.Leader节点会保存所有子节点的日志状态:nextIndex数组和matchIndex数组
* 2.Index:下一条发给子节点的索引值,match:已发送的最大的日志索引值
* 3.C宕机
* 4.A节点完整数据:1、2、3,A中记录了C的数据:1,nextIndex和matchIndex均为1
* 5.因此A发送给C的信息中带有index=2,成功后,A将C的数组更新
* 6.A发送给C的信息中带有index=2,成功后,A将C的数组更新
c.某个Leader点宕机
A,B,C三个节点,A为Leader节点,A宕机了,B成为了Leader节点
* 1.B成为新Leader后,并不知道原来Leader中的数组信息,B只知道自己的,[1,2,3]
* 2.B认为A、C和自己一样,有完整的数组
* 3.B向A发送index,A本身是完整的,返回成功
* 4.B向C发送index,C只有1,没有2,3,因此返回报错
* 5.B的C数组index前移1,C还是报错
* 6.B的C数组index再前移1,C返回成功
* 7.C开始补数据
2.网络分区
A、B、C、D、E五个节点,A为Leader节点
A和B在一个网络分区 C和D、E在一个分区,突然网络故障,两个分区无法相互访问
a.网络故障后的选举情况
A、B , A为Leader节点,任期为1
C、D、E, E为Leader节点,任期为2
* 1.A无法向CDE发送心跳,随着时间的流逝,CDE会有一个选举计时器超时
* 2.E成为新的Leader,因为有3票,超过半数。任期为2
* 3.这个时候,两个分区,会有两个Leader同时运行。
* 4.网络恢复之后
* 5.A向CDE发送心跳,因为任期1小于2,被CDE忽略
* 6.E向AB发送心跳,AB成为Follow,更新任期为2
* 7.E成为最新的Leader
问题:如果ABC在一个分区,DE在另一个分区,E拿不到半数以上的选票,会一直选举吗?
不会,Candidate发起发起选举前,会先尝试连接集群中的其他节点。如果连接不上,放弃本次选举。这个状态称之为:Prevote
a.网络故障后的请求情况
A、B , A为Leader节点,任期为1
C、D、E, E为Leader节点,任期为2
恢复后,新的Leader为E
* 1.当客户端请求到集群时,会随机选择一台服务器进行请求。
* 2.如果这台服务器不是Leader节点, 会转发给Leader节点。
* 3.因为A集群只有A和B,不超过半数,日志记录无法提交,因此请求到A的会异常
* 4.此时转发给C或者D,发现leader为E,转给E,正常返回
也可以采用代理方案,即Follower节点将请求转发给Leader,处理后在由Follower返回出去
2.日志压缩与快照
- 日志不能无限制的增长
- 节点重启的时候,会重放所有的日志,如果日志太多,会很慢。
- 压缩、清理
快照
- 1.快照是最简单。也是最常用的压缩办法。
- 2.每个节点都会有自己的快照。
- 3.快照会存放这个时间之前的所有日志记录。快照生成后,这些日志记录就可以删除,释放空间
- 4.快照内容:任期:3 索引号:6 节点状态:此时a值为10,b为20
如果Follower节点宕机
- 1.Leader节点将快照发送给Follower,采用单独的消息通道
- 2.根据节点元信息判断,Follower决定是否使用这份快照
3.其他技术点
只执行一次linearizable(线性化)
问题:Leader节点处理完,在返回给客户端时,网络故障,客户端会再向集群发起请求,此时会重复2次请求,怎么办?
方案:对于每一个请求都产生一个唯一的序列号,然后服务端对每个请求做去重,如果该请求之前已经处理过,就不再执行。
只读请求
Leader节点自定义
问题:假如现有的Leader节点需要升级系统,暂时将Leader改为其他机器,之后再转移回来?
方案:
- 1.让一个指定的Follower的本地日志与Leader完全同步
- 2.同步完成后,这个Follower立刻开始选举
- 3.由于其Term较大,原来的Leader会被替换
- 4.期间需要控制时间,防止其他Follwer在这个期间发起选举
Etcd-raft模块详解
跟着第3章走读部分代码,相关解释都放在了注释里
raft结构体
文件:raft/raft.go
type raft struct {
// 当前节点在集群中的ID
id uint64
// 当前任期号
Term uint64
// 当前任期中 当前节点将票投给了谁(节点id),没投票则为None
Vote uint64
// 只读模式
readStates []ReadState
// 日志:每个节点都会记录本地log
raftLog *raftLog
// 单条消息的最大字节数
maxMsgSize uint64
// 最大未提交字节数
maxUncommittedSize uint64
// 已经发送出去,但没响应的消息的最大限制数,如果超过了这个数目,则该节点不再发送消息
// 主要为了防止每个节点不断发送消息,导致整个集群瘫痪
maxInflight int
// 每个follower节点的日志复制情况(Next Index和Match Index),都会记录在Progress中
prs map[uint64]*Progress
// 集群中的Learner列表,仅用来接收Leader节点发送的消息,不会进行投票选举
learnerPrs map[uint64]*Progress
matchBuf uint64Slice
// 当前节点在集群中的角色:4种:StateFollower、StateCandidate、StateLeader、StatePreCandidate
state StateType
// 节点是否为Learner
isLearner bool
// 如果节点收到其他人的投票,则将投票人的对应的value设置为True,通过遍历该字典,判断是否投票过半数
votes map[uint64]bool
// 缓存了当前节点待发送的信息
msgs []pb.Message
// Leader节点的ID
lead uint64
// 当Leader漂移时,标记了新Leader的节点ID
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
pendingConfIndex uint64
uncommittedSize uint64
// 只读请求
readOnly *readOnly
// 选举计时器的指针
electionElapsed int
// 只有leader节点保留这个字段
heartbeatElapsed int
// 开启时:每隔一段时间,Leader都会判断下,自己连接到的节点数是否过半,如果没有,主动切换为Follower,避免网络分区时,旧Leader很晚被替换的问题
checkQuorum bool
// 如果开启:当一个Follower选举计时器超时,会切换为Candidate发起选举,但有可能只是这个Follower节点本身的网络问题,因此会先做一次预投票
// 即先连接到其他节点,问他们是否有意愿参与投票,拒绝的话,则取消这次选举
preVote bool
// 心跳超时时间
heartbeatTimeout int
// 选举超时时间,一旦超时,重新新一轮选举
electionTimeout int
// 选举计时器的上限:electiontimeout, 2 * electiontimeout - 1
randomizedElectionTimeout int
disableProposalForwarding bool
// 推进逻辑时钟的处理方法
// 如果当前是Leader节点,指向tickHeartbeats方法
// 如果当前是其他节点,指向tickElection方法
tick func()
// 收到消息时的处理方法
// 如果当前是Leader节点,指向stepLeader方法
// 如果当前是其他节点,指向stepFollower、stepCandidate方法
step stepFunc
logger Logger
}
Config结构体
文件:raft/raft.go
// 创建raft实例时需要的参数,会通过Config对象传递进去
type Config struct {
// 当前节点ID
ID uint64
// 记录了集群中的所有节点ID
peers []uint64
// 集群中的Learner列表,仅用来接收Leader节点发送的消息,不会进行投票选举
learners []uint64
// 也就是选举的超时时间,单位是Node.Tick;当Follower在当前选举周期内没有 ,收到任何消息时开始变成Candidate开始选举
ElectionTick int
// 为了维持其当前的角色发起的心跳请求
HeartbeatTick int
// 当前节点保存日志记录所使用的存储
Storage Storage
// 当前已经应用的记录位置:即最后一条Entry记录的索引
Applied uint64
MaxInflightMsgs int
CheckQuorum bool
PreVote bool
ReadOnlyOption ReadOnlyOption
Logger Logger
DisableProposalForwarding bool
}
Storage接口
文件: raft/storage.go
// Config定义中的Storage,后面的MemoryStorage即为一种实现
type Storage interface {
// InitialState returns the saved HardState and ConfState information.
// HardState:当前任期号Term,投给了哪个几点voter,已提交的entry记录的位置,
// ConfState: 所有节点的ID
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
// 存储了所有的Entry记录,该方法主要用于查询特定范围的记录
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// 任期
Term(i uint64) (uint64, error)
// 最后一条Entry
LastIndex() (uint64, error)
// 第一条Entry
FirstIndex() (uint64, error)
// 返回最近一次的快照
Snapshot() (pb.Snapshot, error)
}
MemoryStorage:Storage的默认内存实现
文件: raft/storage.go
// MemoryStorage是在内存中实现了上述的Storage接口,保存了快照后的所有当前记录
// 继承了sync.Mutex,因此是同步加锁操作,通过raft goroutine执行
type MemoryStorage struct {
// Protects access to all fields. Most methods of MemoryStorage are
// run on the raft goroutine, but Append() is run on an application
// goroutine.
sync.Mutex
hardState pb.HardState
snapshot pb.Snapshot
ents []pb.Entry
}
ApplySnapshot: 更新快照
文件: raft/storage.go
// 如节点重启后,从快照文件中读取,并创建Snapshot实例,然后保存到MemoryStorage对象中
func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error {
// 加锁同步
ms.Lock()
// 执行结束后,释放锁
defer ms.Unlock()
// 比较快照文件中的Snapshot与当前MemoryStorage中的Snapshot哪个新一点?
msIndex := ms.snapshot.Metadata.Index
snapIndex := snap.Metadata.Index
if msIndex >= snapIndex {
// 如果快照文件中的比较旧,直接异常
return ErrSnapOutOfDate
}
// 如果快照文件中的比较新,则MemoryStorage覆盖现有的Snapshot
ms.snapshot = snap
// 重置ents,此时的ents将是空实例
ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}}
return nil
}
Append: 追加entries
文件: raft/storage.go
// 当上边的ApplySnapshot执行完后,就需要向MemoryStorage中追加entries记录了
func (ms *MemoryStorage) Append(entries []pb.Entry) error {
if len(entries) == 0 {
return nil
}
ms.Lock()
defer ms.Unlock()
// 获取当前的MemoryStorage中的firstIndex值
first := ms.firstIndex()
// 获取待处理的entries中的第一条记录的index值
last := entries[0].Index + uint64(len(entries)) - 1
// 如果待处理的entries都过时了,就无需处理
if last < first {
return nil
}
// 开始添加entries
// truncate compacted entries
// 首先截掉first前的记录,因为已经存在了
if first > entries[0].Index {
entries = entries[first-entries[0].Index:]
}
offset := entries[0].Index - ms.ents[0].Index
switch {
case uint64(len(ms.ents)) > offset:
// 直接向MemoryStorage的ents中追加entries列表
ms.ents = append([]pb.Entry{}, ms.ents[:offset]...)
ms.ents = append(ms.ents, entries...)
case uint64(len(ms.ents)) == offset:
ms.ents = append(ms.ents, entries...)
default:
raftLogger.Panicf("missing log entry [last: %d, append at: %d]",
ms.lastIndex(), entries[0].Index)
}
return nil
}
Entries: 获取Entries
文件: raft/storage.go
// 查询MemoryStorage中的Entries
func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) {
ms.Lock()
defer ms.Unlock()
offset := ms.ents[0].Index
if lo <= offset {
return nil, ErrCompacted
}
if hi > ms.lastIndex()+1 {
raftLogger.Panicf("entries' hi(%d) is out of bound lastindex(%d)", hi, ms.lastIndex())
}
// only contains dummy entries.
// 如果只有一条记录,则返回空?为啥不返回这一条
if len(ms.ents) == 1 {
return nil, ErrUnavailable
}
ents := ms.ents[lo-offset : hi-offset]
// 调用limitSize限制下ents的总切片大小
return limitSize(ents, maxSize), nil
}
文件: raft/util.go
// 限制返回的Entry的大小
func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry {
if len(ents) == 0 {
return ents
}
size := ents[0].Size()
var limit int
// limit 下标之前,总和小于maxSize
for limit = 1; limit < len(ents); limit++ {
size += ents[limit].Size()
if uint64(size) > maxSize {
break
}
}
return ents[:limit]
}
CreateSnapshot: 定时创建快照
文件: raft/storage.go
// 定时进行快照的创建,可以较少内存中的Entry数量
// i :新建Snapshot时最大索引值
// cs: 当前集群状态
// data:需要快照的数据
func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error) {
ms.Lock()
defer ms.Unlock()
// 边界检查:索引值必须大于当前Snapshot拥有的最大index,且小于MemoryStorage的lastIndex
if i <= ms.snapshot.Metadata.Index {
return pb.Snapshot{}, ErrSnapOutOfDate
}
offset := ms.ents[0].Index
if i > ms.lastIndex() {
raftLogger.Panicf("snapshot %d is out of bound lastindex(%d)", i, ms.lastIndex())
}
ms.snapshot.Metadata.Index = i
ms.snapshot.Metadata.Term = ms.ents[i-offset].Term
if cs != nil {
ms.snapshot.Metadata.ConfState = *cs
}
ms.snapshot.Data = data
return ms.snapshot, nil
}
// 上边的创建快照完成后,会执行Compact方法,将索引之前的entries全部抛弃,达到压缩ents的目的
func (ms *MemoryStorage) Compact(compactIndex uint64) error {
ms.Lock()
defer ms.Unlock()
offset := ms.ents[0].Index
if compactIndex <= offset {
return ErrCompacted
}
if compactIndex > ms.lastIndex() {
raftLogger.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex())
}
i := compactIndex - offset
// 创建一个新的切片,用于存放compactIndex之后的Entry
ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)
ents[0].Index = ms.ents[i].Index
ents[0].Term = ms.ents[i].Term
ents = append(ents, ms.ents[i+1:]...)
ms.ents = ents
return nil
}
unstable结构体
- 客户端发来的请求,会先存入unstable中(一个数组),然后上层模块会将这些记录保存到其他地方(storage)
- 持久化后会删除unstable中的记录,没持久化时不稳定,所以称为unstable
- Leader节点中:客户端请求的Entry,Follower节点中:从Leader节点复制来的Entry
文件: raft/log_unstable.go
type unstable struct {
// 未持久化的快照
snapshot *pb.Snapshot
// 未写入Storage的Entry记录
entries []pb.Entry
//entries中的第一条Entry的索引值
offset uint64
logger Logger
}
- 当unstable中的Entry记录被写入到Storage之后,会调用stableTo()方法清掉entries中的Entry记录
func (u *unstable) stableTo(i, t uint64) {
// 查找对应Term的值,如果不存在,直接返回
gt, ok := u.maybeTerm(i)
if !ok {
return
}
// if i < offset, term 已经持久化
if gt == t && i >= u.offset {
u.entries = u.entries[i+1-u.offset:]
u.offset = i + 1
// 随着多次日志的截取操作,u.entries底层的数组会越来越大
// u.shrinkEntriesArray会在底层数组超过实际占用的两倍时,对底层数组进行缩减
u.shrinkEntriesArray()
}
}
func (u *unstable) shrinkEntriesArray() {
// We replace the array if we're using less than half of the space in
// it. This number is fairly arbitrary, chosen as an attempt to balance
// memory usage vs number of allocations. It could probably be improved
// with some focused tuning.
const lenMultiple = 2
if len(u.entries) == 0 {
u.entries = nil
} else if len(u.entries)*lenMultiple < cap(u.entries) {
newEntries := make([]pb.Entry, len(u.entries))
copy(newEntries, u.entries)
u.entries = newEntries
}
}
- 同理,当unstable中的snapshot记录被写入到Storage之后,会调用stableSnapTo()方法清掉snapshot字段
raftLog结构体
etcd中日志复制的核心就是在各个节点之间进行日志的复制,因此在etcd-raft模块中使用raftLog结构体来管理
type raftLog struct {
// 对应MemoryStorage实例,存储了快照数据,及快照之后的Entry记录
storage Storage
// 未写入Storage中的快照数据和Entry记录
unstable unstable
// 已提交的Entry记录中最大的索引值
committed uint64
// 已应用的Entry记录中最大的索引值
// 始终满足:committed<=applied
applied uint64
logger Logger
// 最大消息体大小
maxMsgSize uint64
}
说点什么
2 评论 在 "etcd 原理解析:读《etcd 技术内幕》"
[…] etcd 的选主架构这里不做描述,可以看这篇文章,这里对ReadIndex和Lease Read做下解释,即etcd […]
[…] etcd 原理解析:读《etcd 技术内幕》这篇文章主要是原理性的内容,本文主要是实践角度,谈谈平时用到的一些操作和监控,整理中… […]