Implementing A Distributed Key Value Store With Strong Consistency And Elastic Scalability Using Raft

2023-04-12 · Leonardo Benicio
Summary

A comprehensive technical exploration of implementing a distributed key value store with strong consistency and elastic scalability using raft, covering key concepts, practical implementations, and real-world applications.

Contents

Building a Distributed Key-Value Store That Doesn’t Lie

Introduction

Imagine you’re building the next big thing—a platform for real-time inventory management, a cryptocurrency exchange, or a collaborative editing tool. Your application starts small, running on a single server with a local SQLite database. It works perfectly: every write is atomic, every read sees the latest data, and life is good. Then you grow. Traffic spikes. Your single server buckles. You add a second node with replication, but now you face a new problem: when a customer orders the last item, both replicas might think it’s in stock, double-selling the same unit. Or worse, after a brief network partition, your nodes disagree on the current balance, and your users see stale or conflicting data.

This is the consistency–availability trade-off in all its glory. The CAP theorem tells us that a distributed system can guarantee at most two of three properties: Consistency, Availability, and Partition Tolerance. In a world where network partitions are inevitable, you must choose: either sacrifice availability during a split (like traditional databases) or accept eventual consistency and tolerate temporary disagreements (like many NoSQL stores). But what if you could have both—strong, linearizable consistency and the ability to scale elastically, adding or removing nodes without downtime? That’s the promise of Raft, a consensus algorithm that makes building such a system not only possible but surprisingly manageable.

This post is not a theoretical treatise. It’s a hands-on guide to implementing a distributed key-value store that is both strongly consistent and elastically scalable, using the Raft protocol as its backbone. We’ll walk through the core challenges: how to replicate state across nodes so that every client sees the same truth, how to handle leader failures gracefully, and how to dynamically grow or shrink the cluster without losing data or violating consistency. By the end, you’ll have a working (if minimal) key-value store in Go that you can adapt to your own projects, and a deep understanding of the engineering decisions behind it.

But before we dive into code, let’s ground ourselves in the problem. Why is building a consistent, scalable key-value store so hard in the first place? And why does Raft, among all consensus algorithms, offer a practical path forward?

Why Consistency and Scalability Seem at Odds

In a single-node system, consistency is trivial. A write is immediately visible to all subsequent reads because there’s only one copy of the data. Scalability, however, is limited: hardware constraints bound throughput and capacity. To scale, you replicate your state across multiple nodes. Replication introduces the possibility of divergence: if two nodes accept writes concurrently, their states can become inconsistent. To prevent this, you need a protocol that orders writes globally, so every replica applies them in the same sequence. This is consensus.

Traditional consensus algorithms like Paxos are notoriously difficult to implement correctly. They’re often described as “simple in theory, maddening in practice.” Paxos’s protocol is elegant but leaves many real-world details unspecified—how to deal with leader failures, how to compact the log, how to handle configuration changes. As a result, many production systems either avoided consensus altogether (opting for weak consistency) or relied on complex, hard-to-debug implementations.

Raft was designed to address exactly this pain point. It breaks consensus into understandable sub-problems: leader election, log replication, safety, and membership changes. Its goal is to be as efficient as Paxos while being much easier to implement correctly. By adopting Raft, you gain a clear recipe for building a strongly consistent, fault-tolerant distributed system.

What Does “Strongly Consistent” Mean?

In the context of a key-value store, we want linearizability: every operation appears to occur atomically at some point between its invocation and its response, and the order of operations is the same across all clients. In simpler terms, once a write completes, any subsequent read must see the written value (or a later one). No stale reads. No reading your own write and seeing nothing. This is the gold standard of consistency, and it’s what most developers intuitively expect from a database.

Raft provides linearizable semantics by ensuring that all committed operations are applied to a replicated state machine in the same total order. The state machine—in our case, a simple map of keys to values—executes one operation at a time, sequentially. Because every non-faulty node executes the same log entries in the same order, the state machine’s state converges to the same result across the cluster. And because the leader coordinates all writes and (for linearizable reads) responds only after confirming its leadership, clients observe a consistent view.

