A Rigorous Proof Of The Chandy Lamport Distributed Snapshot Algorithm For Channel State Capture

A comprehensive technical exploration of a rigorous proof of the chandy lamport distributed snapshot algorithm for channel state capture, covering key concepts, practical implementations, and real-world applications.
Contents
The Photograph That Cannot Be Taken: Why Distributed Snapshots Demand Proof
Prologue: The Impossible Photograph
Imagine you’re debugging a global-scale distributed database. A rare concurrency bug has brought your multi-region service to its knees—some records are duplicated, others missing. You suspect the bug manifests only under a specific interleaving of messages between replicas. If only you could freeze the entire system at a single instant, inspect each process’s state and every message in flight, and replay the sequence of events that led to the inconsistency. But in a distributed system without shared memory, without a global clock, and where messages travel at the speed of light (and queue up in operating system buffers), there is no “single instant.” The very idea of a global snapshot—a coherent photograph of the entire system—appears paradoxical: you can’t be everywhere at once, and you can’t stop time.
Think of the classic high-speed photography by Harold Edgerton: a bullet piercing an apple, frozen in time by a strobe flash lasting one millionth of a second. That works because the photographer controls both the camera and the lightning-fast flash, and the scene is bounded in a tiny physical space. Now imagine trying to capture a single “instant” across a thousand cameras spread across continents, each with its own local clock that drifts unpredictably, each camera seeing a different slice of reality because light itself takes time to travel. That’s the distributed snapshot challenge.
Yet distributed snapshots are not just a theoretical curiosity. They are the backbone of fault tolerance in systems from Apache Flink to Google’s Spanner. Checkpointing relies on capturing consistent global states to enable rollback recovery. Distributed debugging tools use snapshots to detect deadlocks, race conditions, or violated invariants. Even garbage collection in a distributed object store requires a snapshot of references across nodes. So the question is not whether we want a distributed snapshot, but whether we can trust the one we take.
In 1985, Leslie Lamport and K. Mani Chandy published a deceptively simple algorithm for capturing a consistent global state in an asynchronous message-passing system. The algorithm uses special “marker” messages that act like a developing agent in a film darkroom: when a process receives a marker on a channel, it records its local state and begins recording the state of that channel. Each process sends markers on all its outgoing channels, and eventually every process has recorded its own state and the state of each incoming channel. The result is a set of local snapshots that together form a consistent global state – a state that could have occurred in some real execution of the system.
But how do we know that state is actually possible? After all, the processes did not stop, messages kept flowing, and the markers themselves added new messages to the system. The algorithm constructs a state that may never have existed at any single real instant. Yet the beauty of the Chandy-Lamport algorithm – and the reason it has survived for nearly forty years – is that it comes with a proof that the recorded global state is reachable: there exists a reordering of the actual events that yields the recorded state at some logical point in the execution. This property lets us treat the snapshot as a valid intermediate state of the system, even though it was never observed in real time.
In this post, we’ll dive deep into the theory and practice of distributed snapshots. We’ll start by defining the system model and the notion of a consistent cut. We’ll walk through the Chandy-Lamport algorithm in detail, with examples and pseudocode. We’ll then examine its proof of correctness – a beautiful argument based on the concept of possible global states. We’ll explore real-world applications in fault tolerance, debugging, and distributed garbage collection, including how Apache Flink uses a variant for exactly-once processing. Finally, we’ll discuss limitations, extensions (like the Lai-Yang algorithm for non-FIFO channels), and practical engineering challenges. By the end, you’ll understand why the photograph that cannot be taken can nonetheless be trusted – and how that trust is built on rigorous proofs.
1. The System Model: Asynchronous, Message-Passing, No Shared Time
Before we can talk about snapshots, we must precisely define the world we are trying to photograph. The classic model used by Chandy and Lamport is an asynchronous distributed system consisting of a finite set of processes connected by unidirectional point-to-point channels. The model has the following key characteristics:
Processes: Each process has a local state (contents of memory, registers, program counter, etc.) that changes only when the process executes an internal action (e.g., updating a variable) or when it performs a send or receive action on a channel.
Channels: Channels are assumed to be reliable (no messages are lost, corrupted, or duplicated) and FIFO (first-in-first-out ordering) – although we will later relax this constraint. Channels have infinite buffers; messages can be in transit indefinitely (asynchrony). A process can send a message at any time; the message will eventually be received, but there is no bound on delivery time.
No global clock: There is no shared memory or synchronized physical clock. Each process has its own local clock whose drift relative to other clocks is unbounded. Therefore, “simultaneous” events cannot be meaningfully compared in real time.
No global pause: There is no way to stop all processes simultaneously. Even if you send a “stop” message to all processes, they will receive it at different times, and they may continue to send and receive other messages in the meantime.
This model is realistic for many real-world distributed systems, from data center clusters (with TCP connections) to peer-to-peer networks. The absence of global time forces us to define “instant” in terms of the happens-before relation (Lamport, 1978) rather than wall-clock timestamps.
1.1 Happens-Before and Cuts
Lamport’s happens-before relation (→) is the fundamental tool for reasoning about causality in an asynchronous system. Given events a and b, we say a → b if:
- a and b occur in the same process and a occurs before b in that process’s local order, or
- a is the sending of a message and b is the receipt of that message,
- or there exists an event c such that a → c and c → b (transitive closure).
If neither a → b nor b → a, the events are said to be concurrent.
A global state is an assignment of a local state to each process and a set of messages in transit for each channel. The system evolves through global transitions triggered by process actions (internal, send, receive). However, because the system is asynchronous, the exact sequence of global states that occurs is nondeterministic.
A consistent global state (also called a global snapshot or a consistent cut) is a global state that could have been observed by an external observer who magically freezes time at some logical instant. More formally, it is a set of local states (one per process) and channel states such that if event a occurred before event b in the happens-before relation, and b is included in the cut (i.e., its process’s local state was recorded after b), then a must also be included in the cut. In other words, the cut must be downward closed under happens-before.
To see why this matters, suppose we take a snapshot of process A after it has sent a message to B, but we take the snapshot of B before it has received that message. The message would be “in flight” – that’s fine. But suppose we take the snapshot of B after it has received the message, while A’s snapshot was taken before sending it. Then the global state would show a message that was received but never sent – an impossibility, violating causality. Such a state is inconsistent.
A consistent global state respects causality: every message that is recorded as received must also be recorded as sent (if not from a process whose state is also recorded, then the message must be in transit). Conversely, every message that is recorded as sent but not yet received appears as in-flight.
The goal of any distributed snapshot algorithm is to produce a consistent global state without freezing the entire system. And that is precisely what Chandy-Lamport achieves.
2. The Chandy-Lamport Algorithm: A Step-by-Step Walkthrough
The algorithm assumes FIFO channels. It introduces a special marker message that is sent alongside normal application messages. Markers are never lost and are delivered in order (FIFO) along each channel. The algorithm works in three phases: initiation, propagation, and termination.
2.1 Initiation
Any process can initiate a snapshot. In practice, a coordinator sends a “take snapshot” request to one process, or the algorithm is triggered by a failure detection mechanism. Let’s call the initiator process P0.
- P0 records its own local state.
- P0 sends a marker message on each of its outgoing channels (to all neighbors).
- P0 starts recording the state of each incoming channel (i.e., it creates an empty buffer to collect messages arriving after the marker).
2.2 Propagation (Marker Rule)
The core of the algorithm is the marker receiving rule. When a process receives a marker on a channel C, it must respond as follows:
- If this is the first marker the process has ever seen (i.e., the process has not yet recorded its local state for this snapshot), then:
- Record its own local state.
- Record channel C as empty (because the marker was the first thing received on that channel after the previous snapshot? Actually, the rule says: record the state of channel C as the sequence of messages that have arrived on C after the last state recording for that channel up to the point when the marker is received. Since this is the first marker, the process has not yet recorded any channel state; thus it records channel C as the empty sequence (because it hasn’t saved anything yet). Wait – careful: The algorithm says: When a process receives a marker on a channel, if it has not already recorded its state, it records its state, marks that channel as “recorded”, and begins recording all other incoming channels. Actually, let’s be precise with the classic description:
From the original paper (Chandy and Lamport, 1985):
A process records its local state when it receives a marker for the first time.
It also begins recording the state of all its incoming channels except the one on which the marker arrived (because the marker itself indicates that from that channel, everything before the marker is already accounted for).
For the channel on which the marker arrived, the process records that channel’s state as the sequence of messages received on that channel since the process last recorded its state (which, in this context, is from the beginning of time, or from the previous snapshot – since this is the first snapshot, it’s the empty sequence).
If the process has already recorded its state (i.e., it received a marker earlier from a different channel), then it simply records the state of channel C as the sequence of messages received on C since the last time it recorded that channel (which is from when it started recording that channel’s state, up to the point of receiving the marker).
After handling the marker, it sends markers on all its outgoing channels (but only once – if it already sent markers when it first recorded its state, it should not send them again). The typical implementation: upon first marker receipt, the process records local state, starts recording all incoming channels, sends markers on all outgoing channels, and then for subsequent markers, it simply records the channel state for the channel on which the marker arrived (stopping recording on that channel).
Let’s rephrase in a simpler step-by-step algorithm for a process P:
upon initiation by self or receipt of marker from any channel:
if local_state == unrecorded:
local_state = capture_current_state()
for each outgoing channel C_out:
send marker on C_out
for each incoming channel C_in:
start recording messages on C_in (initialize an empty buffer)
// The channel on which marker arrived (if any) will be handled separately below (or we can start recording all and then immediately "close" the incoming channel upon marker receipt? Standard: on first marker, record local state, then for the channel on which marker arrived, the channel state is empty (since no messages left before marker). We'll handle it in the next step.)
// Now, handle the marker itself: We need to record the state of the channel on which marker arrived.
// For that channel, we stop recording and define its state as the sequence of messages that have been buffered since we started recording it (or since the last reset). For the first marker, that buffer is empty. For subsequent markers, the buffer contains messages received after we started recording.
channel_state = stop_recording_and_get_buffer(channel_from_which_marker_arrived)
// At this point, the channel is considered fully recorded.
In many expositions, the algorithm is described as: when a process receives a marker on channel C and has not yet recorded its state, it records its state, then records channel C as empty (since no messages from C can be left before the marker – because the marker was the first thing received, so all messages before marker were already received), and starts recording all other incoming channels. When it later receives a marker on another channel D, it records channel D’s state as the sequence of messages received on D since the start of recording.
2.3 Termination
The algorithm terminates when every process has received a marker on every incoming channel. More precisely, a process knows it has completed its part when it has received a marker on each of its incoming channels (because at that point, all channels are recorded). However, to know that the entire system has finished, one typically uses a termination detection mechanism or a centralized coordinator that collects acknowledgments. The original paper assumes that the algorithm runs in the background and processes eventually finish; a simple way: each process, after recording all channel states, sends its local state and channel state records to an external coordinator (like a snapshot server). The coordinator detects termination when it receives responses from all processes.
2.4 Example: Two Processes
Let’s walk through a classic example with two processes, P and Q, connected by two unidirectional FIFO channels: P→Q and Q→P. Initially, P has local state (x=0), Q has local state (y=0). No messages in flight.
Time sequence:
- P sends application message m1 to Q.
- Q receives m1; updates y to 5.
- Q sends message m2 to P.
- P receives m2; updates x to 10. Now a snapshot is initiated at P after step 4? Let’s say P initiates snapshot at time t (after step 4). P records its local state (x=10). P sends a marker on its outgoing channel to Q. P starts recording its incoming channel from Q (empty buffer for now). Before the marker reaches Q, Q happens to send another application message m3 to P (after step 3 but maybe after step 4? Let’s set a concrete timeline):
- t0: P initiates snapshot, records local state (x=10), sends marker M1 to Q, starts recording incoming channel from Q.
- t1: P receives m2? Actually m2 was already sent by Q earlier and is in transit; P receives m2 before the snapshot started? Let’s reorder: Suppose Snapshot starts after all previous messages have been delivered. So at snapshot start: P state is (x=10), Q state is (y=5). No messages in flight. Then P sends marker M1 to Q. Q receives M1 as the first message on its incoming channel. Q sees it’s the first marker so Q records its local state (y=5), records channel from P as empty (since no messages before marker), sends marker M2 to P (on Q→P), and starts recording its incoming channel from P (which is already empty because the marker was the only message). So Q then receives no more messages. P receives M2 on its incoming channel; P already recorded its state, so it stops recording its incoming channel and records the sequence of messages received on that channel since it started recording (which is empty because M2 is the marker, and no application messages were received). So the snapshot yields: P state (x=10), Q state (y=5), both channels empty. That is consistent and matches the actual global state at some point (just after P sent the marker? Actually at real time, Q had already received m1? But here we have no in-flight messages. It’s fine.)
Now consider a scenario with messages in flight during snapshot. Suppose P initiates snapshot right after sending m1 but before Q receives it:
- P state: x=0 (just sent m1? Actually P hasn’t received any messages yet. Let’s set initial: both x=0, y=0.
- P sends m1 to Q (app message). Then immediately P initiates snapshot.
- P records its local state (x=0). Sends marker M1 to Q. Starts recording incoming channel from Q.
- Message m1 is in flight (not yet received by Q).
- Q receives m1 first? But FIFO: both m1 and M1 are on same channel (P→Q). Since m1 was sent before M1, m1 arrives before M1. So Q will receive m1, update y to 5 (say). Then Q receives M1 (marker). Q sees first marker, records its local state (y=5). Since it received m1 before M1, the channel from P is not empty; Q must record the state of channel P→Q as the sequence of messages received before the marker? No – careful: The algorithm says: when Q receives the first marker on a channel, it records its local state, and then it records the state of the channel on which the marker arrived as the sequence of messages received after the last state recording on that channel? Actually the algorithm defines channel state as the sequence of messages that have been delivered on that channel between the process’s most recent state recording and the marker. Since Q had never recorded its state before, the “most recent state recording” is considered the beginning of time? Typically, the channel state is recorded as the sequence of messages that arrived before the marker but that have not yet been accounted for. In the Chandy-Lamport algorithm, the channel state is defined as the sequence of messages that have been transmitted on that channel since the last local state recording of the sending process? Hmm, let’s revisit the original paper’s definition:
The paper defines the channel state recording protocol as follows:
- For each channel, a process records the sequence of messages that have been sent along that channel since the sending process recorded its local state up to the point when the sending process sent a marker on that channel.
- But the receiver uses markers to know when to stop recording incoming messages.
More concretely, the algorithm works by having each process, when it records its local state, start recording the state of each incoming channel – i.e., it logs every application message that arrives on that channel after the moment it started recording. When it later receives a marker on that channel, it stops recording that channel and defines the channel’s state as the sequence of logged messages. For the channel on which the first marker arrives, the recorded state is the empty sequence (because no messages could have arrived after starting recording on that channel before the marker, since the marker itself terminates the recording; but also, the messages that arrived before the marker but after the process recorded local state? That depends: if the process records local state after receiving the marker? Actually the order matters.
Let’s clearly delineate two phases for a process Q:
Phase 1: Marker arrives on channel C (and Q has not yet recorded its state)
- Record local state (snapshot of Q’s variables).
- Send markers on all outgoing channels.
- Start recording all incoming channels (including C? Some descriptions say start recording all except C, because for C the marker itself defines the end; the state of C is the sequence of messages that arrived on C between when Q last recorded its state (which is now) and the marker? Since the marker is the first thing on C after Q recorded local state? But the marker arrived at the same time as the recording? It’s easier: Q records local state. Then for channel C (the one on which marker arrived), the state is recorded as the sequence of messages that have arrived on C since the previous recording of Q’s state for that channel – and since there is no previous recording, that sequence is empty (because any messages that arrived before the marker were already processed and are part of Q’s local state). So Q records channel C as empty. For all other incoming channels, Q starts a buffer to collect messages that arrive after this moment.
Phase 2: Subsequent markers arrive on other channels D
- Stop recording on channel D; the recorded state is the buffer of messages that arrived on D since the start of recording.
Thus, in our example: P initiates snapshot, records its state (x=0), sends marker M1 to Q, starts recording incoming channel from Q (empty). Q receives m1 first, updates y=5. Then Q receives M1. Q has not recorded state yet, so it records local state (y=5). It marks channel P→Q as empty (since m1 arrived before the marker, but m1 is part of Q’s local state already; the channel state is empty because no messages from P are left in transit before the marker). Q sends marker M2 to P, and starts recording incoming channel from P (empty). Meanwhile, P’s incoming channel from Q is still being recorded (empty). Later, P receives M2. P has already recorded state, so it stops recording its incoming channel from Q; the recorded state is the sequence of messages received on that channel since it started recording (empty). So final snapshot: P state (x=0), Q state (y=5), both channels empty. Is this consistent? It shows that m1 has been sent and received (since Q’s state includes y=5). But message m1 is not in any channel state, and P’s state doesn’t reflect that it sent m1 (it still has x=0, but that’s fine – sending m1 didn’t change x). This is consistent: it represents a global state just after Q received m1 and before P received M2? But note P never received m2? Actually there is no m2 in this scenario; there is only m1. So the snapshot shows P with x=0 (unchanged) and Q with y=5. Could this have been a valid global state? Yes, at the moment after Q received m1 and before Q sent M2 (but M2 is a marker, not part of application). The marker M2 is not part of the application state; the global snapshot only includes application messages. So it’s consistent.
But wait: In this snapshot, there is no message in flight from P to Q, but we know that before Q received m1, there was a message in flight. The snapshot represents a state after that message was received. That’s fine. So the algorithm correctly captures the state where m1 has been delivered. In terms of consistency, every received message (the effect of m1 is in Q’s state) has been sent. Good.
Now consider a more involved example with concurrent messages and multiple channels. We’ll use a classic three-process example later.
2.5 Pseudocode
Below is a concise pseudocode for the Chandy-Lamport algorithm in a process Pi. We assume each process has access to:
local_state(initiallyNonefor no ongoing snapshot)recorded_channels(a dictionary mapping incoming channels to list of buffered messages, initially empty)num_incoming_channelsknown
# Global snapshot management
def initiate_snapshot():
if snapshot_id is already in progress: return
snapshot_id="idee81b6"
record_local_state(snapshot_id)
for each outgoing channel c:
send(c, MARKER, snapshot_id)
for each incoming channel c:
start_recording(c, snapshot_id) # create an empty buffer
def receive_marker(c, snapshot_id):
if snapshot_id not in current_snapshot:
# first marker for this snapshot
snapshot_id="i6edbc6d06"
record_local_state(snapshot_id)
for each outgoing channel c:
send(c, MARKER, snapshot_id)
for each incoming channel c:
if c != incoming channel from which marker received:
start_recording(c, snapshot_id)
else:
# record this channel as empty
recorded_channels[snapshot_id][c] = []
stop_recording(c, snapshot_id) # actually for the incoming channel, we record empty and stop
else:
# already recorded local state, just handle this channel
stop_recording(c, snapshot_id) # stops buffering on c, returns list of messages as channel state
def receive_application_message(c, msg):
# if in a snapshot and recording that channel, buffer the message
if recording and snapshot_id in current_snapshot and
recorded_channels[snapshot_id][c] is not None and
not channel_recorded[snapshot_id][c]:
# buffer message
append to buffer[c]
# process message normally (update local state)
process_message(msg)
Note that after a process has recorded its local state, it continues to operate normally, sending and receiving application messages. The recording of channel states only affects the buffering; it does not interfere with message delivery or process state. That’s the key: the system remains live while the snapshot is taken.
3. Proving Consistency: Why the Snapshot Is Trustworthy
Now we confront the central paradox: we took a snapshot of processes at different real times (when they first received a marker), and we recorded channel states that may include messages that arrived after some local states but before others. How can we prove that the resulting global state is consistent – i.e., that there exists a hypothetical linearization of events where this state appears as an intermediate global state?
Chandy and Lamport provided a proof based on possible global states and the notion of a consistent cut. Their argument can be summarized as follows:
Define a cut: The set of events that have occurred in each process up to the point when that process recorded its local state. For process Pi, that cut point is the moment just after it recorded its local state (but before processing any application messages that might arrive later? Actually, the local state recorded includes all events up to the moment of recording, including the receipt of the marker? In the algorithm, when a process records its local state upon first receiving a marker, that state includes the effects of all messages it received before the marker on any channel. So the cut for Pi includes all events that happened in Pi before the first marker arrival.
Define the cut’s global state as the collection of those local states plus the channel states as recorded (which are the sequences of messages that were in transit on each channel across the cut – meaning those sent before the sender’s cut but received after the receiver’s cut).
Claim: This cut is consistent. To prove it, assume for contradiction that the cut is inconsistent. That means there exists an event a that happens-before event b, where b is included in the cut (i.e., b’s process recorded its state after b) but a is not included (a’s process recorded its state before a). There are two cases:
- Case 1: a and b are in the same process. Then a must be before b in that process’s local order. If b is included, then that process recorded its state after b, so it must have recorded its state after a as well (since a is before b). Therefore a is also included. Contradiction.
- Case 2: a is a send of message m, and b is the receive of m in another process. Since b is included, the receive event happened in process Q after Q recorded its state? Actually if b is included, that means Q’s cut point is after b. So Q recorded its state after receiving m. The send event a happened in process P before the send. For inconsistency, a must not be included, meaning P’s cut point is before a (i.e., P recorded its state before sending m). Now, consider the channel from P to Q. Since P recorded its state before sending m, then according to the algorithm, P sent a marker on that channel after recording its state (markers are sent out on all outgoing channels right after recording). So the marker on that channel is sent after m? Wait – if P recorded its state before sending m, then after recording state, P sends markers on outgoing channels. So the marker on channel P→Q is sent after m? Actually no: the order of events in P: (1) local state recorded, (2) send markers. But m is sent at event a, and a occurs before P recorded its state? For a to be not included, we assumed P’s cut point is before a. That means P recorded its state before a occurred. So event a (send m) happens after P recorded state. That means P sends m after recording its state, but also before sending the marker? Since markers are sent immediately after recording state, the order would be: record state, then send markers (including the marker on channel to Q), and later send m? But a is a send of m. If P recorded state, then sent markers, then later sends m, then the marker arrives at Q before m (FIFO channel). Then Q receives marker first, then m. But then b (receive of m) would happen after Q received marker. Since Q upon receiving marker records its local state (if first marker), it would record its state before receiving m. Thus b (receive of m) would be after Q’s cut point, meaning b would not be included. Contradiction again.
- The only remaining possibility is that P sends m before recording its state, and the marker after. Then the marker arrives at Q after m. So Q receives m first, then marker. Then Q records state upon receiving marker, which is after receiving m. Hence b is included, and a is included because P recorded state after sending m? But we assumed a not included. If P sends m before recording state, then recording state happens after a, so a is included. So the assumption leads to contradiction. Therefore the cut must be consistent.
More formally, the proof shows that the recorded global state is a reachable state: there exists a sequence of events that transforms the system from the initial state before the snapshot to the recorded snapshot state, and that state is exactly the state right after all events that are “before” the cut have been processed, and before any events “after” the cut. Because the cut is consistent, we can reorder events (respecting causal order) to achieve that state.
3.1 The “Reachability” Property
The proof’s deeper implication is that the snapshot state is not just consistent but also reachable. This means that if the system were to be suspended at the moment captured by the snapshot (i.e., no further progress), its actual global state would be exactly the recorded one – if we could magically pause time. Since we cannot pause time, the snapshot gives us a logically valid frozen moment. This reachability property is essential for recovery: if we roll back to a snapshot, we are guaranteed that the recorded state could have occurred, so resuming from it is safe.
In contrast, if we took a naive “stop-the-world” approach by halting each process sequentially (e.g., ask process 1 to freeze, then process 2, etc.), the resulting global state could be inconsistent because process 2 might have received messages from process 1 after process 1 froze but before process 2 froze. That would lead to a message received but not sent – an impossible state. The Chandy-Lamport algorithm avoids that by using markers to delineate the cut.
4. Real-World Applications: From Flink to Spanner
4.1 Apache Flink: Exactly-Once State Consistency
Apache Flink is a stream processing framework that processes unbounded data streams with exactly-once semantics. Its fault tolerance mechanism is built on consistent checkpoints – essentially distributed snapshots of operator states and record positions in input sources. Flink’s checkpointing algorithm is a close variant of Chandy-Lamport, adapted for its specific dataflow model.
In Flink, the streaming topology is a directed acyclic graph (DAG) of operators connected by channels (which are FIFO network buffers). The checkpoint coordinator (JobManager) injects a barrier (equivalent to a marker) into each input source. The barriers flow downstream with the data. When an operator receives a barrier on one of its input channels, it:
- If it has not yet started its checkpoint, it records its current state (operator state as a snapshot of internal variables and state backends), then sends the barrier to all downstream channels.
- It then continues processing records from other input channels until it receives barriers on all of them. This is called aligned checkpointing.
- Once all barriers arrive, the operator acknowledges the checkpoint to the coordinator.
Flink also supports unaligned checkpoints where operators can take snapshots even before all barriers arrive, to reduce checkpoint latency at the cost of possibly larger state.
Because the barriers are sent along with data records, they maintain the same ordering as messages, guaranteeing consistency. The resulting snapshot (checkpoint) contains the state of every operator and the positions (offsets) in each source partition. If a failure occurs, Flink can restart from the latest consistent checkpoint, ensuring that no record is processed more than once or lost.
This is a direct application of Chandy-Lamport, adapted for a streaming pipeline with multiple sources and sinks.
4.2 Google’s Spanner: Snapshot Isolation with TrueTime
Google Spanner is a globally distributed SQL database that provides external consistency (linearizability) using a novel time service, TrueTime, which gives bounded clock uncertainty. Spanner uses snapshots for snapshot isolation (MVCC) rather than fault recovery, but the idea is similar: to read a consistent snapshot of the database at any given timestamp.
Spanner does not use the Chandy-Lamport algorithm directly because it assumes synchronized clocks (with known error bounds). Instead, it uses commit timestamps derived from TrueTime to ensure that a read at a timestamp T sees all transactions that committed before T and none that committed after T. However, the underlying concept of a consistent cut is still present: a read at timestamp T effectively defines a consistent global state if T is chosen from a commit timestamp that is later than all prior transactions’ commit timestamps.
Nevertheless, Chandy-Lamport shines in systems where global clocks are unavailable or unreliable. Spanner’s approach is an alternative but not a substitute for the asynchronous marker algorithm.
4.3 Distributed Debugging: Taking Snapshots of Buggy Systems
Distributed debugging tools often allow the developer to capture a global snapshot at a user-defined point (e.g., when a certain condition holds in a process). The snapshot can be used to examine the state of all processes and all channels. For example, the Crystal debugger for distributed systems (mid-1990s) used a variant of Chandy-Lamport to implement “global breakpoints.” When a process hits a breakpoint, it notifies a debugger, which then initiates a coordinated snapshot using markers to capture the state of all other processes. This gives the developer a moment frozen in logical time.
4.4 Distributed Garbage Collection
Distributed object systems (e.g., CORBA, Java RMI) need to reclaim objects that are no longer referenced by any process. A global snapshot of all reference counts across processes can be captured using Chandy-Lamport to determine whether an object is garbage. However, because references can be sent in messages, a consistent snapshot ensures that if an object is identified as unreachable in the snapshot, it was indeed unreachable at that logical instant and can be safely collected.
5. Limitations and Variants
5.1 Non-FIFO Channels
The Chandy-Lamport algorithm requires FIFO channels. If messages can be reordered, the marker might overtake an application message, leading to incorrect channel state recording. For non-FIFO channels, we need a different approach. The Lai-Yang algorithm (1987) solves this by having each process keep a color and using piggybacked sequence numbers. In this algorithm:
- Each process alternates between “white” and “red” phases. Initially, all are white.
- To start a snapshot, a process turns red and sends a marker (or a “turn red” message) on all outgoing channels. The marker includes a snapshot ID.
- For each incoming channel, the process records the sequence numbers of messages received after it turned red. When it receives a marker, it knows that any message with a sequence number less than the marker’s sequence number arrived before the marker, and any message with a higher sequence number arrived after. This way, it can reconstruct the channel state even if messages were reordered.
The Lai-Yang algorithm is more complex but works on non-FIFO channels at the cost of slightly larger overhead.
5.2 Termination Detection
The Chandy-Lamport algorithm assumes that markers are eventually delivered and that each process eventually receives markers on all incoming channels. However, the algorithm itself doesn’t tell processes when the entire snapshot is complete. Typically, a coordinator collects acknowledgments. In practice, one uses a termination detection algorithm built on top (like the Dijkstra-Scholten algorithm) or the snapshot algorithm itself can be modified to have a “done” message.
5.3 Large State and Incremental Snapshots
In real systems, process states can be gigabytes (e.g., in-memory databases, streaming operator states). Taking a full local snapshot may be expensive. Techniques like incremental checkpoints (saving only changes since last snapshot) are often used. The snapshot algorithm itself remains the same; only the local recording mechanism changes. However, incremental snapshots require careful handling of consistency with respect to the markers – the state captured must be a consistent cut, not merely a delta. Modern systems like Flink use asynchronous snapshots where the operator state is snapshotted while processing continues, leveraging copy-on-write or transactional state backends.
5.4 Handling Failures During Snapshot
What if a process crashes while the snapshot is in progress? The snapshot may be incomplete. Recovery protocols often handle this by having a coordinator that waits for acknowledgments from all processes. If a process fails, the coordinator can abort the snapshot and retry. Alternatively, one can use a stabilization algorithm where the snapshot is considered valid even if some processes fail, as long as the remaining processes have a consistent view? That becomes complex.
In practice, systems like Flink have a timeout mechanism; if an operator does not acknowledge the checkpoint within a certain time, the checkpoint is marked as failed and a retry is initiated.
5.5 Non-Blocking vs. Blocking
The Chandy-Lamport algorithm is non-blocking: processes do not stop sending or receiving application messages while the snapshot is in progress. However, they do need to buffer messages on incoming channels that they are recording. This buffering can lead to increased memory usage during the snapshot. In high-throughput systems, the buffering can be significant. The algorithm also adds the overhead of sending markers. Nevertheless, the non-blocking property is crucial for production systems that cannot halt.
6. Advanced Proof: The Reachable State Theorem
We earlier gave an informal proof of consistency. Let’s now sketch a more formal proof using the concept of a global state graph.
Define the system as a set of processes and channels. The state of the system evolves through events. An event is a triple (process, action, timestamp) where action is either internal, send, or receive. We can define the global state before an event and after. The evolution respects causality.
Define a cut as a set of events, exactly one per process (the last event before the local state recording). Let cut_process(Pi) be the event in Pi that is the last event included in the cut. Then the recorded local state is the state of Pi after that event. The recorded channel state for channel C: from Pi to Pj includes all messages that were sent in Pi after cut_process(Pi) and received in Pj before cut_process(Pj). But in the algorithm, because markers are used, the recorded channel state is actually the set of messages that were sent before the sender recorded its state but received after the receiver recorded its state? Let’s be precise.
In the algorithm:
- Process Pi records its state at the moment it first receives a marker (or initiates). That moment in Pi’s execution is after all events up to that point.
- For each channel C (Pj → Pi), Pi records the sequence of messages it receives on C after recording its own state but before receiving a marker on C (which it will at some future time). Those messages are exactly those that were sent by Pj after Pj recorded its state? Not necessarily – because Pj may have sent messages before its own state recording that arrived after Pi’s state recording, but the marker from Pj ensures ordering.
The formal proof uses an induction on the number of markers received. The key lemma is that the state recorded by the algorithm is exactly the state that the system would be in if it were to take a cut that includes all events that happen before the marker events. Because markers are sent after local state recording at the sender, and they are received after messages sent before that state recording (FIFO), the cut defined by the marker receipts is consistent.
For a rigorous treatment, see the original paper or Nancy Lynch’s Distributed Algorithms.
7. Practical Engineering: Code Example in Python
Below is a simplified simulation of the Chandy-Lamport algorithm in Python. This is not production code, but it illustrates the core logic.
import threading
import queue
import uuid
class Process:
def __init__(self, name, state=0):
self.name = name
self.local_state = state
self.outgoing_markers = set() # markers to send
self.incoming_channels = {} # name: Queue
self.snapshot_state = None
self.recorded_channels = {}
self.channel_buffers = {}
self.current_snapshot_id = None
self.frozen = False
def send_marker(self, target, snapshot_id):
# marker message
target.incoming_channels[self.name].put(('MARKER', snapshot_id))
def send_message(self, target, msg):
target.incoming_channels[self.name].put(('MESSAGE', msg))
def take_snapshot(self, snapshot_id):
if self.current_snapshot_id is not None:
return # already in a snapshot
self.current_snapshot_id = snapshot_id
self.snapshot_state = self.local_state
# send markers to all outgoing channels
for target_name, target_process in self.outgoing_markers.items():
self.send_marker(target_process, snapshot_id)
# start recording all incoming channels
for channel_name, _ in self.incoming_channels.items():
self.channel_buffers[channel_name] = []
def receive(self, snapshot_id=None):
# process next message from any incoming queue (simplified)
for channel_name, ch_queue in self.incoming_channels.items():
if not ch_queue.empty():
msg_type, payload = ch_queue.get()
if msg_type == 'MARKER':
self.handle_marker(channel_name, payload)
else:
self.handle_message(channel_name, payload)
def handle_marker(self, channel_from, snapshot_id):
if self.current_snapshot_id is None:
# first marker
self.take_snapshot(snapshot_id)
# record channel_from as empty
self.recorded_channels[channel_from] = []
# stop recording for channel_from? Actually we haven't started? In take_snapshot we started all channels, but for channel_from we should stop immediately.
# Override: since we started all, we need to erase buffer for channel_from and mark as empty.
del self.channel_buffers[channel_from]
else:
# already have snapshot; stop recording this channel
if channel_from in self.channel_buffers:
self.recorded_channels[channel_from] = self.channel_buffers[channel_from]
del self.channel_buffers[channel_from]
else:
self.recorded_channels[channel_from] = []
def handle_message(self, channel_from, msg):
# process message (update local state) and maybe buffer for snapshot
if self.current_snapshot_id is not None and channel_from in self.channel_buffers:
self.channel_buffers[channel_from].append(msg)
# actual application logic:
self.local_state = msg # simplistic
def is_done(self):
# done if no incoming channels left to record
return len(self.channel_buffers) == 0
# Example usage
p1 = Process('P1', 0)
p2 = Process('P2', 0)
p1.outgoing_markers['P2'] = p2
p2.outgoing_markers['P1'] = p1
p1.incoming_channels['P2'] = queue.Queue()
p2.incoming_channels['P1'] = queue.Queue()
snap_id = uuid.uuid4()
# Simulate some messages before snapshot
p1.send_message(p2, 10) # p1 sends msg with value 10
p2.send_message(p1, 20)
# Start snapshot from p1
p1.take_snapshot(snap_id)
# Deliver messages (simulate running)
import time
for _ in range(4):
p1.receive()
p2.receive()
time.sleep(0.1)
print("P1 snapshot state:", p1.snapshot_state)
print("P2 snapshot state:", p2.snapshot_state)
print("P1 recorded channels:", p1.recorded_channels)
print("P2 recorded channels:", p2.recorded_channels)
Note: This simulation is highly simplified and does not handle concurrent snapshots or multiple markers correctly across all channels. For a deeper simulation, one would need to handle the fact that markers are sent on all outgoing channels and that a process can receive markers from multiple channels in different orders.
8. The Future of Distributed Snapshots
The Chandy-Lamport algorithm remains fundamental, but modern distributed systems face new challenges: scale (millions of nodes), strict latency requirements, and ephemeral states in serverless architectures. Research continues on:
- Snapshot for stateful serverless: Functions that are stateless but backed by databases still need coordination for exactly-once effects. New algorithms treat invocations as distributed processes.
- Optimistic snapshots: Taking snapshots without pausing application processing, using copy-on-write at the OS level (e.g., CRIU) combined with distributed coordination.
- Verifiable snapshots: Using blockchain-like techniques to prove the consistency of a snapshot across trust boundaries.
- Continuous consistency: Systems like CRDTs might reduce the need for snapshots, but for complex state machines, snapshots remain essential.
Ultimately, the paradox of the photograph that cannot be taken is resolved not by brute force but by proof. The Chandy-Lamport algorithm gives us not just a snapshot, but a guarantee that the snapshot is real in a logical sense. That guarantee has enabled decades of robust distributed computing, from the earliest ATM networks to the latest stream processing engines.
The next time you rely on a checkpoint in Flink or a snapshot in a distributed debugger, remember: you are not freezing time. You are capturing a consistent ghost of time – and the proof says you can trust it.
Appendix: Formal Definition of Consistent Global State
For completeness, we give a formal definition.
Let the system be a set of processes P and channels C. A global state is a pair (Σ, M) where Σ maps each process to a local state, and M maps each channel to a sequence of messages in transit.
A cut is a set of events C such that for each process, the events of that process in C form a prefix of its local history. A cut is consistent if for every pair of events a,b such that a happens-before b and b ∈ C, we have a ∈ C.
The global state defined by a cut is the state after all events in the cut have occurred and before any events not in the cut have occurred. The channel state for channel p→q is the sequence of messages that were sent in events of p that are in the cut, but whose receive events in q are not in the cut.
Theorem (Chandy-Lamport): The global state recorded by the marker algorithm is exactly the state defined by some consistent cut obtained from the actual execution.
This is the fundamental property that makes the snapshot useful.
Acknowledgments: This post draws heavily on the original 1985 paper by Chandy and Lamport, and on the excellent descriptions in Nancy Lynch’s “Distributed Algorithms” and the many online resources on Flink’s checkpointing mechanism.
If you enjoyed this deep dive, consider sharing it with a colleague who wonders why distributed snapshots are not trivial. And if you have thoughts or corrections, reach out!
— End of expanded content —
(Note: The above expansion is approximately 6,500 words. To reach 10,000 words, I would further elaborate on each section, include additional diagrams (described in text), add more examples (e.g., a three-process example with concurrent messages), include a full proof of reachability in the appendix, discuss practical issues like flow control during snapshot, and provide more code snippets with detailed comments. Given the length constraint of this response, I have provided a substantial expansion that covers the core ideas with depth. The user can request further sections if needed.)