etcd 原理解析:读《etcd 技术内幕》

前言

k8s使用 etcd 作存储,存放了集群内的所有内容,之前遇到过几个etcd进程crash,排查过程有些艰难,于是找找书系统性的补补课,京东上专门讲etcd书就2本,一个《etcd 技术内幕》,一个《云原生分布式存储基石-etcd 深入解析》,名字又臭又长。。。找了第一个的 pdf,买了第二个的纸质书。

etcd 使用了raft协议做一致性,和 zk 的 ZAB(类paxos)相比更易于理解和工程化实现。

  • 第2章主要阐述了 raft 原理
  • 第3章是etcd-raft的工程化实现,因为etcd 代码库更新较快,书上的部分代码已经过时,不过数据结构没什么变化,走读的时候加一些注释

第2章和第3章算是理解协议实现的核心章节,后面章节的内容对比看了《云原生分布式存储基石-etcd 深入解析》,偏实践运维,后面的结合工作内容再补吧

Etcd-概述

选型

1.consul-vs-etcd-vs-zookeeper对比图:链接
2.服务发现: Consul还是Zookeeper还是etcd:链接

etcd的优势

  • 一致性算法:etcd使用raft协议,zookeeper使用的是paxos协议,raft相对更容易理解,工程化也更加容易
  • 接口:etcd支持http和grpc(v3版本),zookeeper使用的是sdk客户端
  • 性能:1万+的每秒写入,优于zookeeper
  • 安全性:etcd支持TLS访问,zookeeper安全方面相对不完整

数据模型

  • 键值对存储、watch机制
  • 定期压缩、清理旧数据
  • B树索引
  • V2版本:存储全内存实现
  • V3版本底层存储用的BoltDB

Etcd-Raft协议

一.概述

作用:一致性协议来保证集群中大部分节点可用时,集群可以给出一个正确的结果,服务不受影响
分类:Paxos、Raft
paxos比较早,但是缺点为:理解困难、工程化困难2点

二.Leader选举

模式:leader-follower模式

1.工作原理:

每个节点都会有一个状态机,保持为三种状态中的一种:Leader、Follower、Candidate

* Leader节点负责所有的请求的处理,并向Follower发送心跳检测
* Follower只响应Leader或Candidate的请求,不处理用户请求,收到请求会转发给leader
* Candidate是一个中间状态,当Follower收不到leader的检测消息,等到选举计时器(election timer)过期,就会把自己的状态设置为Candidate,触发新的选举
a.一般情况下的选举

A、B、C三个节点,A先超时,变为Candidate状态

* 1.集群初始化时,所有节点状态均为Follower,没有Leader,因此一段时间后收不到Leader的心跳election timer会超时,任期过时+1,Follower会变成Candidate,开始选举
* 2.A机器的election timer首先超时,变为Candidate,发起选举,此时A的election timer重置。
* 3.A投票给自己,并向B和C发起选举请求,B和C还处在任期0中,且为Follower状态,因此投票给A,A得到半数以上投票,成为Leader
* 4.投票后,B和C会将自己的任期+1,election timer重置,以免出现多个Candidate。
* 5.任期1内,A为leader,开始工作。各节点会记录期数,以及当期的投票结果。
* 6.A开始向B和C发送心跳检测,原则:心跳超时时间(hearthbeat timeout) << 选举超时时间(election timer),远小于。

Candidate发起发起选举前,会先尝试连接集群中的其他节点。如果连接不上,放弃本次选举。这个状态称之为:Prevote

b.多个Candidate的选举

A、B、C、D四个节点,A和B同时变为Candidate状态

* 1.第4期,A拿到了C的选票,B拿到了D的选票,
* 2.各自都没有超过一半,因此本次选举失败,无效,均变成Follower状态
* 3.重新等待,因为第4期A和B在发起选举时重置计时,C和D在投票后重置计时
* 4.election timer是一个随机数,因此这种情况一般不会连续出现
* 5.A再次变为Candidate,第5期,向BCD发起选举
* 6.A拿到3票,成为Leader
c.Leader宕机后的选举