Overview: What We’ll Build

We’ll implement a distributed key-value store with the following properties:

  • Strong consistency (linearizability) for both reads and writes.
  • Fault tolerance: the system withstands the failure of up to a minority of nodes (e.g., one node in a three-node cluster).
  • Elastic scaling: we can add or remove nodes without stopping the system, and without violating consistency.
  • Simple API: Get(key), Put(key, value), Delete(key).

We’ll build it in Go, using the net/rpc package for node-to-node communication and custom timers for leader election. We’ll keep the code minimal enough to fit in a blog post but complete enough to be run and tested. The final code, with comments and test harness, will be available on GitHub.

Now let’s dive into the Raft protocol, step by step, and see how each piece fits into our key-value store.


Part I: Understanding Raft

Raft is a consensus algorithm that manages a replicated log. It defines three roles for nodes: leader, candidate, and follower. In normal operation, there is exactly one leader that handles all client requests. The leader replicates log entries to followers. If the leader fails or becomes unreachable, a new leader is elected from among the followers. Raft ensures that the new leader will have all committed entries, so no committed data is lost.

The Replicated Log

The heart of Raft is a log of commands. Each log entry has three fields: an index (its position in the log), a term (the term in which the entry was created), and a command (the operation to be executed by the state machine). The term is a monotonically increasing integer that acts as a logical clock, helping nodes detect stale leaders and outdated information.

The log is replicated across all nodes. The leader accepts client requests, appends them to its local log, and sends them to followers via AppendEntries RPCs. When a majority of the cluster have acknowledged a newly appended entry, the leader marks it as committed and applies it to its state machine. Committed entries are safe: they will survive leader failures because any future leader will contain them.

Leader Election

Raft uses randomized timeouts to elect a leader. Followers start with a randomized election timeout (e.g., 150–300ms). If a follower receives no communication from the leader within its timeout, it transitions to candidate state, increments its term, and requests votes from other nodes. Each node votes at most once per term, granting its vote to the first candidate that has an up-to-date log (the candidate’s log must be at least as complete as the voter’s). The candidate that receives votes from a majority becomes the new leader. The leader then sends heartbeat AppendEntries messages to all nodes to assert its authority and reset their timeouts.

A critical safety property: at most one leader can be elected in any given term. Because votes require a majority, and majorities intersect, two candidates cannot both win in the same term. Also, the leader election ensures that a candidate with a more complete log cannot lose to a candidate with a less complete log; this is the log matching property.

Log Replication

When the leader receives a client request, it appends a new entry to its log, then sends AppendEntries RPCs in parallel to all followers. The RPC includes the previous log index and term, allowing followers to check consistency. If a follower’s log does not match the leader’s at the point where the new entry would be appended, the follower rejects the RPC. The leader then backtracks: it decrements the previous index and retries. This “conflict resolution” ensures that the leader and follower logs become consistent.

Once a majority of followers (including the leader itself) have acknowledged the entry, the leader commits it by incrementing its commit index. On subsequent AppendEntries RPCs, the leader includes the updated commit index, and followers apply the entry to their state machines.

Safety and the Commitment Rule

Raft guarantees that a committed entry is never lost. The key is that any leader for a given term must contain all entries committed in previous terms. This is enforced by the election restriction: a candidate can only win an election if its log is at least as up-to-date as that of the voter (i.e., its last term is higher, or the last term is the same but its log is longer). Consequently, a new leader will have all committed entries, and it can force its log on followers through the normal log replication process.

Cluster Membership Changes

One of Raft’s most elegant contributions is a safe mechanism for changing the set of nodes in the cluster—adding or removing servers. The straightforward approach of simply switching from one configuration to another is dangerous: during the transition, a majority of the old configuration and a majority of the new configuration might not overlap, leading to two possible leaders. Raft uses joint consensus, where the cluster transitions through an intermediate configuration that requires a majority of both the old and new configurations to commit. This ensures that no split-brain occurs.

