Cassandra cluster migration story

This is a simple story of how we migrated a live production cluster with all its data into a new cluster, all without downtime.

Old cluster:

  • 60 c4.4xlarge instances
  • Cassandra 2.1.12, 256 vnodes per host
  • Each instance had two 1500 GB GP2 EBS disks. This matches the c4.4xlarge EBS bandwidth quite well as max bandwidth for the instance is 250 MB/sec and the max EBS bandwidth is 160 MB/sec per disk.
  • Total of 39 TiB of data with replication factor 3

New cluster:

  • 72 i2.4xlarge instances, ephemeral SSDs
  • Cassandra 2.2.7, no vnodes
  • Initial import resulted in ~95 TiB of data.
  • The initial compaction estimate was 21 days. The actual compaction took around one week.
  • 25 TiB of data after compaction.

Migration plan

We did a hot migration, which we have done several times in the past. It’s based on the fact that all our operations are idempotent, so we can do dual writes to both the old and the new cluster from our application. Once the dual writes have started, we know that all data from that point on will be stored on both the old and the new cluster. Then we initiate a BulkLoad operation from the old cluster to the new cluster, where we stream all SSTables into the new cluster instances.
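The dual-write idea can be sketched roughly as follows. This is a minimal illustration and not our actual application code: DualWriter and FakeCluster are hypothetical names, the in-memory FakeCluster only stands in for a real cluster client, and passing an explicit client-side timestamp to both clusters is an assumption made here to keep the writes idempotent.

```python
class FakeCluster:
    """In-memory stand-in for a cluster client (hypothetical, sketch only)."""

    def __init__(self):
        self.rows = {}  # key -> (value, timestamp)

    def write(self, key, value, timestamp):
        current = self.rows.get(key)
        # Keep whichever value carries the newest timestamp.
        if current is None or timestamp >= current[1]:
            self.rows[key] = (value, timestamp)

    def read(self, key):
        entry = self.rows.get(key)
        return entry[0] if entry else None


class DualWriter:
    """Sends every write to both clusters; reads stay on the old cluster."""

    def __init__(self, old_cluster, new_cluster):
        self.old = old_cluster
        self.new = new_cluster

    def write(self, key, value, timestamp):
        # Assumption: the same timestamp is used on both sides, so
        # repeating a write does not change the stored result.
        self.old.write(key, value, timestamp)
        self.new.write(key, value, timestamp)

    def read(self, key):
        return self.old.read(key)
```

Repeating the same write leaves both stand-in clusters in the same state, which is the property the migration relies on.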

When Cassandra does a write, it stores the data on disk together with a timestamp field. When a field is updated, a new record is written to a new SSTable. This record points to the same key, has the new value and, most importantly, has a newer timestamp. When Cassandra does a read, it reads all entries, groups them by key and then selects only the newest entry for each key according to the timestamps.

We can leverage this behavior by BulkLoading all SSTables from the old cluster into the new cluster as any duplicated data will be handled automatically on the Cassandra read code path! In other words, it will not matter if we copy data from the old cluster to the new cluster which has already been written to the new cluster by the application doing dual writes. This fact makes migrations like this really easy and we can do these without any downtime.
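To make the timestamp reconciliation concrete, here is a small sketch of the idea. It is a simplified model and not Cassandra's real read path: each SSTable is represented as a plain list of (key, value, timestamp) tuples, and merge_sstables is a hypothetical helper name.

```python
def merge_sstables(sstables):
    """Return {key: value}, keeping the entry with the newest timestamp per key."""
    newest = {}
    for table in sstables:
        for key, value, timestamp in table:
            # Duplicates and older versions simply lose against newer ones.
            if key not in newest or timestamp > newest[key][1]:
                newest[key] = (value, timestamp)
    return {key: value for key, (value, _) in newest.items()}


# Data written to the new cluster by the dual writes.
dual_written = [("user:1", "c", 200)]
# Older data streamed in afterwards by the bulk load.
bulk_loaded = [("user:1", "a", 100), ("user:2", "b", 100)]

merged = merge_sstables([dual_written, bulk_loaded])
```

In this model the order in which the tables arrive does not matter: the bulk-loaded older value for user:1 never overrides the newer dual-written one.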

So the steps are:

  1. Create the new cluster and create the keyspaces and table schemas in it.
  2. Modify the application to write to both the old and the new cluster. Keep the application reading from the old cluster.
  3. Run BulkLoader on every node in the old cluster so that it reads all SSTables from that instance and feeds them into the new cluster.
  4. Wait for Cassandra to compact the loaded data.
  5. Do an incremental repair round so that all current data has been incrementally repaired once.
  6. Switch the application to do reads from the new cluster.
  7. Disable writes to the old cluster after you are sure that the new cluster works and can hold the load; until this point you can still roll back to the old cluster.
  8. Destroy the old cluster.

Compaction phase

After the BulkLoader run, each instance in the new cluster will have a lot of SSTables which need to be compacted. This is especially the case if you streamed from every instance in the old cluster, because then every instance receives three copies of each piece of data (assuming ReplicationFactor 3). This is normal, but the compaction will take some time.

Initially each cluster instance had around 7000-8000 pending compactions, so the entire cluster had over half a million pending compactions. Once streaming was completed the compaction started right away, and in the end it took around one week. At the beginning the compaction speed felt slow: after one day’s worth of compaction I estimated that at that rate it would take three weeks to compact everything. During the compaction each instance had one or two periods where only one giant compaction task was running for around one day, and this seems normal. After this task was done, the Pending Compactions count dropped by several thousand and the remaining compactions went quite fast. In OpsCenter, the PendingCompactions chart looked something like this in the end.

Because the bootstrap created a lot of small SSTables in L0, the Leveled Compaction switched to using STCS in L0, which resulted in a few big SSTables (20-50 GiB in size). This can be avoided by using the stcs_in_l0 flag in the table: ALTER TABLE … WITH compaction = {'class':'LeveledCompactionStrategy', 'stcs_in_l0': false}; I did not use this, and thus some of my compactions after repairs take longer than they would otherwise need to.

Incremental repairs

We also switched to using Incremental Repairs (they are the new default in 2.2), so we ran a couple of full repair cycles on the new cluster while it was still receiving only a partial production load. This gave us valuable information on how to tune streaming throughput and compaction threads in the new cluster so that they don’t affect production. We ended up settling on five compaction threads (out of 16 CPU cores).


Overall the migration went really well. Assuming you can make a code change which does dual writes into two different clusters, this method is really nice if you need to migrate a cluster into a completely new one. Reasons for doing this kind of migration could be a major version upgrade, a change in deployment topology or perhaps a change of machine type. It also gives you a nice way to roll back at any given moment, greatly reducing operational risk.

How NoSQL will meet RDBMS in the future

The NoSQL versus RDBMS war started a few years ago, and as the new technologies mature it seems that the two camps are moving towards each other. The latest example is a post where the author talks about an upcoming PostgreSQL feature that lets the application developer choose the service level and consistency of each call, hinting to the database cluster what it should do if a database node fails.

The exact same technique is widely used in Cassandra, where each operation has a consistency level attribute. With it the programmer decides whether they want full consistency across the entire cluster, or whether it is acceptable that the result might not contain the most up-to-date data in case of a node failure (which also gains extra speed for read operations). This is also called Eventual Consistency.
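The trade-off behind per-operation consistency levels can be illustrated with a little arithmetic. The sketch below uses the commonly cited rule that a read is guaranteed to overlap the latest write when the number of replicas written plus the number of replicas read exceeds the replication factor. The function names are my own and this is not a Cassandra API.

```python
def quorum(replication_factor):
    """Number of replicas that form a majority."""
    return replication_factor // 2 + 1


def read_sees_latest_write(replication_factor, write_replicas, read_replicas):
    """True when every read set must overlap every write set."""
    return write_replicas + read_replicas > replication_factor


rf = 3
# Writing and reading at quorum: the replica sets always overlap.
strong = read_sees_latest_write(rf, quorum(rf), quorum(rf))
# Writing and reading a single replica: faster, but only eventually consistent.
eventual = read_sees_latest_write(rf, 1, 1)
```

With a replication factor of 3, quorum reads and writes touch two replicas each and always overlap, while single-replica reads and writes may miss each other until the replicas converge.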

The CAP theorem says that a distributed application can have only two of three properties: Consistency, Availability and Partition Tolerance (hence the acronym CAP). For example: if you choose Consistency and Availability, your application cannot handle the loss of a node from your cluster. If you choose Availability and Partition Tolerance, your application might not get the most up-to-date data if some of your nodes are down. The third option is to choose Consistency and Partition Tolerance, but then the cluster may have to refuse requests when even a single node is lost.

Traditional relational databases are designed around the ACID principles, which loosely map to Consistency and Partition Tolerance in the CAP theorem. This makes it hard to scale an ACID database across multiple hosts, because ACID needs Consistency. Cassandra, on the other hand, can swim around the CAP theorem just fine because it lets the programmer choose between Availability + Partition Tolerance and Consistency + Availability.

On the other hand, as NoSQL technology matures, it will start to get features from traditional relational databases. Things like sequences, secondary indexes, views and triggers can already be found in some NoSQL products, and many of them are on the roadmaps of others. There’s also the ever-growing need to mine the data storage to extract business data out of it. Such features can be seen in the Cassandra Hadoop integration and in MongoDB, which has an internal map-reduce implementation.

Definition of NoSQL: Scavenging the wreckage of alien civilizations, misunderstanding it, and trying to build new technologies on it.

As long as NoSQL is used wisely it will grow and mature, but using it over an RDBMS without good reasons is a very easy way to shoot yourself in the foot. After all, it’s much easier to just get a single powerful machine, like an EC2 x-large instance, run PostgreSQL on it, and maybe add a few asynchronous replicas to boost read queries. It will work just fine as long as the master node can keep up, and it’ll be easier to program.

Example how to model your data into nosql with cassandra

We have built a Facebook-style “messenger” into our web site which uses Cassandra as its storage backend. I’m describing the data schema here to serve as a simple example of how Cassandra (and NoSQL in general) can be used in practice.

Data is modelled into two different column families: TalkMessages and TalkLastMessages. The fields are explained in more detail below.

TalkMessages contains each message between two participants. The key is a string built from the two users’ uids: “$smaller_uid:$bigger_uid”. Each column inside this CF contains a single message. The column name is the message timestamp in microseconds since epoch, stored as LongType. The column value is a JSON-encoded string containing the following fields: sender_uid, target_uid, msg.

This results in the following structure inside the column family.

"2249:9111" => [
  12345678 : { sender_uid : 2249, target_uid : 9111, msg : "Hello, how are you?" },
  12345679 : { sender_uid : 9111, target_uid : 2249, msg : "I'm fine, thanks" }
]
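Building the row key can be sketched like this. It is a minimal illustration: talk_key is a hypothetical helper name, and comparing the uids numerically (rather than as strings) is an assumption.

```python
def talk_key(uid_a, uid_b):
    """Build the "$smaller_uid:$bigger_uid" row key for a conversation."""
    # Assumption: uids are compared as integers, so 999 sorts before 1000.
    smaller, bigger = sorted((int(uid_a), int(uid_b)))
    return f"{smaller}:{bigger}"
```

Because the uids are sorted first, both participants end up with the same key regardless of who sent the message, so the whole conversation lives in a single row.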

TalkLastMessages is used to quickly fetch a user’s talk partners, the last message which was sent between the peers and other similar data. This allows us to fetch everything needed to display a “main view” of all online friends with just one query to Cassandra. This column family uses the user uid as its key. Each column represents a talk partner whom the user has been talking to, and it uses the talk partner’s uid as the column name. The column value is a JSON-packed structure which contains the following fields:

  • last message timestamp: microseconds since epoch when a message was last sent between these two users.
  • unread timestamp : microseconds since epoch when the first unread message was sent between these two users.
  • unread : counter how many unread messages there are.
  • last message : last message between these two users.

This results in the following structure inside the column family for these two example users: 2249 and 9111.

"2249" => [
  9111 : { last_message_timestamp : 12345679, unread_timestamp : 12345679, unread : 1, last_message: "I'm fine, thanks" }
]

"9111" => [
  2249 : { last_message_timestamp : 12345679, unread_timestamp : 12345679, unread : 0, last_message: "I'm fine, thanks" }
]

Displaying chat (this happens on every page load, needs to be fast)

  1. Fetch all columns from TalkLastMessages for the user

Display messages history between two participants:

  1. Fetch last n columns from TalkMessages for the relevant “$smaller_uid:$bigger_uid” row.

Mark all sent messages from another participant as read (when you read the messages)

  1. Get column $sender_uid from row $reader_uid from TalkLastMessages
  2. Update the JSON payload and insert the column back

Sending message involves the following operations:

  1. Insert new column to TalkMessages
  2. Fetch relevant column from TalkLastMessages from $target_uid row with $sender_uid column
  3. Update the column json payload and insert it back to TalkLastMessages
  4. Fetch relevant column from TalkLastMessages from $sender_uid row with $target_uid column
  5. Update the column json payload and insert it back to TalkLastMessages
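The send and mark-as-read operations above can be sketched against an in-memory model of the two column families. This is only an illustration: TalkStore is a hypothetical class, plain dictionaries stand in for Cassandra rows and columns, and the exact unread bookkeeping (resetting unread_timestamp when messages are read) is my assumption, since the real payload is a bit more complex.

```python
import json
import time


def talk_key(uid_a, uid_b):
    smaller, bigger = sorted((uid_a, uid_b))
    return f"{smaller}:{bigger}"


class TalkStore:
    """In-memory stand-in for TalkMessages and TalkLastMessages (sketch only)."""

    def __init__(self):
        self.talk_messages = {}       # row key -> {timestamp: JSON payload}
        self.talk_last_messages = {}  # uid -> {partner uid: JSON payload}

    def _update_last(self, owner, partner, msg, ts, is_unread):
        row = self.talk_last_messages.setdefault(owner, {})
        # Fetch the existing column, or start from an empty payload.
        if partner in row:
            payload = json.loads(row[partner])
        else:
            payload = {"unread": 0, "unread_timestamp": None}
        payload["last_message"] = msg
        payload["last_message_timestamp"] = ts
        if is_unread:
            if payload["unread"] == 0:
                payload["unread_timestamp"] = ts  # first unread message
            payload["unread"] += 1
        # Insert the updated column back.
        row[partner] = json.dumps(payload)

    def send(self, sender_uid, target_uid, msg, ts=None):
        if ts is None:
            ts = int(time.time() * 1_000_000)  # microseconds since epoch
        column = json.dumps(
            {"sender_uid": sender_uid, "target_uid": target_uid, "msg": msg})
        key = talk_key(sender_uid, target_uid)
        self.talk_messages.setdefault(key, {})[ts] = column         # step 1
        self._update_last(target_uid, sender_uid, msg, ts, True)    # steps 2-3
        self._update_last(sender_uid, target_uid, msg, ts, False)   # steps 4-5

    def mark_read(self, reader_uid, sender_uid):
        row = self.talk_last_messages.get(reader_uid, {})
        if sender_uid in row:
            payload = json.loads(row[sender_uid])
            payload["unread"] = 0
            payload["unread_timestamp"] = None
            row[sender_uid] = json.dumps(payload)
```

Note that each TalkLastMessages update is a read-modify-write of a JSON blob, so two concurrent senders could overwrite each other's counter update. The sketch does not attempt to solve that.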

There are also other operations and the actual payload is a bit more complex.

I’m happy to answer questions if somebody is interested :)

Cassandra operation success ratio survey results

It’s known that in Cassandra compaction hurts node performance so much that the node might miss some requests. That’s why it’s important to handle these situations: the client needs to retry the operation on another working host. We have been storing performance data for each Cassandra request we make to our five-node Cassandra production cluster.

We log the retry count and request type into our data warehouse, and I’ve now extracted the data from a 10-day period and calculated how many retries are needed to obtain a result. The following table tells how many times an operation had to be retried until it completed successfully. The percentages give the probability, as in “the request will be successful on the first try 99.933 % of the time.”

Total amount of operations: 94 682 251 within 10 days.

Retries   Operations   Percentage of total operations
0         94618468     99.93263 %
1         56688         0.05987 %
2         5018          0.00529 %
3         1359          0.00144 %
4         111           0.00012 %
5         25            0.00003 %

There were also a few operations which needed more than five retries, so preparing to try up to ten times is not a bad idea.
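The percentages can be re-derived from the raw counts with a few lines of arithmetic. The sketch below uses only the numbers given above; the function and variable names are my own.

```python
retry_counts = {0: 94618468, 1: 56688, 2: 5018, 3: 1359, 4: 111, 5: 25}
total_operations = 94682251


def percentage(count, total):
    """Share of all operations, in percent."""
    return 100.0 * count / total


def cumulative_success(counts, total, max_retries):
    """Percent of operations that finished within max_retries retries."""
    done = sum(n for retries, n in counts.items() if retries <= max_retries)
    return 100.0 * done / total


first_try = percentage(retry_counts[0], total_operations)
# Operations not covered by the rows for 0 to 5 retries.
more_than_five = total_operations - sum(retry_counts.values())
```

Subtracting the listed rows from the total gives the number of operations that needed more than five retries.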

The cluster uses Cassandra 0.6.5 with RF=3. Dynamic Snitching was not enabled. Each operation is executed until it succeeds or until 10 retries have been made, using a PHP wrapper.
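The original PHP wrapper is not reproduced here, but the retry logic can be sketched in Python along these lines. Everything in this block is illustrative: execute_with_retries and AllRetriesFailed are hypothetical names, and rotating through a shuffled host list is an assumption about how "another working host" gets picked.

```python
import random


class AllRetriesFailed(Exception):
    """Raised when the operation failed on the first try and on every retry."""


def execute_with_retries(operation, hosts, max_retries=10, rng=random):
    """Run operation(host), moving to another host after each failure.

    Returns (result, retry_count) so that the retry count can be logged.
    """
    candidates = list(hosts)
    if not candidates:
        raise ValueError("at least one host is required")
    rng.shuffle(candidates)  # spread the first attempts across the cluster
    last_error = None
    for attempt in range(max_retries + 1):  # first try plus the retries
        host = candidates[attempt % len(candidates)]
        try:
            return operation(host), attempt
        except Exception as error:  # a real client would catch narrower errors
            last_error = error
    raise AllRetriesFailed(f"gave up after {max_retries} retries") from last_error
```

The returned retry count is what would be logged per request to produce a table like the one above.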