A、B、C、D四个节点,原来的Leader节点A宕机了(任期5

* 1.leader宕机,不再有心跳发出,D首先超时,成为Candidate,发起选举
* 2.B和C收到投票,投给D,A宕机,不参与投票,过了半数,D成为Leader
* 3.A节点恢复后,收到了D的心跳(任期为6,大于A的任期5),A自动变为Follower,任期设置为6,重置计时器
d.思考
  • 1.如果Leader节点频繁宕机,或者选举反复进行,怎么办?
要求:广播时间 << 选举超时时间 << 平均故障间隔时间
* 广播时间:节点直接发送心跳信息的完整返回时间 hearthbeat timeout:0.5ms~50ms
* 选举超时时间:election timer:200ms~1s
* 故障间隔时间:两次故障的平均时间:1个月或更多
这样可以最大程度保证:不会频繁选举(广播时间 << 选举超时时间),故障时间最多为200ms~1s(选举超时时间 << 平均故障间隔时间)

2.是不是谁先发起了选举请求,谁就得到了Leader?

不是,除了看先后顺序,还要取决于Candidate节点的日志是不是最新最全的日志,否则拒绝投票,防止出现日志(即数据)丢失的情况。

具体情况参考下面内容

2.日志复制

a.一般请求过程

A,B,C三个节点,A为Leader节点

* 1.Leader节点接到请求后(如 set a=10),将本请求计入本地log
* 2.A节点向B和C发送Append Entries消息(set a=10)
* 3.B和C将此信息计入本地log,并返回给A,已ok
* 4.A将日志信息置为 已提交(Commited),然后状态机处理,返回
* 5.向B和C发送消息,该信息已提交
* 6.B和C接到消息后,交给自己的状态机处理
b.某个Follower节点宕机

A,B,C三个节点,A为Leader节点,C宕机了

* 1.Leader节点会保存所有子节点的日志状态:nextIndex数组和matchIndex数组
* 2.Index:下一条发给子节点的索引值,match:已发送的最大的日志索引值
* 3.C宕机
* 4.A节点完整数据:1、2、3,A中记录了C的数据:1,nextIndex和matchIndex均为1
* 5.因此A发送给C的信息中带有index=2,成功后,A将C的数组更新
* 6.A发送给C的信息中带有index=2,成功后,A将C的数组更新
c.某个Leader点宕机

A,B,C三个节点,A为Leader节点,A宕机了,B成为了Leader节点

* 1.B成为新Leader后,并不知道原来Leader中的数组信息,B只知道自己的,[1,2,3]
* 2.B认为A、C和自己一样,有完整的数组
* 3.B向A发送index,A本身是完整的,返回成功
* 4.B向C发送index,C只有1,没有2,3,因此返回报错
* 5.B的C数组index前移1,C还是报错
* 6.B的C数组index再前移1,C返回成功
* 7.C开始补数据

2.网络分区

A、B、C、D、E五个节点,A为Leader节点
A和B在一个网络分区 C和D、E在一个分区,突然网络故障,两个分区无法相互访问

a.网络故障后的选举情况

A、B , A为Leader节点,任期为1
C、D、E, E为Leader节点,任期为2

* 1.A无法向CDE发送心跳,随着时间的流逝,CDE会有一个选举计时器超时
* 2.E成为新的Leader,因为有3票,超过半数。任期为2
* 3.这个时候,两个分区,会有两个Leader同时运行。
* 4.网络恢复之后
* 5.A向CDE发送心跳,因为任期1小于2,被CDE忽略
* 6.E向AB发送心跳,AB成为Follow,更新任期为2
* 7.E成为最新的Leader

问题:如果ABC在一个分区,DE在另一个分区,E拿不到半数以上的选票,会一直选举吗?

不会,Candidate发起发起选举前,会先尝试连接集群中的其他节点。如果连接不上,放弃本次选举。这个状态称之为:Prevote

a.网络故障后的请求情况

A、B , A为Leader节点,任期为1
C、D、E, E为Leader节点,任期为2
恢复后,新的Leader为E

* 1.当客户端请求到集群时,会随机选择一台服务器进行请求。
* 2.如果这台服务器不是Leader节点, 会转发给Leader节点。
* 3.因为A集群只有A和B,不超过半数,日志记录无法提交,因此请求到A的会异常
* 4.此时转发给C或者D,发现leader为E,转给E,正常返回

也可以采用代理方案,即Follower节点将请求转发给Leader,处理后在由Follower返回出去

2.日志压缩与快照

  • 日志不能无限制的增长
  • 节点重启的时候,会重放所有的日志,如果日志太多,会很慢。
  • 压缩、清理
快照
  • 1.快照是最简单。也是最常用的压缩办法。
  • 2.每个节点都会有自己的快照。
  • 3.快照会存放这个时间之前的所有日志记录。快照生成后,这些日志记录就可以删除,释放空间
  • 4.快照内容:任期:3 索引号:6 节点状态:此时a值为10,b为20
如果Follower节点宕机
  • 1.Leader节点将快照发送给Follower,采用单独的消息通道
  • 2.根据节点元信息判断,Follower决定是否使用这份快照

3.其他技术点

只执行一次linearizable(线性化)

问题:Leader节点处理完,在返回给客户端时,网络故障,客户端会再向集群发起请求,此时会重复2次请求,怎么办?

方案:对于每一个请求都产生一个唯一的序列号,然后服务端对每个请求做去重,如果该请求之前已经处理过,就不再执行。

只读请求
Leader节点自定义

问题:假如现有的Leader节点需要升级系统,暂时将Leader改为其他机器,之后再转移回来?

方案:

  • 1.让一个指定的Follower的本地日志与Leader完全同步
  • 2.同步完成后,这个Follower立刻开始选举
  • 3.由于其Term较大,原来的Leader会被替换
  • 4.期间需要控制时间,防止其他Follwer在这个期间发起选举

Etcd-raft模块详解

跟着第3章走读部分代码,相关解释都放在了注释里

raft结构体

文件:raft/raft.go

type raft struct {
    // 当前节点在集群中的ID
    id uint64

    // 当前任期号
    Term uint64

    // 当前任期中 当前节点将票投给了谁(节点id),没投票则为None
    Vote uint64

  // 只读模式
    readStates []ReadState

    // 日志:每个节点都会记录本地log
    raftLog *raftLog

    // 单条消息的最大字节数
    maxMsgSize         uint64

    // 最大未提交字节数
    maxUncommittedSize uint64

    // 已经发送出去,但没响应的消息的最大限制数,如果超过了这个数目,则该节点不再发送消息
    // 主要为了防止每个节点不断发送消息,导致整个集群瘫痪
    maxInflight        int

    // 每个follower节点的日志复制情况(Next Index和Match Index),都会记录在Progress中
    prs                map[uint64]*Progress

    // 集群中的Learner列表,仅用来接收Leader节点发送的消息,不会进行投票选举
    learnerPrs         map[uint64]*Progress
    matchBuf           uint64Slice

    // 当前节点在集群中的角色:4种:StateFollower、StateCandidate、StateLeader、StatePreCandidate
    state StateType

    // 节点是否为Learner
    isLearner bool

    // 如果节点收到其他人的投票,则将投票人的对应的value设置为True,通过遍历该字典,判断是否投票过半数
    votes map[uint64]bool

    // 缓存了当前节点待发送的信息
    msgs []pb.Message

    // Leader节点的ID
    lead uint64

    // 当Leader漂移时,标记了新Leader的节点ID
    // Follow the procedure defined in raft thesis 3.10.
    leadTransferee uint64

    pendingConfIndex uint64

    uncommittedSize uint64

    // 只读请求
    readOnly *readOnly

    // 选举计时器的指针
    electionElapsed int

    // 只有leader节点保留这个字段
    heartbeatElapsed int

    // 开启时:每隔一段时间,Leader都会判断下,自己连接到的节点数是否过半,如果没有,主动切换为Follower,避免网络分区时,旧Leader很晚被替换的问题
    checkQuorum bool

    // 如果开启:当一个Follower选举计时器超时,会切换为Candidate发起选举,但有可能只是这个Follower节点本身的网络问题,因此会先做一次预投票
    // 即先连接到其他节点,问他们是否有意愿参与投票,拒绝的话,则取消这次选举
    preVote     bool

    // 心跳超时时间
    heartbeatTimeout int

    // 选举超时时间,一旦超时,重新新一轮选举
    electionTimeout  int

    // 选举计时器的上限:electiontimeout, 2 * electiontimeout - 1
    randomizedElectionTimeout int

    disableProposalForwarding bool

    // 推进逻辑时钟的处理方法
    // 如果当前是Leader节点,指向tickHeartbeats方法
    // 如果当前是其他节点,指向tickElection方法
    tick func()

    // 收到消息时的处理方法
    // 如果当前是Leader节点,指向stepLeader方法
    // 如果当前是其他节点,指向stepFollower、stepCandidate方法
    step stepFunc

    logger Logger
}

Config结构体

文件:raft/raft.go


// 创建raft实例时需要的参数,会通过Config对象传递进去 type Config struct { // 当前节点ID ID uint64 // 记录了集群中的所有节点ID peers []uint64 // 集群中的Learner列表,仅用来接收Leader节点发送的消息,不会进行投票选举 learners []uint64 // 也就是选举的超时时间,单位是Node.Tick;当Follower在当前选举周期内没有 ,收到任何消息时开始变成Candidate开始选举 ElectionTick int // 为了维持其当前的角色发起的心跳请求 HeartbeatTick int // 当前节点保存日志记录所使用的存储 Storage Storage // 当前已经应用的记录位置:即最后一条Entry记录的索引 Applied uint64 MaxInflightMsgs int CheckQuorum bool PreVote bool ReadOnlyOption ReadOnlyOption Logger Logger DisableProposalForwarding bool }

Storage接口

文件: raft/storage.go

// Config定义中的Storage,后面的MemoryStorage即为一种实现
type Storage interface {

    // InitialState returns the saved HardState and ConfState information.
    // HardState:当前任期号Term,投给了哪个几点voter,已提交的entry记录的位置,
    // ConfState: 所有节点的ID
    InitialState() (pb.HardState, pb.ConfState, error)

    // Entries returns a slice of log entries in the range [lo,hi).
    // MaxSize limits the total size of the log entries returned, but
    // Entries returns at least one entry if any.
    // 存储了所有的Entry记录,该方法主要用于查询特定范围的记录
    Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)

    // 任期
    Term(i uint64) (uint64, error)

    // 最后一条Entry
    LastIndex() (uint64, error)

    // 第一条Entry
    FirstIndex() (uint64, error)

    // 返回最近一次的快照
    Snapshot() (pb.Snapshot, error)
}