In practice, the leader appends a special configuration entry to its log. Until this entry is committed, the cluster operates under both the old and new configurations simultaneously: a leader needs a majority from both sets to be considered leader, and a newly appended entry must be replicated to a majority of both sets. Once the joint configuration entry is committed, the leader can then append a second entry that finalizes the new configuration, after which the old configuration is discarded. This process is safe and does not require manual intervention.


Part II: Designing the Key-Value Store

With the Raft protocol in mind, let’s design our distributed key-value store. We’ll call it RaftKV. The architecture is straightforward: each node runs two components: a Raft consensus module and a key-value state machine. Clients interact only with the leader node, which forwards requests to the Raft module and then applies committed commands to the state machine, returning results to the client.

Client API

The public API for clients is a simple RPC interface:

  • Put(key string, value string) (error) – sets the value for a key.
  • Get(key string) (value string, error) – retrieves the value for a key.
  • Delete(key string) (error) – removes a key.
  • GetAll() (map[string]string, error) – returns the entire key-value map (for debugging or snapshotting).

Because the store is linearizable, Get must also go through the leader and be applied to the state machine to ensure it returns the most recent committed value. (Alternatively, the leader can use a read-only optimization called linearizable reads with leader lease, but we’ll keep things simple and consistent.)

State Machine

The state machine is a thread-safe map from strings to strings, with an additional term and index tracking to support idempotency (which we’ll cover later). It exposes an Apply(op Operation) (result interface{}) method that executes the operation and returns the result. Operations include:

type OpType int
const (
    OpPut OpType = iota
    OpGet
    OpDelete
    OpGetAll
)

type Operation struct {
    Op    OpType
    Key   string
    Value string   // used for Put
    RequestID uint64 // for idempotency (see below)
}

The state machine also maintains a mapping from client request IDs to responses, so that if a client retries a command that was already committed, the duplicate is detected and the previous result is returned without re-execution.

Persistence

Raft requires that the state machine and the Raft log survive crashes. We’ll use a simple file-based approach: the Raft log is serialized to disk (e.g., using gob), and the state machine is periodically snapshotted. For simplicity in this first implementation, we’ll keep the log in memory and rely on the cluster to recover from failures via leader election (since we have no persistent storage). In a production system, persistence is essential to prevent data loss on restarts.

Consistency Guarantee

Our design ensures linearizability because:

  1. All operations go through the leader.
  2. The leader appends each operation to the Raft log.
  3. The operation is applied to the state machine only after it is committed (i.e., replicated to a majority).
  4. Get is implemented by appending a read-only operation to the log, waiting for it to commit, and then returning the state machine’s result. (This is the simplest approach; optimizations exist, but for correctness, it’s safe.)

This approach is called strong consistency via state machine replication.


Part III: Implementing Raft in Go

Now let’s roll up our sleeves and implement the Raft core. We’ll use Go because its concurrency model (goroutines, channels) and RPC library align well with Raft’s asynchronous nature. The complete implementation is about 1,000 lines of code, but we’ll focus on the essential parts.

Data Structures

First, define the Raft node’s persistent state (on disk in production) and volatile state:

type Raft struct {
    mu        sync.Mutex
    peers     []string           // addresses of all nodes
    me        int                // index into peers
    state     NodeState          // follower, candidate, leader

    // Persistent state
    currentTerm int
    votedFor    int
    log         []LogEntry

    // Volatile state
    commitIndex int
    lastApplied int

    // Volatile state (leader only)
    nextIndex  []int
    matchIndex []int

    // Channels for internal communication
    applyCh     chan ApplyMsg
    enableElection bool
    ...
}

type LogEntry struct {
    Term    int
    Command Operation
    Index   int // implicit from log position
}

The applyCh is a channel through which committed operations are sent to the state machine goroutine.

Leader Election

We implement leader election using a timer that fires after a randomized interval. In the follower state, a goroutine checks for leader activity; if none (no AppendEntries from leader or no votes from candidates), it times out and starts an election.

