In the spirit of 24 pull requests for December, I had the idea of building a set of open source cloud data-stores in December, blogging my progress and thoughts as I go. I hope to build a basic key-value store, message queue, filesystem and simple non-relational DB (like MongoDB/DynamoDB). I technically have enough days left to manage 24 posts in December, though I think I’ll be lucky to get everything built… I hope that documenting the problems and solutions will make this useful reading, whether you want to do something similar or just better understand how these systems work.
These have all been built before; what will be really new here is that all the services will be built with the same design approach, and they’ll all be “cloud-native”. If that sounds like marketing-speak, what I mean is that they’ll be designed for “the cloud”: working within its limitations (unreliable machines that individually aren’t terribly powerful), and making use of its capabilities (e.g. API-driven infrastructure, reliable object storage like S3 or Swift). And they’ll be built this way from the start, not as a bolted-on afterthought.
The general architecture I’m going to explore is to use an append-only log to coordinate and store changes to the state. Every operation that changes the state must be recorded in order into the log. To determine (or recover) the state, you just replay every operation from the log. So that recovery doesn’t take unbounded time, we’ll periodically snapshot the state and store it (likely in object-storage); then we need only replay the subsequent portion of the log from the last snapshot.
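Sketched in Java, the recovery side of that idea looks something like this (all the names here are hypothetical, for illustration only):

```java
import java.util.List;

// Hypothetical types for illustration; the real code will differ.
interface State {}

interface Operation {
    void applyTo(State state); // must be deterministic, applied in log order
}

interface Snapshot {
    State restore(); // rebuild the state as of the snapshot
}

class Recovery {
    /** Restore the latest snapshot, then replay the log entries recorded after it. */
    static State recover(Snapshot latestSnapshot, List<Operation> logAfterSnapshot) {
        State state = latestSnapshot.restore();
        for (Operation op : logAfterSnapshot) {
            op.applyTo(state);
        }
        return state;
    }
}
```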
This basic idea, of logging transactions and applying them in batches, is common to most datastores. It is similar to the ARIES transaction-log model that relational databases use. It’s also similar to the model that LevelDB and many other NoSQL databases use (the Log-Structured Merge-Tree).
To be “cloud-native”, we want the log to be distributed & fault-tolerant. For that, I’m planning on running it on a group of machines using the Raft consensus protocol. Raft is easy to understand and implement (at least compared to Paxos). Raft guarantees that the log remains available as long as a quorum of machines is up: data can be appended to the log, and once an append is acknowledged, the data is durable (it won’t disappear). There’s a great implementation in my preferred language (New-Java) available in the form of Barge.
(New-Java is what I call Java without the legacy cruft that gave Old-Java a bad name. It uses annotations instead of XML; it relies on dependency injection; apps are self-contained rather than relying on some monstrous application container; the coding style works with Java’s limitations and turns them to its advantage - if you’re going to pay the price, you may as well reap the rewards. Typical indications that you’re using New-Java: lots of annotations, reliance on Google Guice and Guava, using Jetty or Tomcat directly, and code that follows the ideas of the Effective Java book. If you see lots of XML and Spring, you’re probably using Old-Java.)
I wanted to start with a simple service, so I’ve started with an Append-Log-as-a-Service, similar to Apache Kafka or Amazon Kinesis. The append-log service allows clients to append chunks of data to a virtual file/stream; it then allows those chunks to be retrieved in order. Periodically, old data is deleted and can no longer be retrieved. It’s admittedly a slightly confusing place to start, given that internally we’re using a Raft append-log with similar functionality, but it is the simplest service I can imagine.
In my first attempt, I tried to use the Raft log as the append log itself. But, based on some excellent feedback from the main Barge developer, I moved to a model where we copy the data from the Raft log into a second set of files (the append files).
The obvious disadvantage here is that everything gets written to disk twice. (This problem, called write amplification, actually crops up in most log-based datastores.) But:
- We don’t have to fsync the second copy very often - only when we snapshot, and we’ll likely snapshot less than once per minute. In I/O terms, flushes to disk are what we really need to avoid, far more than cacheable writes. (See the sketch just after this list.)
- The second copy can be stored in a format optimized for our application. The Raft log is optimized for Raft, and e.g. keeping logs around for several days will be expensive.
- We can archive the second copy onto object-storage, and maybe implement hierarchical storage management (HSM). With cloud object-storage, in theory, we need never delete.
- We’re isolated from the implementation details of the Barge/Raft log.
- This will be the same design as all the other data stores I’m planning. In those cases the state won’t be a log, so it’s better to have one universal design.
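Here’s that first point as a sketch (a hypothetical helper, not the actual code): writes land in the OS page cache, and we only force them to disk when we snapshot.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class LazilyFlushedAppendFile {
    private final FileChannel channel;

    LazilyFlushedAppendFile(Path path) throws IOException {
        this.channel = FileChannel.open(path,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.APPEND);
    }

    void append(ByteBuffer data) throws IOException {
        channel.write(data); // a cacheable write; no fsync on the hot path
    }

    void onSnapshot() throws IOException {
        channel.force(true); // flush data (and metadata) only when we snapshot
    }
}
```

(With an mmap’d file - see below - you’d call MappedByteBuffer.force() instead, but the policy is the same.)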
The code, which was originally heavily based on the example code for Barge, is available on GitHub.
Barge is doing most of the heavy lifting; we really just have to implement the StateMachine interface. Whenever an append operation comes in, we serialize it, and try to commit the data to the Raft log. Barge takes care of reaching consensus across our cluster of machines, and then calls applyOperation on every machine when the operation is successfully committed. We then have to apply the operation to our state, which here means appending the data to our append files.
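The heart of it looks roughly like this (I’ve paraphrased Barge’s StateMachine interface here, and AppendFileWriter is a hypothetical helper):

```java
import java.nio.ByteBuffer;

// Paraphrase of Barge's StateMachine interface, for illustration.
interface StateMachine {
    Object applyOperation(ByteBuffer entry);
}

// Hypothetical helper that writes records into our append files.
interface AppendFileWriter {
    long append(ByteBuffer data); // returns the record's offset in the file
}

class AppendLogStateMachine implements StateMachine {
    private final AppendFileWriter appendFile;

    AppendLogStateMachine(AppendFileWriter appendFile) {
        this.appendFile = appendFile;
    }

    @Override
    public Object applyOperation(ByteBuffer entry) {
        // Barge calls this on every machine once Raft has committed the entry;
        // applying the operation just means appending the data to our append file.
        return appendFile.append(entry);
    }
}
```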
We add a RESTful front-end onto it, and add support for reading from the logs. And that’s it - for now. There are a lot more features that need to be implemented, some in Barge (most notably log compaction and dynamic reconfiguration) and some in “our” code (archiving the files to object-storage, scaling & recovery from failures, etc). But we can’t do everything on day 1! The hope is that we’ll be able to build out a bunch more services, and then add missing features across all of them with shared code.
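For a flavour of the front-end, here’s a JAX-RS sketch (the resource class and the AppendLog facade are illustrative, not the actual code):

```java
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;

// Hypothetical facade over the Raft state machine and append files.
interface AppendLog {
    long append(String log, byte[] chunk);  // returns the assigned position
    byte[] read(String log, long position); // null if there's no record there
}

@Path("/logs/{log}")
public class LogResource {
    private final AppendLog appendLog;

    public LogResource(AppendLog appendLog) {
        this.appendLog = appendLog;
    }

    @POST
    public Response append(@PathParam("log") String log, byte[] chunk) {
        long position = appendLog.append(log, chunk);
        // Tell the client where its chunk landed; that position is the record id.
        return Response.ok().header("X-Position", position).build();
    }

    @GET
    @Path("{position}")
    public Response read(@PathParam("log") String log,
                         @PathParam("position") long position) {
        byte[] chunk = appendLog.read(log, position);
        if (chunk == null) {
            return Response.status(Response.Status.NOT_FOUND).build();
        }
        return Response.ok(chunk).build();
    }
}
```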
After the overall design, the most interesting detail here is the format of our append files. We use an approach similar to Kafka: we mmap a data file and rely on the OS to page it in as needed; this means we’re not limited to the amount of data we can fit in memory, but can be just as fast when the working set fits into memory (we can theoretically achieve the elusive zero-copy data-reads). It also offloads most of the hard work to the OS. There’s no such thing as a free lunch, and sometimes the OS heuristics are not a good match for the data-access patterns, but in this case I think heuristics like read-ahead and LRU will work well; we write sequentially, and I expect most reads will be sequential and mostly focused on the newest data.
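Mapping a segment is only a few lines (the segment size here is an arbitrary example):

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class MappedSegment {
    static final long SEGMENT_SIZE = 64 * 1024 * 1024; // arbitrary example size

    static MappedByteBuffer map(Path path) throws IOException {
        try (FileChannel channel = FileChannel.open(path,
                StandardOpenOption.CREATE,
                StandardOpenOption.READ,
                StandardOpenOption.WRITE)) {
            // The OS pages the file in and out as needed: recent (hot) data stays
            // resident, cold data falls out of the page cache - no caching code of our own.
            return channel.map(FileChannel.MapMode.READ_WRITE, 0, SEGMENT_SIZE);
        }
    }
}
```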
Our file format is fairly simple. We have a fixed-size file header which identifies the file format version (it’s always good to have an escape-plan that lets the next version change things). We have a fixed-size per-record header which includes the length and a checksum. We can walk the records sequentially, starting from any known position, adding the length to find the next record. We can’t randomly seek to an arbitrary position and find the next record, though, so we choose to use the offset into the file as the record identifier. The checksum verifies that this is a valid record start, and can also detect file corruption. (If we detect corruption, we can hopefully find a non-corrupted copy: the data is initially stored in the Raft log, and also in the append-files, both on the servers and in object-storage.) Theoretically there’s a small chance that the checksum randomly matches at a non-record position, so that dictates our security model: a caller has all-or-nothing access to a log, because we can’t guarantee record-level security if we accept client-specified record offsets.
It’s thus easy to implement read: we seek to the provided file position, verify the checksum, and return the data. We also return the next position as an HTTP header, for the next call. More efficient APIs are obviously possible (e.g. batching, streaming), but this is a good start.
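Concretely, the read path looks something like this. The field sizes (a 4-byte length and a 4-byte checksum) are assumptions for illustration, not the actual on-disk layout, and I’m using the JDK’s built-in CRC32C class (added in Java 9) for the checksum:

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

class RecordReader {
    static final int RECORD_HEADER_SIZE = 8; // assumed: 4-byte length + 4-byte CRC32-C

    /**
     * Reads the record starting at `position`, or returns null if the checksum
     * doesn't match (i.e. not a valid record boundary, or corrupted data).
     */
    static byte[] read(ByteBuffer mapped, int position) {
        ByteBuffer buffer = mapped.duplicate(); // don't disturb the shared buffer's position
        buffer.position(position);

        int length = buffer.getInt();
        int storedChecksum = buffer.getInt();
        if (length < 0 || length > buffer.remaining()) {
            return null; // implausible length: can't be a record boundary
        }

        byte[] data = new byte[length];
        buffer.get(data);

        // The checksum both authenticates `position` as a real record start
        // and guards against on-disk corruption.
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        if ((int) crc.getValue() != storedChecksum) {
            return null;
        }
        return data;
    }

    /** The next record starts immediately after this one; we return this to the client. */
    static int nextPosition(int position, byte[] data) {
        return position + RECORD_HEADER_SIZE + data.length;
    }
}
```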
The roadmap to making this a real implementation: we need snapshot support in Barge, then we can build true read-only log-segments, then we can upload them to object-storage, then we can delete the local copies. And in parallel: we need Barge to support dynamic reconfiguration, then we can implement auto-repair / auto-scaling. We can either detect when the cluster is degraded and launch a replacement server, or (probably better) rely on an auto-scaling pool of servers and reassign items between them, ensuring every item has quorum and the load is spread around.
One last interesting detail: for our checksum, we use CRC32-C. It’s a variant of the “normal” CRC-32 algorithm, and it has hardware acceleration on recent Intel chips with SSE 4.2. If you’re picking a (non-cryptographic) checksum and don’t have to be compatible with legacy software, it’s the natural choice. (And CRCs often pop up in performance profiles, so it definitely can be worth optimizing!)