MemoryStorage:Storage的默认内存实现

文件: raft/storage.go


// MemoryStorage是在内存中实现了上述的Storage接口,保存了快照后的所有当前记录 // 继承了sync.Mutex,因此是同步加锁操作,通过raft goroutine执行 type MemoryStorage struct { // Protects access to all fields. Most methods of MemoryStorage are // run on the raft goroutine, but Append() is run on an application // goroutine. sync.Mutex hardState pb.HardState snapshot pb.Snapshot ents []pb.Entry }

ApplySnapshot: 更新快照

文件: raft/storage.go

// 如节点重启后,从快照文件中读取,并创建Snapshot实例,然后保存到MemoryStorage对象中

func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error {

    // 加锁同步
    ms.Lock()
    // 执行结束后,释放锁
    defer ms.Unlock() 

    // 比较快照文件中的Snapshot与当前MemoryStorage中的Snapshot哪个新一点?
    msIndex := ms.snapshot.Metadata.Index

    snapIndex := snap.Metadata.Index
    if msIndex >= snapIndex {

        // 如果快照文件中的比较旧,直接异常
        return ErrSnapOutOfDate
    }

    // 如果快照文件中的比较新,则MemoryStorage覆盖现有的Snapshot
    ms.snapshot = snap

    // 重置ents,此时的ents将是空实例
    ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}}

    return nil
}

