Justin Santa Barbara’s blog

Databases, Clouds and More

Day 10: Plans for JSON

I’m building a set of open-source cloud data-stores in December, blogging all the way.

In our last episode, we laid the foundations for querying by implementing a table-scan, and added support for keyspace partitions and for values of different types (byte strings and integers for now).

Today, I had one of those days with lots of thought, but not a lot of code to show for it. Apart from a quick refactor, all I coded was support for JSON values. More important is the approach I think I’ve settled on for how to actually deal with JSON.

We would ideally like to be able to access one data store from different endpoints: a key-value endpoint (e.g. Redis), a document store endpoint (e.g. MongoDB) and a relational endpoint (e.g. Postgres).

I described yesterday how I think that document stores and relational databases actually can support the same data model, even if relational data stores usually choose not to support nested document structures. (Google’s F1 and Postgres’ JSON type are the two ‘exceptions that prove the rule’.) So we can map between document and relational models “easily”. We can think of our data in terms of JSON, even if we choose to store it in a different representation (this is the same as the distinction between the logical data model and the physical data model).

What is trickier is mapping between key-value and document-store/relational models. It's fairly easy to map data from the richer model to the simpler key-value store, by exposing it as JSON strings. However, it's not clear how we should map back. We could wrap the values in JSON, e.g. "hello" <-> { "value": "hello" }. But if a key-value entry is inserted with a value that is valid JSON, should we treat that as a string or as an object? Presumably if the caller is setting JSON, it is because they want it treated as such, but we really don't want to choose a different code path based on whether the input value happens to be parseable as JSON, as otherwise we end up with problems like HTML injection attacks. We would want to rely instead on metadata, but the existing key-value protocols don't support this, because metadata doesn't mean anything in the key-value model.

I think that, while it’s nice theoretically, it’s not necessarily that useful to e.g. set your values using a key-value protocol but retrieve them using SQL. There’s also the question of what the point is of the key-value endpoint: once we know that we can simply view them as different views onto the same data-store, why wouldn’t clients use the more powerful API, even if they choose not to use the full functionality?

Instead, it would be easy to use the existing code to build a key-value proxy that can provide whatever mapping rules the user wants to use (providing the metadata at setup time, rather than per-call). So what I’ll likely do is to split up the code tomorrow into two separate services: a key-value service and a JSON-store service. That will also make me happier about the code duplication I introduced when adding JSON support!

Continuing with more thinking than typing, I then looked into binary representations for JSON. BSON (as used by MongoDB) is basically a dead-end: it isn't efficient in terms of CPU or storage space, and it stores repeated keys repeatedly (which is problematic because it encourages callers to use short, obscure keys rather than self-describing ones). I looked at some of the alternatives out there: Smile is fairly nice; it's compact and reasonably efficient to parse, and it uses a dictionary approach, reminiscent of a simple LZW, that avoids double-storing repeated keys. UBJSON is fast to parse and reasonably space-efficient, though it doesn't avoid double-storing repeated keys.

However, we have an additional consideration: we probably want to keep track of metadata if we want to use SQL as our query language. Some SQL query parsers benefit from having the "column names" available. So does a lot of SQL tooling (e.g. ActiveRecord). Obviously recording every key will be problematic if users start putting in data with dynamic column names, but we'll cross that bridge when we come to it.

So we can imagine storing every JSON key in a lookup table in the database, and then we could use that to replace key strings with references to that shared dictionary. This is similar to interned strings, and - as with interned strings in memory - it would allow us to optimize string equality comparison, because it suffices to compare the values of the "pointers". So this may prove a pretty big win.
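As a rough illustration of the idea, the shared dictionary could look something like this sketch (the `KeyDictionary` class and its methods are hypothetical, not the project's actual code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a shared key dictionary: each distinct JSON key is
// stored once and replaced by a small integer id, like an interned string.
class KeyDictionary {
    private final Map<String, Integer> idsByKey = new HashMap<>();
    private final List<String> keysById = new ArrayList<>();

    // Returns the existing id for a key, or assigns the next id.
    public int intern(String key) {
        Integer id = idsByKey.get(key);
        if (id != null) {
            return id;
        }
        int newId = keysById.size();
        keysById.add(key);
        idsByKey.put(key, newId);
        return newId;
    }

    // Looks the key back up when decoding a stored document.
    public String lookup(int id) {
        return keysById.get(id);
    }
}
```

With keys replaced by ids, comparing two keys becomes an integer comparison rather than a byte-by-byte string comparison.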

We don't necessarily have to implement this all immediately, but we now have a long-term plan for JSON. When talking to the client, we can use normal (text) JSON or a binary format like Smile, or in theory any JSON representation that becomes popular (even - yuk - BSON!). However, we can store the data internally in a different format. We'll likely use a binary format similar to Smile, but with a database-wide key-dictionary, making use of our key-metadata collection to get efficient string comparison. We'll store the key-metadata in a separate keyspace; we originally built keyspaces for indexes, but we can use them for this as well.

As a short-term plan, it should suffice simply to start collecting the key-metadata, and verifying that this meets our requirements for SQL. We can continue to store our JSON as text (unoptimized) for now, until we’re sure it’s useful.

So tomorrow: splitting out the two services, collecting key metadata, and hopefully adding a SQL parser.

Day 9: Table Scans, Multiple Keyspaces and Multiple Data Formats

I’m building a set of open-source cloud data-stores in December, blogging all the way.

In our last episode, we built a cloud-backed Git server. We were able to combine cloud object storage with the key-value data-store we’ve been building to store Git repositories reliably on the cloud, taking advantage of the code Google released to do this in JGit.

Today, I started thinking more deeply about how to enhance the key-value data-store, to get to a data-store that supports multi-key operations, particularly for reads. We need this for a document store (like MongoDB/DynamoDB), and it's also the foundation of a relational store.

Multi-key reads are much more useful than multi-key writes; I think the most important difference between a document store and a relational store is that document stores don't implement multi-key writes. This lets document-stores scale-out easily through sharding (just like key-value stores), while also giving them some of the power that relational stores enjoy. Now, this isn't the difference most-often cited: that document stores allow flexible schemas, whereas relational stores have a fixed schema. This is, in my mind, a false distinction. Relational stores have chosen to enforce schemas, but don't really have to. Google's F1 and Postgres' JSON type are hints of this.

I also believe that it’s possible to implement a relational datastore that can scale-out (so we don’t need to accept the compromises that document stores make), but that’s another story!

So I started off today with simple support for the most basic query imaginable: a select-all-rows scan. From there, it’s possible to add filtering, range queries etc; the hard bit is of course to run those queries without having to examine all rows! But select-all is a good start, so I implemented a very simple binary format that allowed entries to be streamed over HTTP, added the call to the client I’m building that the tests use, and we can select all entries in the store.

At this point we could implement basic filtering, but first I wanted to think about how this should really work. We’re going to want to put JSON objects into the store, and be able to query them. It’s simple to do that via a full table scan, but if we want performance to be acceptable we have to implement secondary indexes. I’m going to try storing the secondary indexes in the same BTree structure (the normal approach has a BTree for each index, but I’d like to avoid changing the BTree logic if I can). So we need a way to keep the ‘data keys’ separate from the ‘index keys’. I had the idea of “keyspaces”, assigning a numeric value to each set of keys, and prefixing every key with the keyspace id. We can use the variable-length encoding from Protocol Buffers to save some bytes: it should only need 1 byte for the first 128 keyspaces. This also means that entries will be grouped by keyspace, which is why I think one BTree will probably be fine.
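As a sketch of the scheme (the varint follows the Protocol Buffers wire encoding; the `Keyspace` class and its methods are illustrative, not the project's real API):

```java
import java.io.ByteArrayOutputStream;

// Sketch of prefixing every key with a keyspace id, encoded as a
// Protocol Buffers style varint so the first 128 keyspaces cost one byte.
class Keyspace {
    static byte[] encodeVarint(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7f) != 0) {
            out.write((value & 0x7f) | 0x80); // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write(value); // final byte, continuation bit clear
        return out.toByteArray();
    }

    // A stored key is the keyspace prefix followed by the user's key bytes;
    // because the prefix sorts first, entries group together by keyspace.
    static byte[] qualify(int keyspaceId, byte[] key) {
        byte[] prefix = encodeVarint(keyspaceId);
        byte[] qualified = new byte[prefix.length + key.length];
        System.arraycopy(prefix, 0, qualified, 0, prefix.length);
        System.arraycopy(key, 0, qualified, prefix.length, key.length);
        return qualified;
    }
}
```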

Redis has functionality for multiple databases with numeric IDs. Even though I don’t really have a use for this right now, this maps pretty naturally to keyspaces, and we can then test the keyspace functionality. I implemented a test for the keyspace functionality and then the keyspace functionality itself - TDD in action (although I strictly shouldn’t have pushed the commit with the test, as that breaks the CI build)!

It's probably premature optimization, but I next implemented alternative value formats, as I was thinking about yesterday. I'm not really using them yet, but the idea is that we'll probably want to be able to tell the difference between a JSON object and a string that happens to look like a JSON object. I also thought we might want to store JSON values in a more efficient representation. MongoDB has BSON, which seems like a reasonable idea, although it has some pretty glaring flaws in the implementation. In particular, BSON misses the most important optimization of all, which is to avoid storing the keys repeatedly; this pushes clients to worry about the length of their keys. The lesson to learn is to avoid MongoDB's mistake of exposing the binary format as the wire protocol, at least until it's proven. We could try using Google's Protocol Buffers as the data representation instead; Protocol Buffers is definitely proven, and much more efficient than anything I want to build. Keys are not stored at all, because they're in the schema instead. The downside (for some) is that Protocol Buffers uses a static schema, so we can't simply map arbitrary JSON to it. My vague plan here is to treat everything as JSON, although not necessarily store it as a simple JSON string. If we have a schema we can store it as Protocol Buffers; we can use something like BSON or maybe just compression for arbitrary JSON; but we'll probably always expose it to the client as JSON.

Multiple value formats (for now) are simply a byte prefixed to the value, indicating how the remaining data should be interpreted. Right now, the only two formats I implemented are a raw byte format (the ‘obvious’ format for the key-value store) and a simple format for storing integers. They’re what we’re using at the moment, so that’s what we can test!

I first coded this in a "minimally invasive" way - I didn't want to change too much - but that meant the code was fragile. After I got it working, I went back and refactored it to be object-oriented. Rather than return a ByteBuffer for the value and expect the caller to know whether that ByteBuffer has already been decoded or not, I changed the Values helper class (which consisted simply of static methods) into a Value class we instantiate for every decoded value. This has some overhead, but we expect these Value objects to be short-lived (so the Java Garbage Collector should make swift work of them). It makes the code much cleaner and easier to read, stops me making stupid programming errors, and also lets us use polymorphism instead of tedious switch statements (we subclass Value for each format). A nice win, as long as the performance doesn't hurt. There are some tricks (like mutable objects) which we can introduce if it ever comes up in a performance profile, but we'll wait till we see the problem before trying to fix it!
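A minimal sketch of how the format byte and the Value refactor fit together; the format codes and class names here are assumptions, not the project's actual ones:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// The first byte of a stored value says how to interpret the rest, and each
// format gets its own Value subclass, so callers use polymorphism rather
// than switch statements scattered through the code.
abstract class Value {
    static final byte FORMAT_RAW = 0;
    static final byte FORMAT_INT = 1;

    static Value deserialize(ByteBuffer buffer) {
        byte format = buffer.get();
        switch (format) { // the one remaining switch lives at the boundary
            case FORMAT_RAW: {
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                return new RawValue(data);
            }
            case FORMAT_INT:
                return new IntValue(buffer.getLong());
            default:
                throw new IllegalArgumentException("Unknown format: " + format);
        }
    }

    abstract String asString();
}

class RawValue extends Value {
    private final byte[] data;
    RawValue(byte[] data) { this.data = data; }
    String asString() { return new String(data, StandardCharsets.UTF_8); }
}

class IntValue extends Value {
    private final long value;
    IntValue(long value) { this.value = value; }
    String asString() { return Long.toString(value); }
}
```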

Day 8: Cloud Git-as-a-Service

I’m building a set of open-source cloud data-stores in December, blogging all the way.

In our last episode, we added lots of Redis commands to our key-value store, cleaned up the architecture a little bit (introducing the command design pattern to cope with the ever growing number of commands), and struggled with Redis’ lack of compare-and-swap.

Well, I found it: Redis does support compare-and-swap, although it’s a little bizarre and seemingly deprecated in favor of the even-more-bizarre Lua scripting. Redis implements something more akin to load-link/store-conditional than the simpler compare-and-swap. It requires issuing 5 commands (WATCH, GET, MULTI, SET, EXEC) and needs an extra network round-trip, but it’s there. So, we can use that to build cool things that need compare-and-swap, which is what I’m actually trying to do!
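To make the semantics concrete, here is a toy in-memory model of the WATCH/GET/MULTI/SET/EXEC sequence. This is a sketch of the behavior only; a real client would issue the five commands over a Redis connection (e.g. via jedis), and the class here is entirely hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of Redis' WATCH/MULTI/EXEC semantics, to show how the
// five-command sequence gives us compare-and-swap.
class WatchStore {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    public String get(String key) { return data.get(key); }

    public void set(String key, String value) {
        data.put(key, value);
        versions.merge(key, 1L, Long::sum); // any write bumps the version
    }

    // WATCH records the version; EXEC succeeds only if it hasn't changed.
    public long watch(String key) {
        return versions.getOrDefault(key, 0L);
    }

    // MULTI ... SET ... EXEC collapsed into one conditional store.
    public boolean execSet(String key, String value, long watchedVersion) {
        if (versions.getOrDefault(key, 0L) != watchedVersion) {
            return false; // another writer touched the key; transaction aborts
        }
        set(key, value);
        return true;
    }

    // The compare-and-swap we actually wanted, built from the five commands.
    public boolean compareAndSwap(String key, String expected, String update) {
        long version = watch(key);            // WATCH
        String current = get(key);            // GET
        if (current == null ? expected != null : !current.equals(expected)) {
            return false;                     // value didn't match; give up
        }
        return execSet(key, update, version); // MULTI + SET + EXEC
    }
}
```

Note that this is load-link/store-conditional in spirit: the transaction aborts if the key was written at all, even if it was written back to the same value.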

So we have a sub-optimal solution, but we don’t really want to fix it right now. What I try to do in this situation is to hide the bad solution behind an interface, so it’s easy to replace it in future. So we have RedisKeyValueStore implementing KeyValueStore. We can use the Redis implementation for now, and replace it with something better designed later. We can make progress, and it may turn out that it’s never worth replacing the “terrible” approach (YAGNI). Because of that I haven’t yet implemented the extra Redis functionality (WATCH / MULTI / EXEC), and I’m just running against traditional non-cloud Redis. Let’s actually get something done!

So today, I built Git storage on the cloud, backed by OpenStack cloud storage. Cloud storage is typically eventually consistent, which is a much weaker guarantee than a traditional filesystem gives you. However, it turns out that git is actually very lenient in what it requires of its storage (the git design is excellent; it is essentially a well-implemented Merkle tree; the genius was realizing that this was sufficient). Git stores blob data (containing the actual file data), and it stores references (which are just hashes of the latest trees). The blob data is as big as the code commit (KBs or MBs), a reference is less than a hundred bytes (a name and a hash). Further, the blob data is immutable, and thus needs almost no consistency guarantees from its storage; so we can easily store it on cloud storage. The reference data, though, must be stored and updated consistently, so it can't easily be put onto object storage. It's thus been non-trivial to host Git on the cloud. But we've built a consistent key-value store, which is perfectly suited to solve exactly this problem. Best of all, Google have implemented their Git storage exactly the same way, and open-sourced their code as part of JGit and Gerrit, so I didn't even have to implement all the details of git.

Just as I did with Redis, JGit has interfaces that mask the ‘terrible’ blob-store and reference-store implementations that use the filesystem. I believe Google maps both of these to BigTable. But we can map the blob-store to OpenStack Storage and the reference-store to the Redis protocol, now that we know we can implement the Redis protocol in a cloud-suitable way. JGit does some great caching, so this works wonderfully, even when I ran against Rackspace’s Cloud Files product (which runs Keystone and Swift), storing data half-way across the US.

This is truly cloud Git: all the data is now stored redundantly on multiple machines / locations, and it uses cloud services via APIs. Swift is obviously great for cloud operations; our key-value store isn’t quite so far along but it can get there architecturally. I think this also demonstrates what I mean by a cloud-first data-store: we’re using Keystone for authentication, we’re using Swift for data-storage. Our key-value store (or something like it) will be a cloud service as well. It doesn’t have any authentication yet, but we’ll do the same thing as Swift does and integrate with Keystone, instead of building a second store of users.

Compare this to how GitHub has done this: they use a traditional filesystem to store their git data, so to ensure that it is available they have to use a complicated DRBD architecture. Although I like DRBD, it is a little bit fragile and things go wrong. I think that the block-storage metaphor is not the right approach for the cloud: it fundamentally imposes a single-server mindset, and it's difficult to get both high-performance and high-availability. (Amazon's Elastic Block Store product is probably the most problematic piece of AWS, I think mostly because they favored high performance.)

The real issue is that GitHub have ended up with a complex and not-very-cloudy architecture; for example, presumably they shard their repositories across DRBD volumes, and they presumably had to figure out how to live-migrate 'hot' repos, as well as implementing all the disaster recovery themselves. GitHub is solving a lot of 'infrastructure' problems. I think those problems should be solved by the cloud, so that GitHub can have a very much simpler, almost stateless, architecture of web-servers consuming well-tested cloud services. GitHub are running on the cloud, but they're not really using cloud architectures. (That's not really their fault though - I don't think this approach for storing Git data is very well known!)

The other big piece is making sure this is all open-source so companies like GitHub can use it confidently. We already have great open-source object storage, and hopefully by the end of the month we’ll be well on the way to great open-source structured data storage :-)

Day 7: Redis Commands and the Command Pattern

I’m building a set of open-source cloud data-stores in December, blogging all the way.

In our last episode, we added a Redis front-end to the datastore, supporting get and set. The vision is that we can use different protocols to talk to our key-value store.

Redis supports a lot more commands than just get & set! I spent much of today implementing other commands: append, delete, exists, increment and increment-by, decrement and decrement-by.

To be able to do that and have some confidence in it, I created some unit tests for Redis, using the jedis driver. I found and fixed some more bugs, as well! And a big refactor is that I'm using the command design pattern for performing each mutating operation: given that Redis requires so many operations, it no longer scales to put all the logic into one big switch statement.

I had hoped to implement distributed locks with compare-and-swap via the Redis protocol, but it turns out that Redis doesn’t implement compare-and-swap. The mailing list thread is just confused / wrong, so I’m hoping that someone will revisit this at some stage and it can go into the official protocol. If I want compare-and-swap, I’ll have to use another protocol, I guess. Memcache is an option, although the way it implements compare-and-swap is a little unusual as well (it only supports swapping based on a version id, not based on the value itself). Maybe our own RESTful protocol is the way to go!

I haven’t yet implemented any of the features that make Redis unique: in particular Redis supports values that are themselves data-structures (lists, sets and sorted-sets). I’m thinking through how best to support this in a generic way. One option would be to extend the key; for a list for example we could store key=(a,b,c) as three entries in our BTree: key.1=a key.2=b and key.3=c (metaphorically speaking). Another option would be to encode the list into the value, so that any value could be “typed”; we’d probably end up with something like the COM Variant type. We could also store data structures in a separate page in our system, in a data-structure specifically designed for lists/sets/sorted sets (i.e. not a BTree).
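The first option (extending the key) could be sketched like this, with a sorted map standing in for the BTree. Everything here is illustrative; note that a real encoding would need index keys that sort numerically (e.g. fixed-width or binary indices), since text indices put "10" before "2":

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the "extend the key" option for lists: a list stored under one
// logical key becomes several BTree entries, one per element, keyed by index.
class ListCodec {
    static void putList(TreeMap<String, String> tree, String key, List<String> values) {
        for (int i = 0; i < values.size(); i++) {
            tree.put(key + "." + i, values.get(i)); // key.0=a, key.1=b, ...
        }
    }

    static List<String> getList(TreeMap<String, String> tree, String key) {
        List<String> values = new ArrayList<>();
        // Because the entries sort together, a range scan from "key." finds them all.
        for (Map.Entry<String, String> e : tree.tailMap(key + ".").entrySet()) {
            if (!e.getKey().startsWith(key + ".")) break;
            values.add(e.getValue());
        }
        return values;
    }
}
```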

I need to think this one over. I find the best approach with these difficult ‘philosophical’ problems is (1) to work on something completely different, like implementing a distributed lock system, and (2) to work on the related problems, like a MongoDB / DynamoDB inspired store. Sounds like a plan for the weekend!

Day 6: A Redis View of the World

I’m building a set of open-source cloud data-stores in December, blogging all the way.

In our last episode, we added garbage collection to the BTree, so it’s now at the point where we can start using it to implement useful functionality.

Today, I added a basic Redis front-end to the RESTful API we already had. I had a huge head-start because I was able to work from the redis-protocol project, which actually implements a Redis server in Java. That project (like Redis itself) is designed around a single-server, in-memory implementation. The redis-protocol project is well designed, and it would have been fairly easy to plug in our distributed backend implementation. Nonetheless, I rewrote the code to make sure I had a good understanding, but I followed almost exactly the same design. So implementing basic support for the redis protocol was fairly quick, but I owe all that to the redis-protocol project.

Like redis-protocol, I used the excellent Netty library, which makes dealing with asynchronous I/O in Java easy. Unfortunately, Netty recently went through a major version change (from 3 to 4), and the official documentation and informal documentation (blog posts, sample code, StackOverflow questions) haven't quite caught up. It is noticeably better than it was 6 months ago and continuously improving, so it's definitely worth going with the latest version (for new projects at least).

The redis protocol itself is a text-encoded protocol pretending to be a binary protocol; e.g. each variable-length component is preceded by its length encoded as text. It's a lot more work to parse than a true binary protocol would be (both in terms of code and in terms of CPU overhead). Seems like an odd design decision…
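For example, parsing a single bulk string shows the text-encoded framing; this is a minimal sketch that ignores error handling and the protocol's other message types:

```java
import java.nio.charset.StandardCharsets;

// A Redis bulk string is framed as $<length-as-decimal-text>\r\n<bytes>\r\n,
// so the parser must scan for CRLF and decode ASCII digits before it even
// knows how many bytes to read - unlike a fixed-width binary length field.
class RedisBulkString {
    static String parse(byte[] input) {
        if (input[0] != '$') throw new IllegalArgumentException("not a bulk string");
        int i = 1;
        int length = 0;
        while (input[i] != '\r') {          // the length arrives as text digits
            length = length * 10 + (input[i] - '0');
            i++;
        }
        i += 2;                             // skip the \r\n after the length
        return new String(input, i, length, StandardCharsets.UTF_8);
    }
}
```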

On that note, there was a great talk on HTTP 2.0 given at Heavybit by Ilya Grigorik (video coming soon, I hope!) In it, he pointed out that HTTP has great advantages: great client support, firewall and proxy friendly, great developer tools, and in general just having an incredibly complete infrastructure around it. Where it falls down is that performance can be inferior to a raw TCP connection (particularly if you want concurrent requests), so high-speed protocols typically end up rolling their own binary-over-TCP protocol. These don't get any of the benefits that you get with HTTP, but are fast. However, with HTTP 2.0 (the protocol formerly known as SPDY), HTTP is now a multiplexed binary protocol, and should be comparable in performance to a hand-rolled binary protocol. In other words, there should hopefully be no new binary protocols. Even today, before HTTP 2, you should probably base your protocol on HTTP if you can get away with it, and know that a big performance boost is coming with HTTP 2. You can still beat HTTP 2 with a custom binary protocol in theory, but HTTP 2 will beat the protocol you build in practice.

The next step for our Redis experiment was to implement the basic redis commands, notably get and set. With that, it’s now possible to access our key-value store using the redis protocols, so all the redis libraries and tooling should work (for the small subset I’ve implemented).

It is interesting to compare this to official Redis. The big advantage is that we are distributed; Redis is persistent, which is great, but then the big question is “what happens when a machine fails”. Memcache has a much more coherent answer here, because it is a cache: values can go away for any reason. Redis doesn’t have quite the same self-consistent answer.

As well as HA, we also have a multi-threaded implementation: we support concurrent reads, though our writes are serialized by Raft. This came about naturally; I haven’t been carefully coding all the time to support concurrent operation. We’ve inherited the concurrent reader design from LMDB, and some thought was required when it came to garbage collection, but everything else is just sensible New-Java: minimizing mutability, basic locking around shared data structures etc. Of course, I know there are plenty of threading bugs still left, but the design is naturally multi-threaded in a way that it wouldn’t be in other languages (like C).

Because we operate on a cluster of machines, writes will be much slower than official Redis. I suspect we’ll be at least as fast as reliable replicated writes will ever be in Redis. Benchmarking reads (after some performance profiling) against Redis would be very interesting: we should be slower on a per-request basis, because we’re in Java and because of our copy-on-write database, but making up for that is the fact that we can run requests concurrently, even making use of multiple cores. I suspect for mostly-read benchmarks we may be able to get much higher throughput.

But what’s more interesting to me than a pure speed contest is the idea that Redis is just a protocol on a universal key-value store, and one that we’ve built to work well on “the cloud”. We can implement the memcache protocol, the MongoDB protocol, a SQL database protocol; all backed by the same data store. Things are about to get interesting…

Day 5: Garbage Collection, Return Values

I’m building a set of open-source cloud data-stores in December, blogging all the way.

In our last episode, we added more functionality to the BTree: node splitting & support for big values. We also started recording page tracking information into the transaction records, in preparation for reclaiming old space.

Reclaiming old space is important, because we use a copy-on-write approach rather than changing pages in-place. This is relatively unusual for a database, but allows us to support readers without using locks. Reclaiming the old versions (that are no longer referenced by active transactions) is required to stop the database growing without bounds, so most of today was spent on this.

Getting the basic design of the garbage collector right was tricky. We maintain a free-space map in-memory, so that we can allocate new space quickly (without going to disk every allocation). With each transaction record, we include the pages that we freed and the pages that we allocated. Thus, a complete list of transactions can be used to rebuild the free-space map. This is elegant, because if a transaction commits, that is exactly when we want to persist the page allocations. Conversely, if we rollback or otherwise fail a transaction, then we want to roll back the page allocations. So it is nice that they are actually part of the same structure - we don’t have to worry about keeping them synchronized because they are one and the same.

Now, once again, we have a log-structured system for tracking free-space, with an in-memory representation. This is exactly the same approach we're using for all of our projects. When we start up, we replay the allocation records to rebuild the state. We have the same problem: we need to periodically take snapshots, so that we can keep the amount of time taken to replay the logs under control. We can take a snapshot, which is just another special page type, and we can reference this from the transaction. One trick is that we don't have to write the snapshot on every transaction; instead we can write it periodically (when it's "worth it"). We do have to be careful not to delete the older transaction records until they are no longer needed for rebuilding the current free space map.
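The replay could be sketched roughly like this (the class and field names are hypothetical, not the project's actual code):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Rebuilding the free-space map by replaying transaction records: each
// committed transaction carries the pages it allocated and the pages it
// freed, so the map is just the fold of those records in commit order.
class FreeSpaceMap {
    private final Set<Long> freePages = new HashSet<>();

    static class TransactionRecord {
        final List<Long> allocated;
        final List<Long> freed;
        TransactionRecord(List<Long> allocated, List<Long> freed) {
            this.allocated = allocated;
            this.freed = freed;
        }
    }

    // Replayed at startup; could start from a snapshot page instead of the
    // beginning of the log, to bound recovery time.
    void replay(List<TransactionRecord> records) {
        for (TransactionRecord record : records) {
            freePages.removeAll(record.allocated); // pages taken by the txn
            freePages.addAll(record.freed);        // old versions released
        }
    }

    boolean isFree(long page) { return freePages.contains(page); }
}
```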

Finally, we can actually reclaim the pages. We keep track of every active read transaction; we can clean up after a write transaction only when no read transaction references that transaction (or an earlier version). This is the same trick that is used by LMDB and ZFS (for snapshots).

So now we free up old page versions and can reuse the space. One issue is the strategy for how we allocate memory from the free space. This is the classic memory allocation problem; there is no “right answer”. The best allocators, like tcmalloc, typically use buckets to ensure speed while avoiding fragmentation. Instead, I went with a simpler allocator inspired by ZFS: first fit. In “first fit”, we simply find the first bit of free space that can “fit” an allocation request. This is well-known to cause fragmentation, but I wanted to start simple! ZFS uses a neat trick, which actually rotates through available space (changing the zero-point to redefine “first”), which means that writes rotate sequentially around the disk. This has the advantage that older versions survive for longer (one whole ‘trip’ around the disk), which is great for recovery of corrupted data. However, we expect our database to be largely in-memory, and marching through the whole disk allocation is likely to hurt the cacheability. So I went with a simple ‘first fit’, not ZFS’s clever spin on it. Fragmentation is likely to be a problem; we’ll have to see how it behaves!
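A minimal first-fit allocator, just to pin down the idea (no coalescing of adjacent runs, and all names are illustrative):

```java
import java.util.Map;
import java.util.TreeMap;

// First fit: free space is kept as a sorted map of offset -> run length,
// and an allocation takes the first run big enough, shrinking it from the
// front. Simple, but prone to fragmentation over time.
class FirstFitAllocator {
    private final TreeMap<Long, Long> freeRuns = new TreeMap<>(); // offset -> length

    void free(long offset, long length) {
        freeRuns.put(offset, length);
    }

    // Returns the offset of the allocation, or -1 if nothing fits.
    long allocate(long length) {
        for (Map.Entry<Long, Long> run : freeRuns.entrySet()) {
            if (run.getValue() >= length) {
                long offset = run.getKey();
                long remaining = run.getValue() - length;
                freeRuns.remove(offset);
                if (remaining > 0) {
                    freeRuns.put(offset + length, remaining); // keep the tail
                }
                return offset;
            }
        }
        return -1;
    }
}
```

The ZFS variation described above would amount to starting this scan from a rotating offset rather than always from zero.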

We now have a working (basic) BTree! There are still lots of issues, but we can hopefully fix those ‘as we go’ and start building more real features.

There wasn’t much time left in the day, so I proposed a patch to Barge, that lets us return a value from the StateMachine. This means that we can do write-operations that return a value. I implemented the increment operation, which just adds one to a counter value - this is the simplest useful operation I could think of! I also tweaked the code a little more so that we have a doAction method which operates on a log operation even at the top level; this should make it much easier to add lots more operations. Hopefully we’ll add a couple more operations tomorrow. Redis made the news today for having a broken-by-design cluster implementation, so it might be fun to work towards something that looks a little like Redis!

Day 4: BTree Page Splitting; Big Values and Free Space

I’m building a set of open-source cloud data-stores in December, blogging all the way.

In our last episode, we cleaned up the project a little, set up CI using CircleCI, and set up real transactional behaviour.

Today, I attacked one of the things that makes a BTree an actual tree: splitting nodes. In a traditional BTree, a page is limited to a fixed size, typically 4KB or 8KB. I'm creating what I call a 'relaxed' BTree, where we don't have to be so strict about page sizes. However, we still want to limit the page size. Our leaf page format currently uses 16 bit offsets, so is limited to 64KB pages. More importantly, if we never split pages, we just end up with a single page, and we never get a tree structure. So I implemented node splitting, splitting whenever a page is bigger than 32KB. This means we start creating branch pages, and this (of course) uncovered a few bugs.
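The splitting rule itself is simple; here is a sketch using entry sizes as stand-ins for real key/value entries (the 32KB threshold matches the above; everything else is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// 'Relaxed' splitting: instead of a hard 4KB page, split a leaf once its
// serialized size passes a threshold, dividing the entries roughly in half
// by accumulated byte size so both halves stay under the limit.
class PageSplitter {
    static final int SPLIT_THRESHOLD = 32 * 1024;

    // Each entry's size would include its key and value bytes plus offsets.
    static List<List<Integer>> maybeSplit(List<Integer> entrySizes) {
        int total = entrySizes.stream().mapToInt(Integer::intValue).sum();
        List<List<Integer>> pages = new ArrayList<>();
        if (total <= SPLIT_THRESHOLD) {
            pages.add(entrySizes); // small enough; no split needed
            return pages;
        }
        List<Integer> left = new ArrayList<>();
        List<Integer> right = new ArrayList<>();
        int sizeSoFar = 0;
        for (int size : entrySizes) {
            if (sizeSoFar < total / 2) {
                left.add(size);
                sizeSoFar += size;
            } else {
                right.add(size);
            }
        }
        pages.add(left);
        pages.add(right);
        return pages;
    }
}
```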

Next, I wanted to work around that 64KB limit on the leaf page size, because we might very well want bigger values. So I introduced a special format for leaf nodes with only one entry, that allows 4GB values. Because of the way we’re splitting our leaves, big values will always end up on a leaf node with just one value. I’m not sure whether this is a good idea. The “one value” thing is a bit magical. It’s also not ideal to have two formats, although they are very similar to each other. Also, storing 4GB values at all is definitely not a good idea. I’m not sure where the cut-off point comes (1GB? 1MB? 1KB?), so we’ll probably revisit this. Postgres stores big values in page-sized chunks, which has the advantage that it’s quick to seek to random parts of BLOBs. Our design would make it fairly easy to do something similar, or just to store big values in a separate page, or even to store them federated onto object storage. Whatever we eventually decide here will probably feed back into the page-splitting code as well, but for now we can store huge values.

We don't support huge keys - it would be straightforward to do so, but it's probably not a good idea. Keys are copied into the branch pages, so big keys are less straightforward to implement than big values (which occur only in leaves), and there's a bigger overhead to having big keys. For now, we won't support keys bigger than 32KB, and we'll probably artificially limit them to a much smaller size to encourage good usage (1KB?)

Right now, we don’t yet reclaim any pages, so our database will grow indefinitely. This is far from ideal! The first step was to extend the transaction to record the pages it frees. The plan is to reclaim old transactions once they’re no longer needed (once no read transactions are referencing them), adding their freed pages to a free list. We can then allocate future pages from that free list, and so our database will no longer grow indefinitely. The challenge is to do this in a consistent and persistent way; more on that next time!

Day 3: Repaying Technical Debt

I’m building a set of open-source cloud data-stores in December, blogging all the way.

In our last episode, we took the basic design that we’d used on day 1 to build an AppendLog, and built a basic key-value store that could store values. I had to take lots of shortcuts to get this far in the first two days, and much of today was spent paying down that technical debt, with a few new features.

First off, I created a shared project in Maven, which means we don’t have to keep repeating the versions of our libraries. You can do this in a parent module or in a shared project; either way, it makes everything a lot more DRY. I’d also copied-and-pasted some code, so I moved that code into the shared project. A quick kata to start the day!

We had tests, but we weren’t running them automatically. I set up CircleCI, a hosted continuous-integration service which I love because it is so fast (slow CI means you don’t get the rapid feedback loop that makes CI so useful). In order to do that, I set up Barge as a git submodule so it effectively becomes part of the project. Git submodules are great, but suffer from a truly terrible CLI. Technically we could have got away without doing this right away, because Barge is awesome and is deployed into the Maven repositories, but we know that we’re going to want to make changes to the Barge code to add some features, so the git submodule is the way to go. A bit of messing around with some Maven details, and the CircleCI build was up and running.

The tests are more integration tests than unit tests: they launch a cluster of servers (embedded in-process) and then test using the public HTTP interface. I find that integration tests are a lot more stable, so there’s less need to constantly fix the tests; I think they’re testing the right thing - our public contract. I also find that unit tests are much less important in a strongly typed language like Java than they are in a dynamically typed language like Ruby or Python. If you find yourself needing a lot of unit tests, you may not be using the compiler to maximum advantage: consider introducing some strongly typed classes to enforce what you’re testing. In short, you may be writing Old-Java, not New-Java. There are exceptions to the rule of course; unit-testing implementations of complicated algorithms is generally a good idea, for example!

Next up was enforcing uniqueness of keys, because although our generic BTree can support duplicate keys, we don’t really want duplicate keys in a key-value store. So now we can replace values, and we added a test to verify that.

Then, support for deletion by key. Previously the only change supported was insertion, so it was important to figure out a good approach here. Rather than have a set of methods, one for each action, we have a single doAction method, parameterized with an Action enum. We have to do this for the Raft log anyway: every action must be serialized as a message, so the idea is simply to use that message directly, rather than fight it by marshalling back and forth and switching before dispatching to a set of similar methods.
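In sketch form, the dispatch looks something like this (Action and doAction are illustrative names here, and the real version operates on the serialized Raft message and the BTree, not an in-memory map):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a single doAction entry point parameterized by an Action enum,
// rather than one method per operation.
public class KeyValueState {
    enum Action { INSERT, DELETE }

    private final Map<String, String> entries = new HashMap<>();

    // One entry point: the same action that went through the log drives the
    // state change. Returns the previous value, if any.
    String doAction(Action action, String key, String value) {
        switch (action) {
            case INSERT: return entries.put(key, value);
            case DELETE: return entries.remove(key);
            default: throw new IllegalArgumentException("unknown action: " + action);
        }
    }

    String get(String key) { return entries.get(key); }

    public static void main(String[] args) {
        KeyValueState state = new KeyValueState();
        state.doAction(Action.INSERT, "a", "1");
        state.doAction(Action.DELETE, "a", null);
        System.out.println(state.get("a")); // null: the key was deleted
    }
}
```

Adding a new operation is then a new enum constant and a new case, rather than a new method plumbed through every layer.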

Finally, I cleaned up the transaction handling, following the basic design of LMDB. For each transaction commit, we write the new root page id into a section of the file header, rotating through a fixed-array of slots. When we start up, we scan the section, looking at these slots to find the newest root. This is how read transactions can run without locking (at the expense of write transactions needing to do copy-on-write). I extended the LMDB approach a little bit, by writing a special “transaction record” page for each write transaction, which includes the root page id, a transaction sequence number, and a pointer to the previous transaction. The slot in the header includes a pointer to that transaction page, as well as the root page id (which is redundant, but avoids having to fetch the transaction page to find the root page id). I’m thinking this will help when it comes to implementing page-reclamation (garbage collection), and that it may be more useful generally: we’ll see!
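A sketch of the slot-scanning idea (the field names and the in-memory array are illustrative; the real slots live in a section of the file header):

```java
// Sketch of the LMDB-style header slots: each commit writes its transaction
// sequence number and root page id into the next slot, rotating through a
// fixed array; startup scans all slots and takes the newest.
public class HeaderSlots {
    static class Slot {
        final long txnSequence; // monotonically increasing commit number
        final long rootPageId;  // root of the BTree as of this commit
        Slot(long txnSequence, long rootPageId) {
            this.txnSequence = txnSequence;
            this.rootPageId = rootPageId;
        }
    }

    private final Slot[] slots = new Slot[8]; // fixed array in the file header
    private long nextSequence = 1;

    void commit(long rootPageId) {
        long seq = nextSequence++;
        slots[(int) (seq % slots.length)] = new Slot(seq, rootPageId);
    }

    // On startup, scan every slot and take the newest root.
    long findNewestRoot() {
        Slot newest = null;
        for (Slot slot : slots) {
            if (slot != null && (newest == null || slot.txnSequence > newest.txnSequence)) {
                newest = slot;
            }
        }
        return newest == null ? -1 : newest.rootPageId;
    }

    public static void main(String[] args) {
        HeaderSlots header = new HeaderSlots();
        for (long root = 100; root < 120; root++) {
            header.commit(root); // each commit installs a new root page
        }
        System.out.println(header.findNewestRoot()); // 119, the latest root
    }
}
```

Because a commit only ever writes into one slot, readers scanning the header can always find a complete, older root even if a writer crashes mid-commit.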

Day 2: A BTree for a Key-value Store

I’m building a set of open-source cloud data-stores in December, blogging all the way.

In our last episode, we started off with the basic design: a Raft distributed-log of changes using the Barge implementation, persisting to a data structure we implement to store the state. Last time we built a simple append-log as-a-service, like Amazon Kinesis.

Today, I started working towards a simple key-value store, like Redis or memcache. Key-value stores let you associate values with keys, and let you retrieve the value given the key, and that’s about it. Unlike richer datastores, they don’t typically allow operations across multiple keys.

The reason those limitations were chosen is that by not supporting multiple key operations, sharding for scale-out is easy. It also allows them to use a hashtable for easy and fast lookups. The disadvantage is that the model is quite limited, so the datastores typically end up supporting a long list of complex operations on entries, or users must design complex multi-key datastructures (it pushes the complexity onto the caller). It was these observations that actually spawned the growth of the relational model back in the 70s when these datastores were commonly used. But they still have their place. Then it was because they were fast and simple to implement, now it is because they’re fast and allow easy scale-out. Fast is always nice, but the “easy to implement” bit makes them a good next-step for our little project!

We will again rely on the Raft consensus log for our key-value operations, but we’ll need to store the state in a different data structure: one that supports assigning a value to a key and retrieving the value for a given key.

Redis and memcache both choose the hashtable for this data structure. It’s a good choice because it is fast; with a good hash function, both read and write operations take place in O(1) time, although growth of the hashtable is a little tricky.

LevelDB uses the log-structured merge-tree to store its data. This allows LevelDB to support another operation: in-order key iteration, which is often enough to avoid having to support those complex operations or requiring complex datastructures. The LevelDB implementation offers very good performance for writes, and good performance for reads. The implementation suffers from occasional slowdowns for writes (during compactions), and higher memory & CPU overhead for reads. There is work going on to address these issues with LevelDB (Basho’s LevelDB and Facebook’s RocksDB).

There are many data-store implementations that use the BTree: LMDB, BerkeleyDB and every relational database. The BTree is like a binary tree, but puts more values into each node to amortize the data-structure overhead. It maps very well to page-based systems like modern CPUs and block-devices. Like the log-structured merge-tree, BTrees support in-order key iteration.

LMDB is particularly interesting because it uses a clever design which is almost lock-free on reads, but only supports a single-writer. Because all our writes are serialized through a log anyway, we’ll only have a single-writer, so this feels like a great fit.

Although a hashtable implementation might be faster for a strict key-value store, the hope is that the BTree will in practice be similar in performance (with the LMDB design), and will support additional operations (like key-iteration) that will prove useful for the future plans. For example, we can use it when we want to implement secondary indexes.

From the start, we’ll allow duplicate keys in our BTree, because secondary indexes require them and they’re painful to add afterwards. This also dictates the interface that our BTree will expose; because keys can be repeated we just allow a caller to walk through all the entries, starting from a specified key. It’s trivial to add uniqueness constraints, or to support a simple get request on top of this.
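The interface this dictates can be sketched like this, using an in-memory sorted list as a stand-in for the on-disk BTree:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the walk-from-a-key primitive: because duplicate keys are allowed,
// the caller walks entries in order starting from a key, rather than doing a
// unique-key get. Uniqueness checks and simple gets layer on top of this.
public class EntryWalk {
    static class Entry {
        final String key;
        final String value;
        Entry(String key, String value) { this.key = key; this.value = value; }
    }

    private final List<Entry> entries = new ArrayList<>(); // kept sorted by key

    void insert(String key, String value) {
        entries.add(new Entry(key, value));
        entries.sort(Comparator.comparing(e -> e.key)); // toy: re-sort each time
    }

    // Walk all entries with key >= fromKey, in key order.
    List<Entry> walkFrom(String fromKey) {
        List<Entry> result = new ArrayList<>();
        for (Entry e : entries) {
            if (e.key.compareTo(fromKey) >= 0) result.add(e);
        }
        return result;
    }

    public static void main(String[] args) {
        EntryWalk tree = new EntryWalk();
        tree.insert("b", "1");
        tree.insert("b", "2"); // duplicate key, as a secondary index needs
        tree.insert("a", "0");
        System.out.println(tree.walkFrom("b").size()); // 2: both "b" entries
    }
}
```

A simple get(key) is then just "walk from key, stop when the key changes", and a uniqueness constraint is "walk from key, fail if an equal key already exists".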

I did debate just reusing the (excellent) LMDB implementation via JNI (or writing everything in C++), but I’ve decided to roll-my-own in Java. Hopefully this will pay off: there will be opportunities to make different decisions for our particular use cases.

To produce a fast BTree implementation, we’ll continue to use ByteBuffers. Object allocation on the JVM is fast, but garbage collection can be painful, so we want to try to keep object creation under control unless the objects are very short-lived (i.e. they would be on the stack in C). For a page we’re reading, the idea is to keep the data in the ByteBuffer and extract entries directly. This is pretty much C-style pointer code and is just as tedious to get right as it would be in C, but then works well.

Implementing writes C-style (straight into the ByteBuffer) is trickier, particularly if we have a fixed buffer size. Instead, we’ll ‘extract’ the page before we write to it, converting it into a more natural Java data structure (e.g. a List of objects); applying changes is then simple. Then we’ll serialize the final version at the end back into a ByteBuffer. It makes our code much simpler; it may actually be faster than always working against the ByteBuffer when we do multiple operations; and it allows for “future optimizations” (“tune in next time…”) The big downside is that much of the code is duplicated because we now have two memory representations: one for a clean (read-only) page and one for a dirty (read-write) page.
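A sketch of the two representations (the page layout here - an entry count followed by length-prefixed values - is purely illustrative, not the real format):

```java
import java.nio.ByteBuffer;
import java.util.List;

// Sketch of clean vs dirty pages: a clean page is read C-style straight out of
// the ByteBuffer; a dirty page is a plain Java List that gets serialized back
// into a buffer at commit time.
public class PageFormats {
    // Serialize a dirty page (a List of values) into a page buffer.
    static ByteBuffer serialize(List<byte[]> values) {
        int size = 4 + values.stream().mapToInt(v -> 4 + v.length).sum();
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(values.size());
        for (byte[] v : values) {
            buf.putInt(v.length);
            buf.put(v);
        }
        buf.flip();
        return buf;
    }

    // Read entry i straight out of a clean page, without extracting it.
    static byte[] read(ByteBuffer page, int index) {
        ByteBuffer buf = page.duplicate();
        buf.rewind();
        int count = buf.getInt();
        for (int i = 0; i < count; i++) {
            int length = buf.getInt();
            if (i == index) {
                byte[] value = new byte[length];
                buf.get(value);
                return value;
            }
            buf.position(buf.position() + length); // skip over, pointer-style
        }
        throw new IndexOutOfBoundsException("no entry " + index);
    }

    public static void main(String[] args) {
        ByteBuffer page = serialize(List.of("hello".getBytes(), "world".getBytes()));
        System.out.println(new String(read(page, 1))); // world
    }
}
```

The read path never allocates a Java object per entry until asked for one; the write path pays one serialization at the end rather than shuffling bytes on every change.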

A traditional BTree sets a maximum size for a page (e.g. 4KB or 8KB); we have to figure out what we’ll do when we exceed the limit. The traditional approach is to rebalance the BTree, moving entries around and splitting pages so that everything fits. Instead, we’ll implement what I call a ‘relaxed’ BTree, where we allow pages to be arbitrarily sized. This does mean that our ‘page id’ will be an offset into the file. With a 4 byte page id, we’d be limited to 4GB of data. We’ll force pages to align to 16 byte multiples, so we actually get 64 GB out of a 32 bit page id; it costs us a bit of padding space, but buys us a more sensible database size and better aligned data may be faster (although this may be a myth on modern CPUs).
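The page-id arithmetic is worth spelling out (a sketch; the constant and method names are mine):

```java
// Sketch of the page-id scheme: pages start on 16-byte boundaries, so a 32-bit
// page id addresses (id * 16) bytes of file - 64GB instead of 4GB.
public class PageIds {
    static final int ALIGNMENT = 16;

    static long toFileOffset(int pageId) {
        // Treat the id as unsigned so all 32 bits are usable.
        return Integer.toUnsignedLong(pageId) * ALIGNMENT;
    }

    static int toPageId(long fileOffset) {
        assert fileOffset % ALIGNMENT == 0 : "pages start on 16-byte boundaries";
        return (int) (fileOffset / ALIGNMENT);
    }

    public static void main(String[] args) {
        System.out.println(toFileOffset(0xFFFFFFFF)); // 68719476720, i.e. ~64GB
        System.out.println(toFileOffset(toPageId(4096))); // 4096: round-trips
    }
}
```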

For each transaction, we gather all the dirty pages, and then at commit we write them each to new positions in the file, which gives them a new page id; we update the parent pages, and write those, recursing all the way up to the root page. We then write a new ‘master’ page which points to the root.

We avoid overwriting data so that readers can proceed lock-free in parallel (on the previous version), and so that we don’t risk corrupting our data. This copy-on-write approach is famously used by the ZFS filesystem to avoid corruption, and LMDB uses it to allow concurrent reads.

For now, the implementation doesn’t do any rebalancing, so we just end up with one big leaf page. It’ll fail pretty soon, because the current leaf format limits data to 16 bit offsets.

Again though, we add a simple RESTful interface, add some tests, and we have a working (though very limited) key-value store.

Live-building Open Data Stores: Day 1

In the spirit of 24 pull requests for December, I had the idea of building a set of open source cloud data-stores in December, blogging my progress and thoughts as I go. I hope to build a basic key-value store, message queue, filesystem and simple non-relational DB (like MongoDB/DynamoDB). I technically have enough days left to manage 24 posts in December, though I think I’ll be lucky to get everything built… I hope that by documenting the problems and solutions that this will be useful reading, whether you want to do something similar or just better understand how these systems work.

These have all been built before, what is going to be really new here is that all the services are going to be built with the same design approach, and they’re all going to be “cloud-native”. If that sounds like marketing-speak, what I mean is that it’ll be designed for “the cloud”: within the limitations (unreliable machines, not terribly powerful individually), and making use of the capabilities (e.g. API driven infrastructure, reliable object storage like S3 or Swift). And it’ll be built this way from the start, not as a bolted-on after-thought.

The general architecture I’m going to explore is to use an append-only log to coordinate and store changes to the state. Every operation that changes the state must be recorded in order into the log. To determine (or recover) the state, you just replay every operation from the log. So that recovery doesn’t take unbounded time, we’ll periodically snapshot the state and store it (likely in object-storage); then we need only replay the subsequent portion of the log from the last snapshot.
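Why snapshots bound recovery time is easy to demonstrate: replaying the log tail from a snapshot gives the same state as replaying everything from the start. A toy sketch (integer addition stands in for applying one logged operation):

```java
import java.util.List;

// Sketch of log replay with snapshots: state is a function of the snapshot
// plus every operation logged after it.
public class LogReplay {
    static int applyAll(int snapshotState, List<Integer> logAfterSnapshot) {
        int state = snapshotState;
        for (int op : logAfterSnapshot) {
            state += op; // stand-in for applying one logged operation
        }
        return state;
    }

    public static void main(String[] args) {
        List<Integer> log = List.of(1, 2, 3, 4, 5);
        int fullReplay = applyAll(0, log);
        // Snapshot after the first three operations: replay only the tail.
        int snapshot = applyAll(0, log.subList(0, 3));
        int recovered = applyAll(snapshot, log.subList(3, log.size()));
        System.out.println(fullReplay == recovered); // true
    }
}
```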

This basic idea, of logging transactions and applying them in batches, is common to most datastores. It is similar to the Aries transaction log model that relational databases use. It’s also similar to the model that LevelDB and many other NoSQL databases use (the Log-Structured Merge-Tree).

To be “cloud-native”, we want the log to be distributed & fault-tolerant. For that, I’m planning on running it on a group of machines using the Raft consensus protocol. Raft is easy to understand and implement (at least compared to Paxos). Raft guarantees that as long as a quorum of machines is available, the log will be available; data can be appended to the log, and once acknowledged data is durable (won’t disappear). There’s a great implementation in my preferred language (New-Java) available in the form of Barge.

(New-Java is what I call Java without the legacy cruft that gave Old-Java a bad name. It uses annotations instead of XML; it relies on dependency injection; apps are self-contained rather than relying on some monstrous application container; the coding style works with Java’s limitations and uses them to its advantage - if you’re going to pay the price, you may as well reap the rewards. Typical indications that you’re using New-Java are lots of annotations, reliance on Google Guice and Guava, direct use of Jetty or Tomcat, and code that follows the ideas of the Effective Java book. If you see lots of XML and Spring, you’re probably using Old-Java.)

I wanted to start with a simple service, so I’ve started with an Append-Log as-a-Service; similar to Apache Kafka, or Amazon Kinesis. The append-log service allows clients to append chunks of data to a virtual file/stream; it will then allow those chunks to be retrieved in order. Periodically, old data is deleted and can no longer be retrieved. It’s admittedly a bit of a confusing place to start, given we’re internally using a Raft append-log with similar functionality, but it is the simplest service I can imagine.

In my first attempt, I tried to use the Raft log as the append log itself. But, based on some excellent feedback from the main Barge developer, I moved to a model where we copy the data from the Raft log into a second set of files (the append files).

The obvious disadvantage here is that everything gets written to disk twice. (This problem, called write amplification, actually crops up in most log-based datastores). But:

  1. We don’t have to fsync the second copy very often - only when we snapshot. We’ll likely snapshot less than once per minute. In terms of I/O, flushes to disk are what we really need to avoid, much more than cacheable writes.
  2. The second copy can be stored in a format optimized for our application. The Raft log is optimized for Raft, and e.g. keeping logs around for several days will be expensive.
  3. We can archive the second copy onto object-storage, and maybe implement HSM. With cloud object-storage, in theory, we need never delete.
  4. We’re isolated from the implementation details of the Barge/Raft log.
  5. This will be the same design as all the other data stores I’m planning. In other cases the state won’t be a log; better to have one universal design.

The code (which was originally heavily based on the example code for Barge) is available on GitHub.

Barge is doing most of the heavy lifting; we really just have to implement the StateMachine interface. Whenever an append operation comes in, we serialize it, and try to commit the data to the Raft log. Barge takes care of reaching consensus across our cluster of machines, and then calls applyOperation on every machine when the operation is successfully committed. We then have to apply the operation to our state, which here means appending the data to our append files.
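In sketch form (the interface below just mirrors the shape of Barge’s StateMachine so the example is self-contained, and an in-memory buffer stands in for the real append files on disk):

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Sketch of the state-machine side of the append-log service: committed
// entries arrive as bytes, in log order, and we append them to our state.
public class AppendLogStateMachine {
    interface StateMachine {
        void applyOperation(ByteBuffer entry);
    }

    // Our 'append file', standing in for the real on-disk files.
    private final ByteArrayOutputStream appendFile = new ByteArrayOutputStream();

    final StateMachine machine = entry -> {
        // Called on every machine once the cluster has committed the entry:
        // we just append the chunk to our local append file.
        byte[] data = new byte[entry.remaining()];
        entry.get(data);
        appendFile.write(data, 0, data.length);
    };

    int size() { return appendFile.size(); }

    public static void main(String[] args) {
        AppendLogStateMachine service = new AppendLogStateMachine();
        service.machine.applyOperation(ByteBuffer.wrap("chunk1".getBytes()));
        service.machine.applyOperation(ByteBuffer.wrap("chunk2".getBytes()));
        System.out.println(service.size()); // 12: both chunks appended
    }
}
```

The key property is that applyOperation is deterministic, so every machine in the cluster ends up with byte-identical append files.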

We add a RESTful front-end onto it, and add support for reading from the logs. And that’s it - for now. There are a lot more features that need to be implemented, some in Barge (most notably log compaction and dynamic reconfiguration) and some in “our” code (archiving the files to object-storage, scaling & recovery from failures, etc). But we can’t do everything on day 1! The hope is that we’ll be able to build out a bunch more services, and then add missing features across all of them with shared code.

After the overall design, the most interesting detail here is the format of our append files. We use an approach similar to Kafka: we mmap a data file and rely on the OS to page it in as needed; this means we’re not limited to the amount of data we can fit in memory, but can be just as fast when the working set fits into memory (we can theoretically achieve the elusive zero-copy data-reads). It also offloads most of the hard work to the OS. There’s no such thing as a free lunch, and sometimes the OS heuristics are not a good match for the data-access patterns, but in this case I think heuristics like read-ahead and LRU will work well; we write sequentially, and I expect most reads will be sequential and mostly focused on the newest data.
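In Java, the mmap approach looks roughly like this (the file name and mapping size are illustrative):

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of the Kafka-style approach: mmap the append file and let the OS
// page it in as needed.
public class MmapSketch {
    static String writeAndReadBack(Path path, String record) throws IOException {
        try (FileChannel channel = FileChannel.open(path,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Map 64KB; the OS faults pages in on access, so we aren't limited
            // by heap size, and sequential access benefits from read-ahead.
            MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 64 * 1024);
            map.put(record.getBytes());
            map.flip();
            byte[] back = new byte[record.length()];
            map.get(back);
            return new String(back);
        }
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("append", ".log");
        try {
            System.out.println(writeAndReadBack(path, "record")); // record
        } finally {
            Files.deleteIfExists(path);
        }
    }
}
```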

Our file format is fairly simple. We have a fixed-size file header which identifies the file format version (it’s always good to have an escape-plan that allows the next version to change). We have a fixed-size per-record header which includes the length and a checksum. We can walk the records sequentially, starting from any known position, adding the length to find the next record. We can’t randomly seek to an arbitrary position and find the next record, though, so we choose to use the offset into the file as the record identifier. The checksum verifies that this is a valid record start, and can also detect file corruption. (If we detect corruption, we can hopefully find a non-corrupted copy: the data is initially stored in the Raft log, and also in the append-files, both on the servers and in object-storage.) Theoretically there’s a small chance that the checksum randomly matches at a non-record position, so that dictates our security model: a caller has all-or-nothing access to a log, because we can’t guarantee record-level security if we accept client-specified record offsets.
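A sketch of the record format and checksummed reads (the exact header layout here - a 4-byte length followed by a 4-byte CRC32-C - is illustrative):

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

// Sketch of the append-file record format: a fixed header of length + checksum
// per record, so we can walk forward from any known record offset, and the
// record id is simply its offset in the file.
public class RecordFormat {
    static final int HEADER_SIZE = 8; // 4-byte length + 4-byte CRC32-C

    static int append(ByteBuffer file, byte[] data) {
        int offset = file.position();
        CRC32C crc = new CRC32C();
        crc.update(data);
        file.putInt(data.length);
        file.putInt((int) crc.getValue());
        file.put(data);
        return offset; // the record id is just its file offset
    }

    // Read the record at offset, verifying the checksum; null on mismatch.
    static byte[] read(ByteBuffer file, int offset) {
        ByteBuffer view = file.duplicate();
        view.position(offset);
        int length = view.getInt();
        int storedCrc = view.getInt();
        byte[] data = new byte[length];
        view.get(data);
        CRC32C crc = new CRC32C();
        crc.update(data);
        return ((int) crc.getValue() == storedCrc) ? data : null;
    }

    public static void main(String[] args) {
        ByteBuffer file = ByteBuffer.allocate(1024);
        int first = append(file, "one".getBytes());
        int second = append(file, "two".getBytes());
        System.out.println(new String(read(file, second))); // two
        // The next record after 'first' starts at first + HEADER_SIZE + length.
        System.out.println(first + HEADER_SIZE + 3 == second); // true
    }
}
```

A read at a bad offset will (almost always) fail the checksum rather than return garbage, which is exactly the all-or-nothing property described above.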

It’s thus easy to implement read: we seek to the provided file position, verify the checksum, and return the data. We also return the next position as an HTTP header, for the next call. More efficient APIs are obviously possible (e.g. batching, streaming), but this is a good start.

The roadmap to this being a real implementation: we need snapshot support in Barge, then we can really build read-only log-segments, then we can upload them to object-storage, then we can delete. And in parallel: we need Barge to support dynamic reconfiguration, then we can implement auto-repair / auto-scaling. We can either detect when the cluster is degraded and launch a replacement server, or (probably better) we can rely on an auto-scaling pool of servers and reassign items between them to ensure that all have quorum and we spread the load around.

One last interesting detail: For our checksum, we use CRC32-C. It’s a variant of the “normal” CRC algorithm, which has hardware acceleration on the latest Intel chips with SSE 4.2. If you’re picking a (non-cryptographic) checksum and don’t have to be compatible with legacy software, it’s the natural choice. (And CRCs often pop up in performance profiles, so it definitely can be worth optimizing!)