The election routine:

  1. Increment current term.
  2. Set state to candidate.
  3. Vote for itself.
  4. Send RequestVote RPCs to all other peers in parallel.
  5. Collect votes. If majority votes are received, become leader.
  6. If a new term is received during election (e.g., from a higher-term candidate), step down.
func (rf *Raft) startElection() {
    rf.mu.Lock()
    rf.state = Candidate
    rf.currentTerm++
    rf.votedFor = rf.me
    term := rf.currentTerm
    rf.mu.Unlock()

    var votes int32 = 1 // self-vote
    args := RequestVoteArgs{Term: term, CandidateID: rf.me, LastLogIndex: lastLogIndex(), LastLogTerm: lastLogTerm()}
    for i := range rf.peers {
        if i == rf.me { continue }
        go func(peer int) {
            var reply RequestVoteReply
            if ok := rf.callRequestVote(peer, &args, &reply); ok {
                rf.mu.Lock()
                if reply.Term > rf.currentTerm {
                    rf.currentTerm = reply.Term
                    rf.state = Follower
                    rf.votedFor = -1
                } else if reply.VoteGranted && rf.state == Candidate {
                    atomic.AddInt32(&votes, 1)
                    if atomic.LoadInt32(&votes) > len(rf.peers)/2 {
                        rf.becomeLeader()
                    }
                }
                rf.mu.Unlock()
            }
        }(i)
    }
    // After election timeout, if not leader, start new election
}

Log Replication

The leader periodically sends AppendEntries RPCs to followers. Each RPC includes the entries from nextIndex[server] onward. If the follower’s log matches the leader’s at the expected position, it appends the new entries. Otherwise, it returns false, and the leader decrements nextIndex for that follower and retries.

For heartbeats, the leader sends an empty AppendEntries (no new entries) to maintain authority.

The follower handler for AppendEntries:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // Reject if term less than current term
    if args.Term < rf.currentTerm {
        reply.Success = false
        reply.Term = rf.currentTerm
        return
    }

    // If RPC has higher term, convert to follower
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.state = Follower
        rf.votedFor = -1
    }

    // Reset election timer (we'll handle timeouts separately)
    // Perform log consistency check
    if args.PrevLogIndex > len(rf.log)-1 {
        reply.Success = false
        return
    }
    if args.PrevLogIndex >= 0 && rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
        reply.Success = false
        return
    }

    // Append entries
    // ... handle conflicts and append new entries ...
    // Update commitIndex from leader's commitIndex
    if args.LeaderCommit > rf.commitIndex {
        rf.commitIndex = min(args.LeaderCommit, len(rf.log)-1)
    }
    reply.Success = true
    reply.Term = rf.currentTerm
}

The leader, after receiving a successful AppendEntries reply from a majority, updates its matchIndex for that server and then checks if any new entries are committed (i.e., entries with index > commitIndex that have been replicated to a majority). The commit index advances to the highest index at which a majority of servers have acknowledged.

Client Request Handling

Clients send RPCs to the leader’s ClientRequest handler. The leader wraps the operation in a LogEntry, appends it to its log, and then proceeds with the normal replication. The leader does not respond to the client until the entry is committed and applied to the state machine.

Because the state machine is single-threaded (apply loop), we can block the client RPC goroutine waiting for a notification. We use a map from log index to a channel that is signalled when the operation is committed.

Linearizable Reads

For Get, we have two choices:

  1. Log-based reads: Append a read-only operation to the log (same as a write). This ensures the read sees all committed writes. However, it adds latency and log overhead.
  2. Leader-only reads with commitment check: The leader ensures it is still the leader by requesting a quorum of heartbeats (an alternative to logging reads). This is more efficient.

For simplicity, we’ll implement log-based reads initially. The state machine, upon applying a Get operation, looks up the key in its map and returns the value. Because the operation is committed, it linearizes with all preceding writes.


Part IV: Implementing the Key-Value State Machine

The state machine runs in its own goroutine, reading from the applyCh. It maintains a map and also a map of client request IDs to results to handle idempotency.

