Raft is a consensus algorithm that is widely used in distributed stores and queues. It was designed to be easy to understand and to implement. For example, it's used by NATS, the lightweight distributed queue system, and by etcd, the distributed key-value store used by Kubernetes. In this article I will try to reverse engineer Raft just by using this visualization, implement it, and explain how it works. I will not be reading the “Raft paper” before finishing this project. All of the following code was simplified; the full code can be found here. Yes, the project is called Shipwreck, as in the raft that crashed.
Overview
Raft works over a log that is kept on each node, and each entry is agreed upon by a majority of nodes.
A single leader is elected at the start. The leader's role is to keep track of all writes to the system and to ensure consensus for new writes. Clients of this distributed system should always talk only to the leader; usually requests are proxied from followers to the leader.
Other nodes in the system act as followers: they write the same log based on what the leader tells them to write.
If a follower does not receive a message from the leader for some time, it becomes a candidate and starts a new election.
I will be implementing it in Go, so let's start.
Node
To start we will need a struct that will hold the state of the node. Each node has its own id so we can later identify it, and a mode, the state it is currently in. Nodes also have to hold a connection to all other nodes in the system. The most important attributes are the commitOffset and syncOffset, used for tracking what was already synced and what was committed. I will be adding more attributes to it, but for now that's enough.
type Node struct {
	// Node data
	id           string
	mode         NodeMode
	nodeConnList []conn
	commitOffset int64
	syncOffset   int64
}
NodeMode is a “standard” Go enum, representing one of three values. Each node starts in the Follower mode; when it doesn't hear from the leader for a time between 150-300 ms, it transitions to the Candidate state. If enough votes are secured, it becomes the Leader.
type NodeMode int64

const (
	NodeModeFollower NodeMode = iota
	NodeModeCandidate
	NodeModeLeader
)
To keep track of the last heartbeat the node received from the leader, I will need a timer that I can listen to and reset whenever a message is received. Go has time.Ticker for this purpose, and I can listen on its channel for ticks. Let's alter the node struct to add a few attributes.
type Node struct {
	// ... previous attributes

	// Voting
	electionTimeout *time.Ticker
	votedFor        string
	term            int64
}

func (n *Node) Start() {
	for {
		select {
		case <-n.electionTimeout.C:
			// Election timeout elapsed without hearing from the leader.
			if n.mode == NodeModeFollower {
				n.becomeCandidate()
			}
		}
	}
}
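To see how these pieces fit together, here is a minimal sketch of how a node could be constructed and started. NewNode is a hypothetical constructor that is not part of the snippets above (the real repository may wire nodes up differently); it only fills in the fields we have seen so far and seeds the election timeout with a random 150-300 ms duration, the same range used by resetTimer later on.

func NewNode(id string, peers []conn) *Node {
	return &Node{
		id:           id,
		mode:         NodeModeFollower, // every node starts as a follower
		nodeConnList: peers,
		// Randomize the timeout so nodes don't all become candidates at once.
		electionTimeout: time.NewTicker(time.Duration(rand.Int63n(150)+150) * time.Millisecond),
	}
}

// Usage: create the node and run its main loop in a goroutine.
//
//	node := NewNode("node-1", peerConns)
//	go node.Start()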
Leader election
Voting in Raft is similar to any democratic vote, like a presidential election. Each node can only vote once in the current term, so I've added attributes to track the current term and to track which candidate the node votedFor. The Start method is where I will be listening for the various signals; it has to be called to make the node start interacting with other nodes. I'm simply running an infinite loop and waiting for a signal from the electionTimeout ticker, which keeps track of whether we want to switch from Follower to Candidate.
After becoming a candidate, the node needs to start the voting process, which means calling all other nodes and requesting votes from them. Calling other nodes was abstracted into the conn interface. In order to keep it simple I've decided to only include three functions. The ID function is self explanatory, RequestVote is called when a node transitions to candidate, requesting a vote from the connected node, and AppendLog will be used as the heartbeat and to sync logs from the leader to followers.
type conn interface {
	ID() string
	RequestVote(ctx context.Context, vote Message[VoteRequest]) (Message[VoteReply], error)
	AppendLog(ctx context.Context, log Message[LogRequest]) (Message[LogReply], error)
}
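To make the abstraction concrete, here is a sketch of what a minimal in-process implementation could look like: a conn that holds a pointer to the target node and simply forwards each call to the node's own handlers (the requestVote and syncLog methods shown later in this post). This nodeConn type is my own illustration for local testing, not part of the repository's snippets, and the exact signatures differ slightly once the log entries become generic.

// nodeConn is a hypothetical in-process conn used for local testing.
type nodeConn struct {
	target *Node
}

func (c *nodeConn) ID() string { return c.target.id }

func (c *nodeConn) RequestVote(ctx context.Context, vote Message[VoteRequest]) (Message[VoteReply], error) {
	// Forward the request straight to the target node's vote handler.
	return c.target.requestVote(ctx, vote)
}

func (c *nodeConn) AppendLog(ctx context.Context, log Message[LogRequest]) (Message[LogReply], error) {
	// Forward the heartbeat/sync straight to the target node's log handler.
	return c.target.syncLog(ctx, log)
}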
Now we have everything ready to start implementing the voting process. A node that becomes a candidate will start a new term by incrementing its own term and voting for itself, and then send requests to all peers in parallel. For collecting the results I could have used a channel, but in my opinion using an atomic counter here is more elegant. If the candidate receives a majority of votes, we can promote it to leader.
func (n *Node) becomeCandidate() {
	n.mode = NodeModeCandidate
	n.candidateRequestPeerVotes()
}

func (n *Node) candidateRequestPeerVotes() {
	n.term += 1
	n.votedFor = n.id
	ctx := context.Background()

	granted := atomic.Int64{}
	granted.Add(1) // Node voted for itself so we can just add one here
	wg := sync.WaitGroup{}
	for _, conn := range n.nodeConnList {
		conn := conn
		wg.Add(1)
		go func() {
			defer wg.Done()
			reply, err := conn.RequestVote(ctx, Message[VoteRequest]{
				SourceID: n.id,
				TargetID: conn.ID(),
				Msg: VoteRequest{
					Term: n.term,
				},
			})
			if err != nil {
				slog.ErrorContext(ctx, "Vote request failed", "Err", err)
				return
			}
			if reply.Msg.Granted {
				granted.Add(1)
			}
		}()
	}
	wg.Wait()

	// The cluster has len(n.nodeConnList)+1 nodes in total (peers plus this
	// node), so a majority means strictly more than half of that.
	hasMajorityVote := granted.Load() > int64(len(n.nodeConnList)+1)/2
	if hasMajorityVote {
		n.becomeLeader()
	}
}
Great, that was simple. Now I just have to implement the handling of vote requests and the voting process is done. This should also be easy.
Each request should reset the timer on the node, so I've created the resetTimer function that is going to be called as the first thing in both request handling methods. The timer has to be reset to a random value to minimize conflicts. If a node receives a vote request for an older term than its current one, it should not grant the vote. If the term is the same, nodes can only vote for one candidate, so we grant the vote only if the node already voted for the same id. The last case is a vote request for a new term; in that case the node should become a follower, record the term and whom it voted for, and reply with granted. This is slightly simplified; the final implementation has a few more checks for nodes that were desynced or separated from the cluster by a network partition.
func (n *Node) resetTimer() {
	d := time.Duration(rand.Int63n(150)+150) * time.Millisecond
	n.electionTimeout.Reset(d)
}

func (n *Node) requestVote(ctx context.Context, vote Message[VoteRequest]) (Message[VoteReply], error) {
	if n.stopped {
		// Simulate an unreachable node.
		return Message[VoteReply]{
			SourceID: n.id,
			TargetID: vote.SourceID,
		}, fmt.Errorf("Node unreachable")
	}
	n.resetTimer()

	// Request from an older term: do not grant the vote.
	if vote.Msg.Term < n.term {
		return Message[VoteReply]{
			SourceID: n.id,
			TargetID: vote.SourceID,
			Msg: VoteReply{
				Granted: false,
			},
		}, nil
	}
	// Same term: only grant the vote if we already voted for this candidate.
	if vote.Msg.Term == n.term {
		return Message[VoteReply]{
			SourceID: n.id,
			TargetID: vote.SourceID,
			Msg: VoteReply{
				Granted: vote.SourceID == n.votedFor,
			},
		}, nil
	}
	// New term: become a follower, record the term and the vote, and grant it.
	n.becomeFollower()
	n.term = vote.Msg.Term
	n.votedFor = vote.SourceID
	return Message[VoteReply]{
		SourceID: n.id,
		TargetID: vote.SourceID,
		Msg: VoteReply{
			Granted: true,
		},
	}, nil
}
And that should be it, voting is done.
Log replication
Log replication is slightly harder to implement than leader election. The leader has to periodically sync the logs to all nodes, and only after a majority of followers successfully append the logs are they committed and considered written. If the logs are rejected by the majority of followers, the leader throws them out.
Let's get to it.
func (n *Node[T]) Start() {
	for {
		select {
		// ... other timer
		case <-n.syncTicker.C:
			if n.mode == NodeModeLeader {
				n.syncPeers()
			}
		}
	}
}
The leader periodically sends a sync message to followers regardless of whether there were any new logs in the system, so on each tick I'm calling syncPeers. This method calls each peer with the messages that were added since the last sync call; Entries can also be empty. The call also contains the current commit index, which is used to update the commit index on the followers, and StartOffset, which is used by followers to reject syncs from stale leaders. After all results are collected, the leader either commits or rejects the messages.
func (n *Node[T]) syncPeers() {
	n.syncOffset = n.storage.Length()
	ctx := context.Background()
	wg := sync.WaitGroup{}
	results := make(chan Message[LogReply], len(n.peers))
	for _, peer := range n.peers {
		peer := peer
		wg.Add(1)
		go func() {
			defer wg.Done()
			resp, err := peer.conn.syncLog(ctx, Message[LogRequest[T]]{
				SourceID: n.id,
				TargetID: peer.conn.ID(),
				Msg: LogRequest[T]{
					CommitIndex: n.commitOffset,
					StartOffset: peer.commitOffset,
					Entries:     n.getUncommitedLogs(peer),
				},
			})
			if err != nil {
				slog.ErrorContext(ctx, "Ping failed", "Err", err)
			}
			results <- resp
		}()
	}
	wg.Wait()
	close(results)

	// The leader has already written the value itself.
	successes := int64(1)
	for result := range results {
		if result.Msg.Success {
			successes += 1
		}
	}
	// The cluster has len(n.peers)+1 nodes, so a majority is more than half of that.
	canCommit := successes > int64(len(n.peers)+1)/2
	if canCommit {
		err := n.commitLogs(n.syncOffset)
		// err handle
		n.commitOffset = n.syncOffset
	} else {
		err := n.storage.Discard(n.commitOffset, n.syncOffset)
		// err handle
	}
}
The whole log storage is abstracted into its own interface for simplicity, and users of this package can swap it out if they want. I will be implementing two types of storage: in-memory and file-backed.
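The snippets here don't spell the interface out, but based on how storage is used in them, it might look roughly like the following sketch; the method names are inferred from the calls (Length, Append, Discard) and the real repository may define it differently. The full interface likely also needs a way to read entries back out, which getUncommitedLogs would use.

type storage[T any] interface {
	// Length returns the number of entries currently in the log, committed or not.
	Length() int64
	// Append writes new entries at the end of the log.
	Append(entries ...T) error
	// Discard drops the entries in the range [from, to), used to throw away
	// uncommitted entries.
	Discard(from, to int64) error
}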
But first, the logic for the follower side of log replication. There are a few things I need to take care of. First, if the start index is not aligned, we discard messages after the StartOffset, since this means the last messages written to storage were not committed. Second, the follower rejects any message that is not aligned with its syncOffset; this usually signifies some desync or a leader that was disconnected. If everything is aligned, we write into the log and move the commit offset on the follower.
func (n *Node[T]) syncLog(ctx context.Context, log Message[LogRequest[T]]) (Message[LogReply], error) {
	// Messages after the leader's StartOffset were never committed, drop them.
	if n.syncOffset > log.Msg.StartOffset {
		err := n.storage.Discard(log.Msg.StartOffset, n.storage.Length())
		// err handle
		n.syncOffset = log.Msg.StartOffset
	}
	// Reject syncs that are not aligned with our own sync offset.
	if n.syncOffset != log.Msg.StartOffset {
		return Message[LogReply]{
			SourceID: n.id,
			TargetID: log.SourceID,
			Msg: LogReply{
				CommitIndex: n.commitOffset,
				Success:     false,
			},
		}, nil
	}
	// Write new logs
	err := n.storage.Append(log.Msg.Entries...)
	// err handle
	n.syncOffset = log.Msg.StartOffset + int64(len(log.Msg.Entries))
	// Commit everything up to the leader's commit index.
	err = n.commitLogs(log.Msg.CommitIndex)
	// err handle
	n.commitOffset = log.Msg.CommitIndex
	return Message[LogReply]{
		SourceID: n.id,
		TargetID: log.SourceID,
		Msg: LogReply{
			CommitIndex: n.commitOffset,
			Success:     true,
		},
	}, nil
}
And that’s basically it.
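For completeness, here is a minimal sketch of what the in-memory storage could look like, assuming the storage interface inferred earlier; it is not safe for concurrent use and the real implementation in the repository may differ. The file-backed variant would persist the same operations to disk.

type memoryStorage[T any] struct {
	entries []T
}

func (s *memoryStorage[T]) Length() int64 {
	return int64(len(s.entries))
}

func (s *memoryStorage[T]) Append(entries ...T) error {
	s.entries = append(s.entries, entries...)
	return nil
}

func (s *memoryStorage[T]) Discard(from, to int64) error {
	if from < 0 || to > int64(len(s.entries)) || from > to {
		return fmt.Errorf("invalid discard range [%d, %d)", from, to)
	}
	s.entries = append(s.entries[:from], s.entries[to:]...)
	return nil
}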
What’s missing
In a real implementation one should also keep track of the timing of messages between nodes to dynamically tweak the timers when nodes are placed far from each other.
I didn't implement any snapshot functionality or log compaction, but both of these can be moved into the storage interface implementation. Currently clients can only talk to the leader, and there should be some message passing to allow clients to talk to any node; I'm planning to implement this, but it's out of the scope of this blog post. Lastly, I need to read the “Raft paper” to see how many mistakes I made.
What’s next
I'm planning to finish this implementation so I can create a distributed work queue or maybe a key-value store. For transport I want to implement a gRPC interface. I was recently asked to implement a work queue as an interview task, and I want to extend that implementation to be distributed, with a custom Kubernetes operator; I will be writing more about this in the future. Secondly, I would like to test this implementation of Raft with something like Jepsen to see whether it's really reliable.
Conclusion
Raft is a really simple and fun consensus algorithm to implement, and I highly recommend that anyone interested in learning more about how distributed systems work try to implement it. I expected this to be more challenging, and it's highly likely that I've missed something. There are many production-ready implementations you can look at for inspiration; mine is really not production ready. But if you are still interested, the full code can be found here. PRs and comments on the repository are welcome.