Append: 追加entries

文件: raft/storage.go


// 当上边的ApplySnapshot执行完后,就需要向MemoryStorage中追加entries记录了 func (ms *MemoryStorage) Append(entries []pb.Entry) error { if len(entries) == 0 { return nil } ms.Lock() defer ms.Unlock() // 获取当前的MemoryStorage中的firstIndex值 first := ms.firstIndex() // 获取待处理的entries中的第一条记录的index值 last := entries[0].Index + uint64(len(entries)) - 1 // 如果待处理的entries都过时了,就无需处理 if last < first { return nil } // 开始添加entries // truncate compacted entries // 首先截掉first前的记录,因为已经存在了 if first > entries[0].Index { entries = entries[first-entries[0].Index:] } offset := entries[0].Index - ms.ents[0].Index switch { case uint64(len(ms.ents)) > offset: // 直接向MemoryStorage的ents中追加entries列表 ms.ents = append([]pb.Entry{}, ms.ents[:offset]...) ms.ents = append(ms.ents, entries...) case uint64(len(ms.ents)) == offset: ms.ents = append(ms.ents, entries...) default: raftLogger.Panicf("missing log entry [last: %d, append at: %d]", ms.lastIndex(), entries[0].Index) } return nil }

Entries: 获取Entries

文件: raft/storage.go