type KVServer struct {
    mu       sync.Mutex
    data     map[string]string
    applied  map[uint64]interface{} // requestID -> result (for idempotency)
    lastApplied int
    raft     *Raft
    // ...
}

func (kv *KVServer) applyLoop() {
    for msg := range kv.raft.applyCh {
        kv.mu.Lock()
        // If already applied (due to duplicate), ignore
        if msg.CommandIndex <= kv.lastApplied {
            kv.mu.Unlock()
            continue
        }
        op := msg.Command.(Operation)
        // Check idempotency
        if op.RequestID != 0 {
            if result, ok := kv.applied[op.RequestID]; ok {
                // Already applied, can respond with cached result
                // But we still need to update lastApplied?
                // We'll have a separate mechanism to respond to waiting clients.
                // Simplified: we assume client RPC handler uses message passing.
                kv.mu.Unlock()
                continue
            }
        }
        // Apply operation
        var result interface{}
        switch op.Op {
        case OpPut:
            kv.data[op.Key] = op.Value
            result = nil
        case OpGet:
            // Get is a read, return value
            val, ok := kv.data[op.Key]
            if ok {
                result = val
            } else {
                result = ""
            }
        case OpDelete:
            delete(kv.data, op.Key)
            result = nil
        case OpGetAll:
            // return copy
            copy := make(map[string]string)
            for k, v := range kv.data {
                copy[k] = v
            }
            result = copy
        }
        kv.applied[op.RequestID] = result
        kv.lastApplied = msg.CommandIndex
        kv.mu.Unlock()

        // Notify the waiting client RPC handler
        // This can be done via a channel keyed by command index or request ID.
    }
}

The client RPC handler in the leader’s KVServer will block until the operation is applied. It does so by recording a channel in a map indexed by the request ID (or the log index) and then waiting on that channel. The applyLoop above signals that channel after applying.

Client RPC Handlers

We expose an RPC ClientRequest that takes a generic operation and returns the result. This RPC is only accepted by the leader. If a follower receives it, it returns a “not leader” error, and the client contacts the suggested leader.

func (kv *KVServer) ClientRequest(args *ClientRequestArgs, reply *ClientRequestReply) error {
    // Check if leader
    if kv.raft.state != Leader {
        reply.LeaderHint = kv.raft.getLeader()
        reply.Err = ErrNotLeader
        return nil
    }
    // Generate request ID if not provided
    if args.RequestID == 0 {
        args.RequestID = makeRequestID() // (nodeID, counter)
    }
    // Create channel for this request
    ch := make(chan interface{}, 1)
    kv.mu.Lock()
    kv.pending[args.RequestID] = ch
    kv.mu.Unlock()

    // Submit operation to Raft
    index, term, err := kv.raft.Start(args.Op, args.RequestID)
    if err != nil {
        reply.Err = ErrShutdown
        return nil
    }
    // Wait for apply
    select {
    case result := <-ch:
        reply.Value = result
        reply.Err = OK
    case <-time.After(5 * time.Second):
        reply.Err = ErrTimeout
    }
    return nil
}

The Start method on the Raft leader proposes the operation to the log and returns the index. The actual commit notification goes through the apply channel, and the KVServer’s apply loop will retrieve the channel from the pending map and send the result.


Part V: Cluster Membership Changes

Now we tackle elastic scaling. The cluster configuration is a list of server addresses. Initially, the cluster has a fixed set. To add a new node, the leader appends a special configuration entry: first a joint consensus entry (containing both old and new configurations), then a final entry with only the new configuration. The process is detailed in the Raft paper, and we implement it as follows:

  1. The leader receives an admin RPC AddServer(newAddr).
  2. It creates a new configuration C_old_new = {old_set ∪ {newAddr}} (but the new node starts in a non-voting state? Actually for safety, we start with joint consensus: C_old and C_new combined, where both must achieve majority.)
  3. The leader appends ConfigChange{C_old_new} to the log, replicates it, and commits it. During this joint consensus phase, the leader uses both old and new majorities for decisions.
  4. After the joint configuration is committed, the leader appends a second entry with the final configuration C_new. This is also committed, and then the old servers are discarded.

