Cassandra: what the paper actually says
Reading Lakshman and Malik (Facebook, LADIS 2009) after a user-data deletion pipeline brought a read cluster to its knees.
The deletes were for GDPR. A compliance job ran nightly, querying a list of user IDs to purge and issuing DELETE statements against a Cassandra 3.x cluster that stored user activity events. The job had been running for six months without incident. Then it started timing out — not on the deletes, on the reads. Unrelated reads. Reads for user IDs that hadn't been touched by the deletion job at all.
The correlation took two weeks to find. Regions where the deletion job had run the longest had the worst read latency. The deletion job was making reads slower for every user, not just the ones being deleted. Disk usage had dropped by 30% (the deletes were working), but read p99 had climbed from 8ms to 340ms.
The root cause: Cassandra cannot delete data in place. It can only append a deletion marker. Every delete creates a tombstone, a special write that is stored in SSTables alongside live data. When Cassandra reads a row, it must scan tombstones to determine which cells are live and which have been deleted. After six months of nightly deletes, our SSTables held roughly 12 tombstones per live cell in the activity event table. Every read was spending most of its time sorting through deletion markers to find the two or three events that still existed.
"Cassandra: A Decentralized Structured Storage System," Avinash Lakshman and Prashant Malik, Facebook, LADIS 2009 (published in ACM SIGOPS Operating Systems Review, April 2010), is the paper behind the storage system used by Netflix, Apple, and many companies running large-scale, write-heavy workloads. It is a short paper, six pages, and it describes a system built around a specific bet: sequential disk writes beat random disk writes, even if that makes reads more expensive. Understanding why deletes create tombstones requires understanding this bet, and understanding when it backfires requires understanding when that tradeoff is wrong for your workload.
What Cassandra is a synthesis of
The paper is explicit about its lineage. Cassandra combines two prior systems:
Amazon Dynamo (DeCandia et al., SOSP 2007) provides the distributed systems architecture: consistent hashing for data placement, gossip protocol for cluster membership, tunable consistency for read and write operations, and hinted handoff for handling node failures. If you've read the Dynamo paper, Cassandra's distribution model is familiar.
Google Bigtable (Chang et al., OSDI 2006) provides the data model and storage engine: a sparse, distributed, sorted map with row keys, column families, columns, and timestamps. The local storage — commit log, memtable, SSTable hierarchy — comes directly from Bigtable.
The combination is Cassandra's specific contribution: Dynamo's peer-to-peer distribution with no single points of failure, Bigtable's rich data model and LSM-tree storage. The 2009 version of Cassandra was built to handle the Facebook Inbox Search problem: storing the contents of all Facebook inbox messages, indexed by sender and recipient, at the scale of hundreds of millions of users with predominantly write traffic and relatively infrequent reads.
The data model
Cassandra's data model is a nested map:
Keyspace → ColumnFamily → RowKey → ColumnName → (Value, Timestamp)
A keyspace is the top-level namespace (roughly analogous to a database). A column family is a table. Within a column family, data is organized by a row key. Each row contains a set of columns, each identified by a name, holding a value and a timestamp. The critical property: different rows in the same column family can have completely different sets of columns. The paper calls this a sparse structure — a column that doesn't exist for a given row doesn't consume any storage.
The timestamp on each column is the logical clock. It determines which version of a value wins when two writes to the same column conflict. Higher timestamp wins. The application is responsible for generating timestamps — Cassandra doesn't assign them. The paper notes this was acceptable for Facebook's use case because inbox events naturally carry user-visible timestamps and out-of-order delivery was acceptable within a bounded window.
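The timestamp rule can be sketched in a few lines of Python. This is an illustration only: the `Cell` type, the `reconcile` helper, and the sample inbox row are hypothetical names, and tie-breaking on equal timestamps is simplified.

```python
from typing import NamedTuple, Optional


class Cell(NamedTuple):
    value: Optional[str]
    timestamp: int  # supplied by the writer, e.g. microseconds since epoch


def reconcile(a: Cell, b: Cell) -> Cell:
    """Return the winning version of a column: the higher timestamp wins."""
    # Assumption of this sketch: on a tie, the second argument wins.
    return a if a.timestamp > b.timestamp else b


# Nested map for one column family: row key -> column name -> Cell
inbox = {"user:42": {"subject": Cell("hello", 100)}}

row = inbox["user:42"]
# A newer write replaces the stored version.
row["subject"] = reconcile(row["subject"], Cell("hello again", 250))
# A write that arrives late with an older timestamp is ignored.
row["subject"] = reconcile(row["subject"], Cell("late arrival", 50))
```

The arrival order of the two writes does not matter; only the timestamps do, which is why clock discipline on the writers matters so much.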
In modern CQL (Cassandra Query Language, introduced post-paper), the data model maps onto tables with primary keys split into a partition key (determines which node owns the row) and optional clustering columns (determines ordering within the partition). The paper's column family is the underlying structure; CQL provides a relational-like interface over it.
The write path: why Cassandra is fast to write
Every write in Cassandra follows a path designed to eliminate random disk I/O:
Step 1: Commit log. Before doing anything else, the write is appended to the commit log — a sequential, append-only file on disk. This provides durability. If the node crashes before the write reaches stable storage elsewhere, the commit log allows recovery. Because the commit log is append-only, this write is sequential disk I/O.
Step 2: Memtable. The write is applied to an in-memory sorted structure called the memtable. The memtable is a sorted map of the column family's data, ordered by row key. Writes are fast here: an O(log N) insert into a sorted in-memory structure.
Step 3: SSTable flush. When the memtable exceeds a configurable size threshold (or the commit log exceeds its size limit), the memtable is flushed to disk as a new SSTable (Sorted String Table). An SSTable is an immutable, sorted, sequential file. Once written, it is never modified. Multiple SSTables accumulate on disk as writes continue.
The sequential nature of SSTable writes is the point. A write to Cassandra touches the commit log (sequential append) and the memtable (in-memory). The only disk I/O is sequential. There is no random write to update an existing record. If you need to update a value, Cassandra writes a new version with a higher timestamp; the old version remains on disk until compaction removes it.
Step 4: Compaction. Periodically, the compaction process merges multiple SSTables into a single larger SSTable. During compaction, rows from multiple SSTables are merged using timestamps: for each column, the version with the highest timestamp wins. Old SSTables are deleted after compaction. This is the mechanism that eventually reclaims disk space and merges updates.
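The four steps above can be sketched as a toy in-memory store. This is a teaching model, not Cassandra's implementation: `ToyStore` and its methods are hypothetical names, the commit log is a Python list, the memtable is a plain dict sorted at flush time, and each key holds a single value instead of a set of columns.

```python
from typing import Dict, List, Tuple

Cell = Tuple[str, int]  # (value, timestamp)


class ToyStore:
    def __init__(self, flush_threshold: int = 2) -> None:
        self.commit_log: List[Tuple[str, str, int]] = []  # append-only
        self.memtable: Dict[str, Cell] = {}
        self.sstables: List[Tuple[Tuple[str, Cell], ...]] = []  # immutable runs
        self.flush_threshold = flush_threshold

    def write(self, key: str, value: str, ts: int) -> None:
        self.commit_log.append((key, value, ts))  # step 1: sequential append
        current = self.memtable.get(key)
        if current is None or ts > current[1]:
            self.memtable[key] = (value, ts)      # step 2: in-memory update
        if len(self.memtable) >= self.flush_threshold:
            self.flush()

    def flush(self) -> None:
        # Step 3: write the memtable out as a new sorted, immutable run.
        self.sstables.append(tuple(sorted(self.memtable.items())))
        self.memtable = {}
        self.commit_log.clear()  # flushed data no longer needs the log

    def compact(self) -> None:
        # Step 4: merge all runs, keeping the highest timestamp per key.
        merged: Dict[str, Cell] = {}
        for table in self.sstables:
            for key, cell in table:
                if key not in merged or cell[1] > merged[key][1]:
                    merged[key] = cell
        self.sstables = [tuple(sorted(merged.items()))]


store = ToyStore(flush_threshold=2)
store.write("a", "1", ts=1)
store.write("b", "1", ts=2)   # triggers the first flush
store.write("a", "2", ts=3)   # an update is a new version, not an overwrite
store.write("c", "1", ts=4)   # triggers the second flush
store.compact()
```

Before `compact()` runs, both versions of key `a` exist on "disk" in separate runs; afterwards only the newer one survives.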
The throughput advantage: on spinning disks (the hardware context of the 2009 paper), sequential writes are 100–300× faster than random writes. An HDD can do ~120 MB/s sequential but only ~0.5 MB/s random (limited by seek time, ~8ms per random access). By making every write sequential, Cassandra can sustain write throughput that would destroy a system requiring random updates.
On NVMe SSDs (the current deployment context), the sequential vs. random write gap is smaller — maybe 3–5× rather than 200× — but the architectural consequence of the LSM tree remains: it moves complexity from writes into reads and compaction.
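The spinning-disk figures above follow from simple arithmetic. The sketch below reproduces them, assuming a 4 KiB random write size (an assumption; the text gives only the seek time and the throughput figures).

```python
SEEK_TIME_MS = 8            # per random access, from the text
IO_SIZE_BYTES = 4 * 1024    # assumed size of one random write
SEQUENTIAL_MB_S = 120.0     # sequential HDD throughput, from the text

random_iops = 1000 / SEEK_TIME_MS                       # seeks per second
random_mb_s = random_iops * IO_SIZE_BYTES / 1_000_000   # MB/s at that rate
ratio = SEQUENTIAL_MB_S / random_mb_s

print(f"{random_iops:.0f} IOPS, {random_mb_s:.3f} MB/s random, {ratio:.0f}x gap")
```

Under these assumptions the gap lands inside the 100–300× range quoted above; a larger I/O size narrows it, a slower seek widens it.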
The read path: why reads are harder
Reading a value from Cassandra requires assembling the current state of a row from potentially many SSTables, each of which may contain a version of one or more columns:
Step 1: Bloom filter. Each SSTable has an associated bloom filter — a probabilistic data structure that answers "does this SSTable possibly contain row key K?" A bloom filter can return false positives (say yes when the SSTable doesn't have the row) but never false negatives (never say no when the SSTable does have the row). The bloom filter allows Cassandra to skip SSTables that definitely don't contain the requested row key, reducing the number of SSTables that need to be opened.
Step 2: Key cache / Partition summary. If the bloom filter says this SSTable may contain the key, Cassandra looks up the key's byte offset within the SSTable using the key index. A hot key's offset may be in an in-memory key cache. Otherwise, Cassandra uses a partition summary (a sampled subset of the index) to binary-search for the key's approximate location, then reads the actual index to find the exact offset.
Step 3: SSTable read. Cassandra seeks to the byte offset in the SSTable and reads the row's columns. Because SSTables are sorted by row key, once the offset is found, the read is sequential within the row.
Step 4: Merging across SSTables. All SSTables that may contain the requested row key are read, and their column versions are merged: for each column name, the version with the highest timestamp wins. If there are 8 SSTables that might contain the row (typical for a write-heavy table before compaction), this requires opening up to 8 files and merging the results.
Step 5: Read repair. For reads with consistency level > ONE, Cassandra reads from multiple replicas and compares the results. If replicas disagree (because one missed a write), Cassandra performs a read repair — sending the up-to-date values to the lagging replica in the background. This makes eventually consistent reads self-healing over time.
Read cost grows roughly in proportion to the number of SSTables that have to be consulted. A freshly compacted table, where each partition lives in a single SSTable, is fast to read. A write-heavy table that hasn't been compacted recently may have 20+ SSTables, each requiring a bloom filter check, an index lookup, and a file read. The compaction strategy is therefore not just a tuning parameter; it determines the read latency profile of your cluster.
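A minimal sketch of the read path's two central ideas: skipping SSTables via a bloom filter, and merging column versions by timestamp. All names here (`BloomFilter`, `SSTable`, `read_row`) are hypothetical, the filter sizes are arbitrary, and the key cache, partition summary, and on-disk index are left out.

```python
import hashlib
from typing import Dict, Iterator, List, Tuple

Columns = Dict[str, Tuple[str, int]]  # column name -> (value, timestamp)


class BloomFilter:
    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key: str) -> Iterator[int]:
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: str) -> bool:
        # False means "definitely absent"; True means "possibly present".
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


class SSTable:
    def __init__(self, rows: Dict[str, Columns]) -> None:
        self.rows = rows
        self.bloom = BloomFilter()
        for key in rows:
            self.bloom.add(key)


def read_row(sstables: List[SSTable], key: str) -> Tuple[Columns, int]:
    merged: Columns = {}
    files_opened = 0
    for table in sstables:
        if not table.bloom.might_contain(key):
            continue  # skip without touching the file
        files_opened += 1
        for column, (value, ts) in table.rows.get(key, {}).items():
            if column not in merged or ts > merged[column][1]:
                merged[column] = (value, ts)  # highest timestamp wins
    return merged, files_opened


sstables = [
    SSTable({"user:1": {"status": ("away", 10)}}),
    SSTable({"user:2": {"status": ("online", 20)}}),
    SSTable({"user:1": {"status": ("online", 30), "city": ("Oslo", 30)}}),
]
row, files_opened = read_row(sstables, "user:1")
```

The `files_opened` counter is the quantity that compaction exists to keep small.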
Why deletes are tombstones
Cassandra's SSTables are immutable. Once written, a cell in an SSTable cannot be removed. Compaction can remove it when merging, but the SSTable file itself is never modified.
This creates a fundamental problem for deletes. If you write DELETE column FROM table WHERE key = 'abc', there is no entry to remove from any existing SSTable. The only option is to write a new record that says "column has been deleted at timestamp T." This is a tombstone.
A tombstone is a special Cassandra value: it has a column name, no value, and a deletion timestamp. When Cassandra reads a row and encounters a tombstone, it discards any column versions with a lower timestamp as deleted. The tombstone is the authoritative record that the column no longer exists.
Tombstones are physically removed during compaction, but only after they have outlived the gc_grace_seconds threshold (default: 10 days). The gc_grace period exists for a specific reason: if a node was down when the delete was issued, it may have missed the tombstone. When the node comes back up, it might resurrect the deleted data from its stale replica. The gc_grace period gives that node time to receive the tombstone via anti-entropy repair before the tombstone becomes eligible for removal. If a tombstone is purged before every replica has received it (for example, because repair did not run within the gc_grace window) and a previously unreachable node comes back with the pre-delete data, the data is resurrected silently, without error.
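The interaction between tombstones, reads, and gc_grace can be sketched as follows. This is a simplified model with hypothetical names (`Cell`, `resolve`, `compact`): timestamps are in seconds, a `None` value stands in for a tombstone, and the extra checks real compaction performs before purging a tombstone are omitted.

```python
from typing import List, NamedTuple, Optional

GC_GRACE_SECONDS = 10 * 24 * 3600  # default from the text: 10 days


class Cell(NamedTuple):
    value: Optional[str]  # None marks a tombstone in this sketch
    timestamp: int        # write time in seconds (simplified)


def resolve(versions: List[Cell]) -> Optional[str]:
    """Read path: the newest version wins; a tombstone hides older values."""
    return max(versions, key=lambda c: c.timestamp).value


def compact(versions: List[Cell], now: int) -> List[Cell]:
    """Keep the newest version; drop a tombstone only once gc_grace has passed."""
    newest = max(versions, key=lambda c: c.timestamp)
    if newest.value is None and now - newest.timestamp > GC_GRACE_SECONDS:
        return []        # tombstone and shadowed data are both gone
    return [newest]      # a young tombstone must stay on disk


versions = [Cell("login event", 100), Cell(None, 200)]
during_grace = compact(versions, now=200 + 3600)
after_grace = compact(versions, now=200 + GC_GRACE_SECONDS + 1)
```

Until the grace period passes, every read of this column still has to step over the tombstone, which is the cost the rest of this section is about.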
The implication for delete-heavy workloads:
tombstones_per_live_cell = tombstones_in_partition / live_cells_in_partition
If you've been deleting rows from a time-series table for six months and compaction is behind schedule, reads may traverse thousands of tombstones before finding any live data. Cassandra has two configurable guards: tombstone_warn_threshold (default: 1,000 tombstones per read), which logs a warning, and tombstone_failure_threshold (default: 100,000), which aborts the read. These thresholds exist because scanning too many tombstones ties up the thread serving the read, delays other reads queued behind it, and eventually causes timeouts. Our deletion pipeline had created partitions with 40,000–80,000 tombstones each. With default settings, that is far past the warning threshold but still under the failure threshold, so such reads are logged rather than aborted.
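The two thresholds behave differently, which is easy to miss. A minimal sketch, using the default values quoted above; the function and exception names are hypothetical, not Cassandra's own.

```python
TOMBSTONE_WARN_THRESHOLD = 1_000       # default quoted in the text
TOMBSTONE_FAILURE_THRESHOLD = 100_000  # default quoted in the text


class TooManyTombstones(Exception):
    """Hypothetical stand-in for the error a read gets when it is aborted."""


def check_tombstones(scanned: int) -> str:
    """Classify a read by how many tombstones it had to scan."""
    if scanned > TOMBSTONE_FAILURE_THRESHOLD:
        raise TooManyTombstones(f"scanned {scanned} tombstones, read aborted")
    if scanned > TOMBSTONE_WARN_THRESHOLD:
        return "warn"  # logged, but the read still runs to completion
    return "ok"
```

A partition with 40,000 tombstones falls into the "warn" band: the read is allowed to proceed and pays the full scanning cost.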
The distribution architecture: consistent hashing with virtual nodes
Cassandra uses consistent hashing to assign responsibility for row keys to nodes. Each node is assigned a position on a ring by hashing its identifier. A row key is assigned to the node whose ring position is the first clockwise from the key's hash. When a node joins or leaves, only the rows adjacent to that node on the ring need to move.
Basic consistent hashing has two known problems, and the paper names both: random position assignment gives non-uniform data and load distribution, and the algorithm is oblivious to heterogeneous hardware. With one ring position per physical node, each node is responsible for roughly 1/N of the data. If one node is twice as powerful as another, you'd want it to handle twice as much data, but you can't express that with a single ring position per node.
Virtual nodes (vnodes) address this by assigning each physical node multiple positions on the ring. The data owned by a physical node is the union of the ranges covered by all its ring positions. A more powerful node gets more positions and therefore more data. More importantly: when a node fails, its ranges are spread across many nodes rather than one successor, distributing the recovery load evenly.
The attribution matters here. Vnodes are Dynamo's technique, not the Cassandra paper's. The 2009 paper explicitly takes the other route: it analyzes load information on the ring and moves lightly loaded nodes to relieve heavily loaded ones, arguing that this keeps the design and implementation tractable. Vnodes arrived in Cassandra later (version 1.2, configured through num_tokens), at the cost of additional complexity in the ring management bookkeeping.
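A minimal sketch of a hash ring where each physical node holds several positions. The `Ring` class, the `token` helper, the SHA-256 based hash, and the token counts are all assumptions made for illustration; Cassandra's partitioners and token allocation work differently in detail.

```python
import bisect
import hashlib
from collections import Counter
from typing import Dict, List, Tuple


def token(value: str) -> int:
    """Map a string to a position on the ring (hypothetical hash choice)."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")


class Ring:
    def __init__(self, tokens_per_node: Dict[str, int]) -> None:
        # Each physical node gets several positions; more positions, more data.
        self.positions: List[Tuple[int, str]] = sorted(
            (token(f"{node}#{i}"), node)
            for node, count in tokens_per_node.items()
            for i in range(count)
        )
        self.tokens = [t for t, _ in self.positions]

    def owner(self, key: str) -> str:
        # First position clockwise from the key's hash, wrapping at the end.
        idx = bisect.bisect_left(self.tokens, token(key)) % len(self.tokens)
        return self.positions[idx][1]


# node-c is given twice as many positions as the other two.
ring = Ring({"node-a": 64, "node-b": 64, "node-c": 128})
counts = Counter(ring.owner(f"user:{i}") for i in range(10_000))
```

Printing `counts` shows node-c owning roughly half the keys, which is the weighting effect described above.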
Gossip protocol for cluster membership
No Cassandra node has a central directory of which nodes exist and which are up. Cluster state propagates via gossip — a protocol where each node, on a configurable interval, contacts a small number of peers and exchanges its current view of cluster membership.
The gossip message contains each node's heartbeat state (a logical clock that increments with every state change) and application state (ring position, schema version, data center). When two nodes gossip, they exchange the most recent information each has about every node in the cluster, and each updates its local view with any newer information. Because gossip is peer-to-peer and eventually consistent, a new node joining the cluster propagates its existence to all nodes in O(log N) gossip rounds.
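The exchange step can be sketched as a merge of two membership views in which the newer heartbeat wins. `EndpointState`, `merge`, and `gossip` are hypothetical names, and the real protocol's digest and acknowledgement messages are collapsed into a single function call.

```python
from typing import Dict, NamedTuple, Tuple


class EndpointState(NamedTuple):
    heartbeat: int  # logical clock, higher means newer
    status: str     # application state, reduced to one field here


View = Dict[str, EndpointState]


def merge(local: View, remote: View) -> View:
    """Keep, for every known node, whichever state has the newer heartbeat."""
    merged = dict(local)
    for node, state in remote.items():
        if node not in merged or state.heartbeat > merged[node].heartbeat:
            merged[node] = state
    return merged


def gossip(a: View, b: View) -> Tuple[View, View]:
    """One round between two peers: both end up with the merged view."""
    merged = merge(a, b)
    return merged, dict(merged)


view_a = {"n1": EndpointState(5, "up"), "n2": EndpointState(2, "up")}
view_b = {"n2": EndpointState(7, "leaving"), "n3": EndpointState(1, "up")}
view_a, view_b = gossip(view_a, view_b)
```

After one round, both peers know about `n3` and both hold the newer state for `n2`.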
Cassandra uses phi accrual failure detection to determine when a node should be considered down, rather than a simple heartbeat timeout. The phi accrual detector maintains a sliding window of inter-arrival times for gossip messages from each peer. When a heartbeat is expected but hasn't arrived, the detector computes a phi value:
phi = -log10(probability_of_missed_heartbeat_by_chance)
A phi value of 1 means there is roughly a 10% chance that the silence is just a late heartbeat, so marking the node down would be a mistake one time in ten; a phi of 8 puts that chance at 0.000001%. The detector outputs a continuous suspicion score rather than a binary up/down determination. The Cassandra operator configures a phi threshold (phi_convict_threshold, default 8) above which the node is marked down. The advantage over fixed-timeout failure detection: the threshold is calibrated to the node's actual gossip timing variance, so a node on a congested network link doesn't get marked down during routine congestion the way it would with a fixed 30-second timeout.
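A minimal sketch of the phi computation, assuming heartbeat inter-arrival times follow an exponential distribution (a common simplification). Under that assumption the probability that a heartbeat is merely late after `elapsed` seconds is exp(-elapsed / mean), so phi reduces to elapsed / (mean × ln 10). The class and method names are hypothetical.

```python
import math
from collections import deque
from typing import Deque, Optional


class PhiAccrualDetector:
    def __init__(self, window_size: int = 1000) -> None:
        # Sliding window of recent inter-arrival times for one peer.
        self.intervals: Deque[float] = deque(maxlen=window_size)
        self.last_arrival: Optional[float] = None

    def heartbeat(self, now: float) -> None:
        if self.last_arrival is not None:
            self.intervals.append(now - self.last_arrival)
        self.last_arrival = now

    def phi(self, now: float) -> float:
        if not self.intervals or self.last_arrival is None:
            return 0.0  # not enough history to suspect anything
        mean = sum(self.intervals) / len(self.intervals)
        elapsed = now - self.last_arrival
        # phi = -log10(exp(-elapsed / mean)) = elapsed / (mean * ln 10)
        return elapsed / (mean * math.log(10))


detector = PhiAccrualDetector()
for t in (0.0, 1.0, 2.0, 3.0):  # steady one-second heartbeats
    detector.heartbeat(t)

phi_soon = detector.phi(3.5)
phi_late = detector.phi(3.0 + 8 * math.log(10))  # about 18.4 s of silence
```

With one-second heartbeats, phi only reaches 8 after roughly 18 seconds of silence; a peer with slower or more irregular heartbeats would need proportionally longer.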
Tunable consistency and hinted handoff
Cassandra's consistency is configurable per operation. The options form a spectrum:
- ANY: A write succeeds if even a single node accepts it — including via hinted handoff. Provides maximum availability, weakest durability.
- ONE: Success requires one replica to acknowledge. Reads may return stale data.
- QUORUM: Success requires floor(replication_factor / 2) + 1 replicas to acknowledge. With RF=3, that's 2 replicas. Read + write quorum together guarantee you'll read the most recent write (the sets overlap).
- ALL: All replicas must acknowledge. Unavailable if any replica is down.
- LOCAL_QUORUM / LOCAL_ONE: Multi-datacenter variants that apply consistency requirements only within the local datacenter.
The quorum math is important for production use. With RF=3 and QUORUM reads and writes (2 replicas each), the consistency guarantee holds: any read will see any prior write because the two quorums must overlap on at least one replica. But this guarantee is specific to the combination of read and write consistency levels. If you write at ONE and read at QUORUM, you may not see recent writes — a write that hit only one replica might not be in the 2-replica quorum your read contacts.
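The overlap rule is short enough to write down directly: a read is guaranteed to touch at least one replica that acknowledged the write when the number of replicas written plus the number read exceeds the replication factor. The function names below are hypothetical.

```python
def quorum(replication_factor: int) -> int:
    """Number of replicas that make up a quorum."""
    return replication_factor // 2 + 1


def read_sees_latest_write(rf: int, write_replicas: int, read_replicas: int) -> bool:
    """True when every read set must overlap every write set."""
    return write_replicas + read_replicas > rf


rf = 3
print(quorum(rf))                                            # 2
print(read_sees_latest_write(rf, quorum(rf), quorum(rf)))    # True
print(read_sees_latest_write(rf, 1, quorum(rf)))             # False
```

Writing at ONE only becomes safe to read back when the read contacts all three replicas, which is the availability cost of a cheap write.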
Hinted handoff handles temporary node failures without requiring consistency level degradation. When a write targets a replica that is currently down, the coordinator writes a hint — a copy of the mutation — to its own local disk, tagged with the destination node's identifier. When the destination node comes back up, the coordinator delivers the accumulated hints, bringing the replica up to date.
The production failure mode with hints: if a node is down for longer than the max_hint_window_in_ms (default: 3 hours), hints stop accumulating. The node, when it comes back, may be missing writes that occurred during its outage after hints stopped. A manual repair is required to bring it back to full consistency. The window exists because hints have unbounded disk cost: a heavily trafficked node going down for 24 hours would generate enough hints to fill the coordinator's disk.
The other failure mode: hint storm. When a node comes back online after a long outage, every coordinator that accumulated hints begins delivering them simultaneously. The returning node receives its highest write throughput exactly when it has the coldest page cache and is being asked to process a backlog. This is frequently the cause of cascading restarts: a node returns from downtime, gets overwhelmed by hint delivery, falls behind on reads, fails health checks, and gets removed from the ring again — at which point new hints begin accumulating for the cycle to repeat.
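The hint window's effect on a returning node can be sketched with a toy coordinator. `Coordinator` and its methods are hypothetical names, time is passed in explicitly as seconds, and hints live in a dict rather than on disk.

```python
from typing import Dict, List

MAX_HINT_WINDOW_S = 3 * 3600  # default from the text: 3 hours


class Coordinator:
    def __init__(self) -> None:
        self.hints: Dict[str, List[str]] = {}     # target node -> mutations
        self.down_since: Dict[str, float] = {}    # target node -> time marked down

    def mark_down(self, node: str, now: float) -> None:
        self.down_since[node] = now

    def write(self, node: str, mutation: str, now: float) -> str:
        if node not in self.down_since:
            return "delivered"
        if now - self.down_since[node] > MAX_HINT_WINDOW_S:
            return "dropped"  # outside the window: only repair can fix this
        self.hints.setdefault(node, []).append(mutation)
        return "hinted"

    def mark_up(self, node: str) -> List[str]:
        """Node is back: return the backlog of hints to replay to it."""
        self.down_since.pop(node, None)
        return self.hints.pop(node, [])


coordinator = Coordinator()
coordinator.mark_down("n2", now=0)
first = coordinator.write("n2", "m1", now=60)         # inside the window
second = coordinator.write("n2", "m2", now=4 * 3600)  # outside the window
replayed = coordinator.mark_up("n2")
```

The mutation written after the window closed never reaches `n2` through hints, which is the gap that repair has to close.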
Anti-entropy with Merkle trees
Hinted handoff handles the case where a node missed writes while temporarily down. It doesn't handle the case where a node's data gradually diverges from its replicas due to bugs, disk errors, or missed writes outside the hint window.
Cassandra repairs divergence via anti-entropy repair — the operator-triggered (or scheduled) process of comparing replica data and syncing differences. The comparison uses Merkle trees: for each replica, Cassandra builds a hash tree over the data, where each leaf is the hash of a range of partition keys and each internal node is the hash of its children. Two replicas with identical data will have identical Merkle tree roots. Two divergent replicas will have different roots, and the difference can be narrowed to specific ranges by walking down the tree to where hashes diverge.
Once divergent ranges are identified, only the data in those ranges is transferred between replicas — not the entire dataset. This makes repair bandwidth proportional to the amount of divergence, not the dataset size.
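A minimal sketch of the comparison, assuming each replica's data has already been split into the same power-of-two number of ranges and serialized to bytes. `build_tree` and `diff_ranges` are hypothetical names; real repair builds its trees per token range with a bounded depth.

```python
import hashlib
from typing import List


def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def build_tree(ranges: List[bytes]) -> List[List[bytes]]:
    """Return levels from leaf hashes up to the root (len(ranges) a power of two)."""
    levels = [[h(r) for r in ranges]]
    while len(levels[-1]) > 1:
        prev = levels[-1]
        levels.append([h(prev[i] + prev[i + 1]) for i in range(0, len(prev), 2)])
    return levels


def diff_ranges(a: List[List[bytes]], b: List[List[bytes]]) -> List[int]:
    """Walk down from the root and return the indices of ranges that differ."""
    suspects = [0]
    for depth in range(len(a) - 1, 0, -1):  # root level down to level 1
        next_suspects: List[int] = []
        for idx in suspects:
            if a[depth][idx] != b[depth][idx]:
                next_suspects.extend([2 * idx, 2 * idx + 1])  # descend
        suspects = next_suspects
    return [i for i in suspects if a[0][i] != b[0][i]]


replica_a = build_tree([b"range-0", b"range-1", b"range-2", b"range-3"])
replica_b = build_tree([b"range-0", b"range-1", b"range-2-stale", b"range-3"])
divergent = diff_ranges(replica_a, replica_b)
```

Only the subtree containing the stale range is descended into; matching subtrees are skipped after a single hash comparison.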
The operational requirement: repair must be run at least once per gc_grace_seconds period (default: 10 days) on every node. If a node is repaired less frequently than gc_grace, tombstones may be deleted before the node has had a chance to receive them, allowing deleted data to resurrect. In practice, most operators run repair weekly with a scheduled job. Repair is resource-intensive: building and comparing Merkle trees requires a full table scan, which competes with read traffic.
Compaction strategies and why they matter more than the paper says
The 2009 paper describes a single compaction approach: periodically merge all SSTables in a column family. Modern Cassandra exposes three strategies with different tradeoffs:
Size-Tiered Compaction (STCS, default): SSTables of similar sizes are merged into larger ones. Young, small SSTables get merged frequently; large, old SSTables merge rarely. Write amplification is low; read amplification is moderate because multiple tiers may need to be searched. Space amplification is high: during compaction, you temporarily need 2× the disk space of the tier being merged. For write-heavy workloads with uniform access patterns, STCS is appropriate.
Leveled Compaction (LCS): SSTables are organized into levels. Level 0 holds recently flushed SSTables (few, small). Compaction promotes SSTables from level 0 to level 1, ensuring no overlapping key ranges at level 1. Each subsequent level is 10× larger. Reads are faster — at most one SSTable per level needs to be checked — but write amplification is much higher: each write is eventually rewritten O(L) times as it moves through levels. For read-heavy workloads where read latency matters more than write throughput, LCS is appropriate.
Time-Window Compaction (TWCS): Designed for time-series data. SSTables are grouped by the time window their data falls into. A window is compacted internally using STCS. When a window ages out of the TTL range, its SSTable is dropped entirely — without reading it — because all data in a time window expires at the same time. For time-series workloads with TTL, TWCS is the only strategy that makes tombstone accumulation nearly irrelevant: entire SSTables are dropped rather than requiring tombstones to be merged and persisted.
The critical production lesson: choosing the wrong compaction strategy for a workload is not just a performance issue; it's a correctness-adjacent issue. An STCS table with high delete rates will accumulate tombstones until reads time out. An LCS table with high write throughput will keep CPU and disk continuously busy with compaction. The compaction strategy must match the access pattern before the cluster goes into production, because changing it later requires rewriting all SSTables, a significant one-time I/O event.
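The TWCS idea of dropping whole files can be sketched with SSTable metadata alone. The names (`SSTableMeta`, `group_by_window`, `fully_expired`) are hypothetical, every cell is assumed to share one table-level TTL, and the overlap checks a real implementation performs before dropping a file are omitted.

```python
from typing import Dict, List, NamedTuple


class SSTableMeta(NamedTuple):
    name: str
    max_timestamp: int  # newest write in the file, in seconds


def group_by_window(tables: List[SSTableMeta], window_s: int) -> Dict[int, List[str]]:
    """Bucket SSTables by the start of the time window their data falls into."""
    windows: Dict[int, List[str]] = {}
    for table in tables:
        start = table.max_timestamp - table.max_timestamp % window_s
        windows.setdefault(start, []).append(table.name)
    return windows


def fully_expired(tables: List[SSTableMeta], now: int, ttl_s: int) -> List[str]:
    """Files whose newest cell has passed its TTL can be dropped unread."""
    return [t.name for t in tables if t.max_timestamp + ttl_s <= now]


tables = [SSTableMeta("a", 100), SSTableMeta("b", 3500), SSTableMeta("c", 4000)]
windows = group_by_window(tables, window_s=3600)
droppable = fully_expired(tables, now=7400, ttl_s=7200)
```

No tombstone is read or written on this path; expiry is decided from one number per file.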
Production tradeoffs the benchmark tables don't mention
Partition size is a hard operational limit. Cassandra's architecture works best when partitions are small; the paper's design targets individual rows accessed by key. When a single partition accumulates a large number of rows (e.g., storing all events for a user in a single partition keyed by user ID), reads that touch that partition can end up reading and deserializing much of it. In 3.x, Cassandra logs a warning when compaction encounters a partition larger than compaction_large_partition_warning_threshold_mb (default: 100 MB), but it is only a warning: the data is not rejected. Large partitions cause hotspots: the node owning a large partition receives disproportionate read traffic for that partition, independent of the ring-based load distribution.
Secondary indexes are not what you think they are. Cassandra supports secondary indexes on column values, but they are local indexes: each node indexes only its local data. A query on a secondary index has to fan out across the cluster to cover every token range (on the order of N, the total number of nodes, not the replication factor), since the index doesn't indicate which nodes have matching data. For tables with high-cardinality secondary keys, a secondary index query touches O(N) nodes, essentially a scatter-gather full table scan. Secondary indexes are useful for low-cardinality columns where most nodes have few matching rows. For anything resembling a general-purpose query interface, maintain a denormalized lookup table (pre-write the index yourself, or let a materialized view do it) rather than a secondary index.
Read repair has a throughput cost. In Cassandra 3.x, read repair (the background sync triggered during reads) is tunable via the read_repair_chance table option; the option was removed in Cassandra 4.0. At 100%, every read at consistency level > ONE triggers a background comparison of all replicas and repairs any divergence. At scale, read repair generates significant background write traffic. Read-heavy workloads often lower this to 10–20% and rely on scheduled anti-entropy repair to handle remaining divergence. The tradeoff is increased divergence risk if repair schedules slip.
Schema changes are cluster-wide events. Adding a column or changing a compaction strategy requires the change to propagate via gossip to all nodes. During schema propagation, reads from nodes that have received the schema change and writes to nodes that haven't can produce inconsistent behavior. Schema changes in production require coordination windows and validation that the gossip has converged before running dependent workloads.
When not to use Cassandra
When your workload is delete-heavy without TTLs. If your application issues frequent deletes and cannot express deletions as TTLs, tombstones will accumulate. A GDPR compliance workflow that deletes individual user records is exactly the wrong pattern for Cassandra. Consider: a time-series table where individual records expire naturally via TTL, with TWCS compaction, handles deletion cleanly. A user-profile table where individual fields are deleted based on user request has no clean solution. Either batch deletes to minimize tombstone count, accept compaction overhead, or use a different database for entities that require individual deletion guarantees.
When you need multi-row transactions. Cassandra provides atomic writes at the single-partition level via lightweight transactions (compare-and-set using Paxos), but these are expensive (4 round-trips instead of 1) and scoped to a single partition. Across partitions, there are no transactions. If your application requires "transfer $100 from account A to account B atomically," Cassandra cannot express this as a single operation. The application must manage partial failures. For financial ledgers, inventory management, or any workload where cross-row consistency is required, a relational database or a NewSQL system is a better fit.
When you need flexible queries. Cassandra is a lookup-by-primary-key system. Queries that aren't served by the primary key or a pre-built secondary index require full table scans. If your query access patterns are not fully known at schema design time, the denormalization required by Cassandra's data model means every new access pattern requires a new table. In development environments where access patterns evolve rapidly, this is operationally painful. Cassandra's data model is correct for production workloads with stable, known access patterns, and wrong for exploratory or ad hoc query workloads.
When your cluster is small. The operational overhead of gossip, repair scheduling, compaction management, and the complexity of tuning consistency levels per operation is substantial. For a workload that can fit on a single well-replicated PostgreSQL instance, Cassandra's distribution machinery adds complexity without benefit. The paper's architecture is designed for hundreds of nodes at Facebook scale. Below a certain threshold — roughly 6–12 nodes, a replication factor of 3, and write rates that would saturate a PostgreSQL write-ahead log — the operational cost exceeds the throughput benefit. Use Cassandra when you need horizontal write scale that a single database can't provide, not as a general-purpose store.
When consistency errors are application-visible. Eventual consistency means reads may return stale data. For most activity tracking workloads (the paper's original motivation), stale reads are acceptable: showing a user their inbox from 200ms ago is indistinguishable from real-time. For workloads where users see their own writes immediately and inconsistency is visible — a counter that displays the wrong value, a shopping cart that loses an item — eventual consistency creates user-facing bugs. The LOCAL_QUORUM consistency level mitigates this within a datacenter, but it reduces write availability (a write fails if fewer than a quorum of replicas are up) and adds latency. The choice of consistency level is not a knob you set once; it's a contract about what your application can tolerate.
What the paper got right
The LSM tree / SSTable architecture that Cassandra's local storage is based on has become the dominant storage engine design for write-intensive workloads. RocksDB, LevelDB, ScyllaDB, and Apache HBase all use variants of the same structure. The decision to optimize for sequential writes and accept that reads require merging from multiple sorted files is the correct tradeoff for write-heavy workloads on rotating media, and it remains competitive on flash due to write endurance (SSDs wear unevenly under random writes; append-only workloads extend SSD lifetime).
The gossip-based cluster management without a central coordinator proved more operationally robust than ZooKeeper-based systems. Cassandra clusters don't have a single point of failure in their coordination plane. When ZooKeeper-dependent systems (early Kafka, early Hadoop) experienced ZooKeeper unavailability, entire clusters halted. Cassandra gossip degrades gracefully: individual nodes may have slightly stale membership information, but no single ZooKeeper leader failure cascades to full cluster unavailability.
The tunable consistency model — the ability to say "this write must hit 2 replicas; this read can hit 1" — has proven essential for heterogeneous production workloads. A cluster can simultaneously serve low-latency, eventually-consistent reads for analytics and strongly-consistent reads for user-visible data, differentiated at the query level. This flexibility comes at the cost of application complexity: the developer must understand what each consistency level guarantees and choose appropriately. But the alternative — all-or-nothing consistency — forces either unnecessary latency on operations that don't need it, or unavailability on operations that do.
The tombstone story is the enduring lesson. Every design choice in the paper's storage engine assumes that writes are the common case and that deletions are rare. This was true for Facebook's inbox search workload in 2009. It is not true for GDPR-era user data pipelines, for time-series data with aggressive retention policies without TTLs, or for applications that treat Cassandra as a general-purpose database and issue deletes as routinely as updates. Read the paper to understand what the system was designed for. Measure whether your workload matches those assumptions before you deploy it.