// 查询MemoryStorage中的Entries
func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) {
    ms.Lock()
    defer ms.Unlock()
    offset := ms.ents[0].Index
    if lo <= offset {
        return nil, ErrCompacted
    }
    if hi > ms.lastIndex()+1 {
        raftLogger.Panicf("entries' hi(%d) is out of bound lastindex(%d)", hi, ms.lastIndex())
    }

    // only contains dummy entries.

    // 如果只有一条记录,则返回空?为啥不返回这一条
    if len(ms.ents) == 1 {
        return nil, ErrUnavailable
    }

    ents := ms.ents[lo-offset : hi-offset]

    // 调用limitSize限制下ents的总切片大小
    return limitSize(ents, maxSize), nil
}

文件: raft/util.go

// 限制返回的Entry的大小
func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry {
    if len(ents) == 0 {
        return ents
    }
    size := ents[0].Size()
    var limit int
    // limit 下标之前,总和小于maxSize
    for limit = 1; limit < len(ents); limit++ {
        size += ents[limit].Size()
        if uint64(size) > maxSize {
            break
        }
    }
    return ents[:limit]
}

CreateSnapshot: 定时创建快照

文件: raft/storage.go

// 定时进行快照的创建,可以较少内存中的Entry数量
// i :新建Snapshot时最大索引值
// cs: 当前集群状态
// data:需要快照的数据

func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error) {

    ms.Lock()
    defer ms.Unlock()

    // 边界检查:索引值必须大于当前Snapshot拥有的最大index,且小于MemoryStorage的lastIndex
    if i <= ms.snapshot.Metadata.Index {
        return pb.Snapshot{}, ErrSnapOutOfDate
    }

    offset := ms.ents[0].Index
    if i > ms.lastIndex() {
        raftLogger.Panicf("snapshot %d is out of bound lastindex(%d)", i, ms.lastIndex())
    }

    ms.snapshot.Metadata.Index = i
    ms.snapshot.Metadata.Term = ms.ents[i-offset].Term
    if cs != nil {
        ms.snapshot.Metadata.ConfState = *cs
    }
    ms.snapshot.Data = data
    return ms.snapshot, nil
}