Similarly, removal is the reverse: first joint consensus, then final configuration without the removed server.

Our implementation needs to:

  • Track the current configuration(s) on each node.
  • Modify the leader election and log replication quorum to require majorities from all active configurations during joint consensus.
  • Ensure that the new node, when added, receives the full log from the leader (via normal log replication) before it can participate in decisions.

The code becomes more involved, but the pattern is clear.


Part VI: Testing and Verification

Testing a distributed system is notoriously hard. We’ll write a series of tests that simulate network partitions, server crashes, and client requests.

Unit Tests

We can test leader election by starting a set of Raft instances (using in-memory RPC) and verifying that a single leader emerges. Then we kill the leader and confirm a new one is elected.

Integration Tests

We’ll set up a three-node cluster, issue Put and Get requests, and verify that results are consistent across crashes and restarts. We’ll also test cluster membership changes: add a fourth node, check that it catches up, and then remove a node.

Failure Injection

We can simulate network partitions by temporarily blocking RPCs between certain nodes. The Raft algorithm should gracefully handle partitions: if a majority is partitioned from a minority, the majority continues to commit, while the minority cannot. When the partition heals, the minority catches up via log replication.

Checking Linearizability

We can write a test that runs multiple clients concurrently, performing random operations, and then we collect all operations and check if they meet a linearizability specification (e.g., using the porcupine library). This is the gold standard for verifying consistency.


Part VII: Performance Considerations and Optimizations

Our simple implementation works correctly but may not be performant. Here are some realistic optimizations:

Batching

Instead of replicating each command individually, the leader can batch multiple client requests into one AppendEntries RPC. This reduces network round trips. The leader collects requests for a short interval (e.g., 1ms) or when a certain batch size is reached.

Pipelining

The leader can send AppendEntries RPCs without waiting for the previous RPC to complete, using a pipeline approach. This keeps the network fully utilized. The leader keeps track of the last successful reply and adjusts nextIndex accordingly.

Read-Only with Leader Lease

To avoid logging reads, the leader can use its term and a heartbeat exchange to determine that it is still the leader. It then responds to reads immediately from its state machine, provided no configuration change is in progress. This requires careful handling of clock skew (a clock drift can cause a leader to serve stale reads after a network partition). A common approach is to rely on the heartbeat interval as a safety margin.

Snapshotting

Over time, the Raft log grows unboundedly. To bound memory and restart time, we periodically take a snapshot of the state machine. When a follower is behind and the leader no longer has the log entries it needs, the leader sends a snapshot via InstallSnapshot RPC. The follower then truncates its log up to the snapshot point.

Non-Volatile Memory

Persisting the log to disk can be a bottleneck. Batching commits and using fsync only when necessary (e.g., after a batch commit) can improve throughput. Some implementations replicate to memory first and rely on the cluster for durability, but this risks data loss.


Part VIII: Conclusion

Distributed consistency is often viewed as a mystical, ivory-tower topic, but with Raft, it becomes a practical engineering tool. In this post, we built a distributed key-value store from scratch, tackling leader election, log replication, linearizable operations, and cluster membership changes. The result is a system that “doesn’t lie”: it provides strong, intuitive consistency while scaling elastically across nodes.

Of course, our implementation is minimal. Real-world systems like etcd, Consul, and TiKV are built on Raft but include production-grade persistence, performance optimizations, and operational tooling. However, the core ideas remain the same. Understanding them allows you to debug, extend, and better use these systems.

If you’re inspired to go deeper, I encourage you to read the Raft paper (it’s exceptional) and run the Raft visualization. Then, fork the code from this post and experiment. Add persistent storage. Implement snapshotting. Try to break it. That’s how you truly learn.

Ultimately, the promise of Raft is that you don’t have to compromise. You can have your consistency and scale it too. You just need the right protocol—and a willingness to implement it, one log entry at a time.


Additional Resources


Thank you for reading. If you found this post useful, share it with your fellow engineers and consider subscribing to my newsletter for more distributed systems deep dives.