// 上边的创建快照完成后,会执行Compact方法,将索引之前的entries全部抛弃,达到压缩ents的目的
func (ms *MemoryStorage) Compact(compactIndex uint64) error {
    ms.Lock()
    defer ms.Unlock()
    offset := ms.ents[0].Index
    if compactIndex <= offset {
        return ErrCompacted
    }
    if compactIndex > ms.lastIndex() {
        raftLogger.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex())
    }

    i := compactIndex - offset

    // 创建一个新的切片,用于存放compactIndex之后的Entry
    ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)
    ents[0].Index = ms.ents[i].Index
    ents[0].Term = ms.ents[i].Term
    ents = append(ents, ms.ents[i+1:]...)
    ms.ents = ents
    return nil
}

unstable结构体

  • 客户端发来的请求,会先存入unstable中(一个数组),然后上层模块会将这些记录保存到其他地方(storage)
  • 持久化后会删除unstable中的记录,没持久化时不稳定,所以称为unstable
  • Leader节点中:客户端请求的Entry,Follower节点中:从Leader节点复制来的Entry

文件: raft/log_unstable.go

type unstable struct {
    // 未持久化的快照
    snapshot *pb.Snapshot

    // 未写入Storage的Entry记录
    entries []pb.Entry

    //entries中的第一条Entry的索引值
    offset  uint64

    logger Logger
}
  • 当unstable中的Entry记录被写入到Storage之后,会调用stableTo()方法清掉entries中的Entry记录
func (u *unstable) stableTo(i, t uint64) {

  // 查找对应Term的值,如果不存在,直接返回
    gt, ok := u.maybeTerm(i)
    if !ok {
        return
    }

    // if i < offset, term 已经持久化
    if gt == t && i >= u.offset {
        u.entries = u.entries[i+1-u.offset:]
        u.offset = i + 1
        // 随着多次日志的截取操作,u.entries底层的数组会越来越大
        // u.shrinkEntriesArray会在底层数组超过实际占用的两倍时,对底层数组进行缩减
        u.shrinkEntriesArray()
    }
}

func (u *unstable) shrinkEntriesArray() {
    // We replace the array if we're using less than half of the space in
    // it. This number is fairly arbitrary, chosen as an attempt to balance
    // memory usage vs number of allocations. It could probably be improved
    // with some focused tuning.
    const lenMultiple = 2
    if len(u.entries) == 0 {
        u.entries = nil
    } else if len(u.entries)*lenMultiple < cap(u.entries) {
        newEntries := make([]pb.Entry, len(u.entries))
        copy(newEntries, u.entries)
        u.entries = newEntries
    }
}
  • 同理,当unstable中的snapshot记录被写入到Storage之后,会调用stableSnapTo()方法清掉snapshot字段

raftLog结构体

etcd中日志复制的核心就是在各个节点之间进行日志的复制,因此在etcd-raft模块中使用raftLog结构体来管理


type raftLog struct { // 对应MemoryStorage实例,存储了快照数据,及快照之后的Entry记录 storage Storage // 未写入Storage中的快照数据和Entry记录 unstable unstable // 已提交的Entry记录中最大的索引值 committed uint64 // 已应用的Entry记录中最大的索引值 // 始终满足:committed<=applied applied uint64 logger Logger // 最大消息体大小 maxMsgSize uint64 }

Written by

说点什么

2 评论 在 "etcd 原理解析:读《etcd 技术内幕》"

avatar

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据

  Subscribe  
最新 最旧 得票最多
提醒
trackback

[…] etcd 的选主架构这里不做描述,可以看这篇文章,这里对ReadIndex和Lease Read做下解释,即etcd […]

trackback

[…] etcd 原理解析:读《etcd 技术内幕》这篇文章主要是原理性的内容,本文主要是实践角度,谈谈平时用到的一些操作和监控,整理中… […]