Tuesday Nov 24, 2009

Handling Transactions in BDB-JE 4

The Replicants (data storage nodes) in Aura are composed of a search engine and a database. While we are closely tied to using our own search engine, the implementation of the database layer is entirely contained in a wrapper class called BerkeleyDataWrapper. This class handles everything around retrying failed transactions, making sure transactions commit or abort, closing cursors, etc., in case of success or failure. In my first iteration of the code, I had this handling code repeated in every method around every transaction. There were frequently small differences in how things were handled, so the code got repeated a lot with minor tweaks here and there. Obviously, this was a maintenance problem whenever I needed to change something in the error handling for all methods.

When the newest version of BDB-JE came out, adding in support for High Availability, this further complicated the set of things that could go wrong and needed to be handled with each transaction. Now we could have a whole slew of errors related to networking and other processes that the previously monolithic database was unaware of. Since the error handling code was about to get even more complicated, this seemed like a good time to refactor the way the BerkeleyDataWrapper was constructed.

In the latest version, I create command objects in each of the database methods and evaluate them all in a single method that handles all the failure scenarios. Now, this pattern isn't exactly earth-shattering, but I think it works fairly well here, so I thought I'd document it. After looking at what varied from one transaction execution to the next, I constructed the following command object:

```java
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;

public abstract class DBCommand<R> {

    /**
     * Returns a configuration for the transaction that should be
     * used when running this command.
     *
     * @return null by default, meaning the default transaction
     *         configuration is used
     */
    public TransactionConfig getTransactionConfig() {
        return null;
    }

    /**
     * Runs the command within the given transaction. The transaction is
     * committed by the invoker method so commit should not be called here.
     *
     * @param txn the transaction within which the code should be run
     * @return the result of the command
     */
    public abstract R run(Transaction txn) throws AuraException;

    /**
     * Gets a message that should be included upon failure that should
     * include the command name and any extra data (item key, etc.)
     *
     * @return the status message
     */
    public abstract String getDescription();
}
```
The DBCommand allows you to specify: the configuration for the transaction in which you'd like to run (e.g. defining what the read/write semantics are); the actual code to run within the configured transaction; and a human-readable description that could appear in log messages. When creating a concrete instance of DBCommand, you simply override run and use the transaction passed in to perform your application logic. DBCommand is parameterized by the return type R of its run method, so a command can optionally hand back a result. Here's a simple example of how this is used to put a set of Attention objects (associations between users and items) into the database. Nothing is returned from this command, so its type parameter is Void.

```java
/**
 * Puts attentions into the entry store. Attentions should never be
 * overwritten.
 *
 * @param pas the attentions to store
 */
public void putAttention(final List<PersistentAttention> pas)
        throws AuraException {
    DBCommand<Void> cmd = new DBCommand<Void>() {
        @Override
        public Void run(Transaction txn) throws AuraException {
            for (PersistentAttention pa : pas) {
                allAttn.putNoOverwrite(txn, pa);
            }
            return null;
        }

        @Override
        public TransactionConfig getTransactionConfig() {
            //
            // We want putting attentions to be fast and not necessarily
            // durable or consistent across replicas.
            TransactionConfig txConf = new TransactionConfig();
            Durability dur = new Durability(
                    Durability.SyncPolicy.WRITE_NO_SYNC,  // local sync policy
                    Durability.SyncPolicy.WRITE_NO_SYNC,  // replica sync policy
                    Durability.ReplicaAckPolicy.NONE);    // don't wait for acks
            txConf.setDurability(dur);
            return txConf;
        }

        @Override
        public String getDescription() {
            return "putAttention(" + pas.size() + " attns)";
        }
    };
    // Commit after running, in a new (not thread-local) transaction.
    invokeCommand(cmd, true, false);
}
```
In Aura, we may be collecting Attention objects at a good clip (e.g. "user X played song Y", or "user X starred article Z"), so we want to relax the transaction semantics as far as we can to keep the speed up while recording Attentions. If we lose some attention data due to a crash, it doesn't hurt that much, so this is a good trade-off. In order to modify the semantics, you can see that I've overridden getTransactionConfig to return a configuration with very little durability. A transaction with that config is created in the command invoker and passed into the run method. Since error handling is done externally, the run method only needs to have the basic application logic in it.
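For anyone who wants to experiment with the shape of this pattern without BDB-JE on the classpath, here is a minimal, JDK-only sketch. FakeTxn, Command, invoke and store are hypothetical stand-ins invented for illustration, not Aura or BDB-JE classes: a null config means "use the default", the invoker builds the transaction from the command's config, and the commit happens only in the invoker, never inside run.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * JDK-only sketch of the command-object pattern. FakeTxn, Command and
 * invoke are hypothetical stand-ins, not BDB-JE or Aura classes.
 */
public class CommandSketch {

    /** Stand-in for a transaction: its config plus buffered writes. */
    public static class FakeTxn {
        public final String config;
        public final List<String> writes = new ArrayList<>();
        public FakeTxn(String config) { this.config = config; }
    }

    /** Mirrors DBCommand: optional config, the work itself, a description. */
    public abstract static class Command<R> {
        /** null means "use the invoker's default configuration". */
        public String getConfig() { return null; }
        public abstract R run(FakeTxn txn);
        public abstract String getDescription();
    }

    /** Committed writes; in the real wrapper this would be the database. */
    public static final List<String> store = new ArrayList<>();

    /** Builds the txn from the command's config, runs it, then commits. */
    public static <R> R invoke(Command<R> cmd) {
        String conf = cmd.getConfig();
        FakeTxn txn = new FakeTxn(conf == null ? "default" : conf);
        R result;
        try {
            result = cmd.run(txn);
        } catch (RuntimeException e) {
            // "Abort": the buffered writes are simply never applied.
            throw new IllegalStateException("Command failed: " + cmd.getDescription(), e);
        }
        store.addAll(txn.writes); // "commit" happens here, never inside run()
        return result;
    }

    public static void main(String[] args) {
        // Like putAttention: nothing to return, so the type parameter is Void.
        Command<Void> put = new Command<Void>() {
            @Override public String getConfig() { return "no-sync"; }
            @Override public Void run(FakeTxn txn) {
                txn.writes.add("user1 played song1 [" + txn.config + "]");
                return null;
            }
            @Override public String getDescription() { return "put(1 attn)"; }
        };
        invoke(put);
        System.out.println(store); // [user1 played song1 [no-sync]]
    }
}
```

Because the commit lives only in invoke, a command that throws leaves no partial writes behind; its buffered writes are dropped, which is the role abort plays in the real code.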

After creating the command object above, the last line of putAttention invokes the command, with the call going into the generic command invocation code.

```java
/**
 * Invoke a database command, passing in a transaction to use for
 * operating on the database. The configuration for the transaction
 * is obtained from the command that is passed in.
 *
 * @param cmd the command object to invoke
 * @param commit whether we should commit after running this transaction
 * @param useCurrentTxn if the thread-local transaction should be used instead
 *                      of a new transaction
 * @return the result of the run method
 * @throws AuraException in the event of a failure
 */
protected <R> R invokeCommand(DBCommand<R> cmd, boolean commit,
                              boolean useCurrentTxn) throws AuraException {
    int numRetries = 0;
    while (numRetries < MAX_RETRIES) {
        Transaction txn = null;
        CurrentTransaction currTxn = null;
        R result = null;
        int sleepTime = 0;  // reset on every attempt
        //
        // If the write lock on quiesce isn't held, we can continue. The lock
        // is taken before the try so the finally only releases a held lock.
        quiesce.readLock().lock();
        try {
            //
            // Fetch the transaction config and create a transaction or
            // get the CurrentTransaction
            TransactionConfig tconf = cmd.getTransactionConfig();
            if (useCurrentTxn) {
                currTxn = CurrentTransaction.getInstance(dbEnv);
                txn = currTxn.beginTransaction(tconf);
            } else {
                txn = dbEnv.beginTransaction(null, tconf);
            }
            //
            // Now run the command and commit if necessary.
            result = cmd.run(txn);
            if (commit) {
                if (useCurrentTxn) {
                    currTxn.commitTransaction();
                } else {
                    txn.commit();
                }
            }
            return result;
        } catch (InsufficientReplicasException e) {
            //
            // In the event of a write operation that couldn't be sent
            // to a quorum of replicas, abort, wait a bit and try again
            abort(txn, currTxn, useCurrentTxn);
            sleepTime = 2 * 1000;
            numRetries++;
        } catch (InsufficientAcksException e) {
            //
            // We didn't get confirmation from other replicas that the
            // write was accepted. This likely happens when a replica
            // is going down (and when we are requiring acks). The commit
            // has already happened on the master, so for us this is okay:
            // return the result instead of running the command again.
            return result;
        } catch (ReplicaWriteException e) {
            //
            // We tried to write to this node, but this node is a replica.
            abort(txn, currTxn, useCurrentTxn);
            throw new AuraReplicantWriteException(
                    "Cannot modify a replica: " + cmd.getDescription());
        } catch (ReplicaConsistencyException e) {
            //
            // We require a higher level of consistency than is currently
            // available on this replica. Wait a bit and try again.
            abort(txn, currTxn, useCurrentTxn);
            sleepTime = 1000;
            numRetries++;
        } catch (LockConflictException e) {
            abort(txn, currTxn, useCurrentTxn);
            log.finest("Deadlock detected in command " + cmd.getDescription()
                    + ": " + e.getMessage());
            numRetries++;
        } catch (Throwable t) {
            try {
                abort(txn, currTxn, useCurrentTxn);
            } catch (AuraException ex) {
                //
                // Not much that can be done at this point
            }
            throw new AuraException("Command failed: " + cmd.getDescription(), t);
        } finally {
            quiesce.readLock().unlock();
        }
        //
        // Do we need to sleep before trying again?
        if (sleepTime > 0) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                // Preserve the interrupt status and stop retrying.
                Thread.currentThread().interrupt();
                throw new AuraException("Interrupted while retrying: "
                        + cmd.getDescription(), e);
            }
        }
    }
    throw new AuraException(String.format(
            "Command failed after %d retries: %s",
            numRetries, cmd.getDescription()));
}

/**
 * Aborts the transaction begun by invokeCommand, if any. A null txn means
 * beginTransaction itself failed, so there is nothing to abort.
 */
private void abort(Transaction txn, CurrentTransaction currTxn,
                   boolean useCurrentTxn) throws AuraException {
    if (txn == null) {
        return;
    }
    try {
        if (useCurrentTxn) {
            currTxn.abortTransaction();
        } else {
            txn.abort();
        }
    } catch (DatabaseException ex) {
        throw new AuraException("Txn abort failed", ex);
    }
}
```
Scrolling through the invokeCommand method, you'll see that we use a structure fairly close to the invocation code that is given as an example in the BDB documentation. It all runs in a loop in case we need to retry due to conflicts, and it handles, in different ways, a number of the exceptions that might be thrown. Near the top of the loop is the logic that gets the transaction configuration from the command and uses it either to create a new transaction, or to start the thread's CurrentTransaction (used for BDB methods that don't otherwise take a transaction as a parameter). If all goes well, the code is run and committed and the result is returned. If not, the error handling kicks in.
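The core of that retry loop can be sketched with the JDK alone. This is a deliberate simplification under stated assumptions: ConflictException is a hypothetical stand-in for BDB-JE's LockConflictException, a Supplier stands in for the command, and "abort" is just a counter. It shows the bounded retry: conflicts are retried, and after MAX_RETRIES failed attempts the invoker gives up with a descriptive message.

```java
import java.util.function.Supplier;

/**
 * JDK-only sketch of a bounded retry loop. ConflictException is a
 * hypothetical stand-in for BDB-JE's LockConflictException.
 */
public class RetrySketch {

    /** Hypothetical transient failure that is worth retrying. */
    public static class ConflictException extends RuntimeException {
        public ConflictException(String msg) { super(msg); }
    }

    static final int MAX_RETRIES = 3;

    /** Number of "aborts" performed, so the retry behavior is observable. */
    public static int aborts = 0;

    public static <R> R invoke(Supplier<R> work, String description) {
        int numRetries = 0;
        while (numRetries < MAX_RETRIES) {
            try {
                return work.get();   // "run and commit"
            } catch (ConflictException e) {
                aborts++;            // "abort" the failed attempt
                numRetries++;        // every retry counts toward the limit
            }
        }
        throw new IllegalStateException(String.format(
                "Command failed after %d retries: %s", numRetries, description));
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Conflicts on the first two attempts, succeeds on the third.
        String r = invoke(() -> {
            if (++attempts[0] < 3) {
                throw new ConflictException("deadlock");
            }
            return "ok after " + attempts[0] + " attempts";
        }, "demo");
        System.out.println(r); // ok after 3 attempts
    }
}
```

Counting every retry toward the limit, not just one exception type, is what keeps a loop like this from spinning forever when a failure condition never clears.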

One of the nice things about running the transactions this way is that it is very easy to modify the semantics around every transaction that runs. For example, I realized that there may be occasions when we'd like to quiesce the database, temporarily pausing all activity. In the previous iteration of the code, I would have needed to modify every method to check whether it was okay to run, but since I had refactored everything, I could add the simple quiesce lock into the invoker. The quiesce instance variable in the code above is a ReentrantReadWriteLock: every command holds the read lock while it runs, so if anybody called the quiesce method, they'd request the write lock (waiting for in-flight commands to finish) and no further database commands could run until the lock was released. The single invoker would also be a natural place to keep track of failure rates and to do logging in general. Finally, if it turned out that I needed to add a new kind of command that further parameterizes its execution environment, it would be easy to add another method to the DBCommand class to get at its values. So all told, I think this works well, and maybe it'll be useful for somebody else too.
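Here is a minimal JDK-only sketch of the quiesce lock, assuming commands are plain Runnables (runCommand, quiesce, resume and completed are illustrative names, not Aura's API). Commands share the read lock, so they don't block each other; quiesce() takes the write lock, which waits for in-flight commands and then holds off new ones until resume().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * JDK-only sketch of quiescing: commands run under the read lock, and
 * quiesce() holds the write lock to pause them. Names are hypothetical.
 */
public class QuiesceSketch {
    private final ReentrantReadWriteLock quiesce = new ReentrantReadWriteLock();
    public final AtomicInteger completed = new AtomicInteger();

    /** Runs a command under the read lock; blocks while quiesced. */
    public void runCommand(Runnable command) {
        quiesce.readLock().lock();
        try {
            command.run();
            completed.incrementAndGet();
        } finally {
            quiesce.readLock().unlock();
        }
    }

    /** Waits for running commands to finish, then holds off new ones. */
    public void quiesce() {
        quiesce.writeLock().lock();
    }

    /** Must be called from the same thread that called quiesce(). */
    public void resume() {
        quiesce.writeLock().unlock();
    }

    public static void main(String[] args) throws InterruptedException {
        QuiesceSketch db = new QuiesceSketch();
        db.quiesce();
        Thread worker = new Thread(() -> db.runCommand(() -> { }));
        worker.start();
        Thread.sleep(100); // give the worker time to block on the read lock
        System.out.println("while quiesced: " + db.completed.get()); // 0
        db.resume();
        worker.join();
        System.out.println("after resume: " + db.completed.get());   // 1
    }
}
```

One constraint worth knowing: the write lock of a ReentrantReadWriteLock must be released by the thread that acquired it, so quiesce() and resume() have to be called from the same thread.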

Wednesday Nov 18, 2009

Replicated Replicants

Oracle has just released a new version of Oracle Berkeley DB Java Edition that includes new High Availability (HA) features. These allow you to keep multiple database instances in sync (using a single master). Some time ago I was asked if we'd like to help evaluate a pre-release version of the code, and of course I said yes. We've been waiting for HA features to be available in the database to implement our own replication support, so it was a perfect fit for evaluation. Some very insightful people had good things to say about it. Since we had a head start, I already have a working version of the AURA Data Store with replication.

The AURA Data Store has three tiers: the Data Store Head serves as the communication point to the outside world and also distributes queries to each of the many Partition Clusters; each Partition Cluster represents a single partition of the data, replicated across a cluster of Replicants. Until recently we only supported a single Replicant per partition, but thanks to BDB-JE/HA we now have support for multiple replicas. If you're following along in the source code, you'll want to update to the "replication" branch to see the work being done. Adding support was fairly straightforward once I got a handle on how each of the nodes in a replication group is connected to the others. We already had infrastructure that expected to work this way, so integration was smooth. When setting up the group, you specify how closely synchronized the replicas need to be, and when committing transactions you can specify how far the commit must go (to the local disk, and out over the network to the replicas) before returning. So we commit to the master, and within a few seconds we can expect to see the changes in the replicas.

The only catch was that we maintain both a database and a search engine. We haven't put any HA support into the search engine (although a single index can have multiple readers and writers if we were sharing it). So for the time being I have a thread that picks up any changed item keys from the master and informs the replicas that they should fetch those items from their respective databases and re-index them. It would be nice if we could get an event stream from the database in the replicas notifying us that a new or updated item just came in. Another option might be to actually store the search engine's data in the database and let it do the replication, but the nature of the inverted file doesn't really lend itself to this (at least, not with any hope of performing well).
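The stopgap re-indexing thread could look roughly like the following JDK-only sketch. Everything in it is hypothetical (ReindexNotifier, itemChanged, flush and the replica callbacks are illustrative names, not Aura's code): the master records changed keys, repeated changes to the same item collapse in a LinkedHashSet, and each flush hands the batch to every replica, which would then fetch those items from its own database and re-index them.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/**
 * JDK-only sketch of propagating changed item keys from the master to
 * replicas for re-indexing. All names are hypothetical.
 */
public class ReindexNotifier {
    // An item changed several times is re-indexed once, in first-change order.
    private final Set<String> changedKeys = new LinkedHashSet<>();
    private final List<Consumer<List<String>>> replicas = new ArrayList<>();

    /** Called on the master whenever an item is added or updated. */
    public synchronized void itemChanged(String key) {
        changedKeys.add(key);
    }

    public synchronized void addReplica(Consumer<List<String>> reindexer) {
        replicas.add(reindexer);
    }

    /** One pass of the notifier thread; returns how many keys were sent. */
    public int flush() {
        List<String> batch;
        List<Consumer<List<String>>> targets;
        synchronized (this) {
            batch = new ArrayList<>(changedKeys);
            changedKeys.clear();
            targets = new ArrayList<>(replicas);
        }
        if (!batch.isEmpty()) {
            for (Consumer<List<String>> replica : targets) {
                replica.accept(batch); // replica re-fetches and re-indexes these
            }
        }
        return batch.size();
    }

    public static void main(String[] args) {
        ReindexNotifier n = new ReindexNotifier();
        n.addReplica(keys -> System.out.println("replica A re-indexes " + keys));
        n.addReplica(keys -> System.out.println("replica B re-indexes " + keys));
        n.itemChanged("song:1");
        n.itemChanged("song:2");
        n.itemChanged("song:1");
        n.flush(); // both replicas receive [song:1, song:2]
    }
}
```

To run it periodically, flush could be scheduled on a ScheduledExecutorService, for example with executor.scheduleWithFixedDelay(notifier::flush, 1, 1, TimeUnit.SECONDS).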

Anyway, the real excitement here was that for the first time, we got to see our data store visualization show us multiple Replicants for each Partition Cluster:

This is a screenshot showing a very small ("two-way") data store. It is running with only one DataStoreHead, and the data is divided across two partitions. Each partition has three Replicants. While the Replicants are drawn inside the Partition Clusters, they are actually separate processes running on separate machines; the grouping is just a logical view. I opened up the overall Item Stats display to show that we only have a small number of items. To make the screenshot more interesting, I'm running a 2,000-user load test that simulates the operations users would actually perform when using an application such as the Music Explaura (basically, a weighted mix of getting item details, searching, and getting live recommendations).

As you can see in the image, we're distributing query load fairly evenly across all Replicants in each Partition Cluster. Replicants do most of the heavy lifting in terms of data crunching. To benefit from the larger total cache afforded by running more processes (with more memory), we distribute the queries we can based on the hash code of the item key, so that requests for the same item go to the same Replicant. The Partition Clusters do a little work in distributing those queries, and the Data Store Head does a little more, since it needs to combine results from across Partition Clusters.
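Routing by item-key hash can be sketched in a few lines. ReplicantRouter and the replicant names below are hypothetical; the point is only that the same key always maps to the same Replicant, so that Replicant's cache stays warm for it. Math.floorMod is used instead of % because String.hashCode() can be negative.

```java
import java.util.Arrays;
import java.util.List;

/**
 * JDK-only sketch of choosing a Replicant by the hash code of the item
 * key. Class and method names are hypothetical.
 */
public class ReplicantRouter {
    private final List<String> replicants;

    public ReplicantRouter(List<String> replicants) {
        this.replicants = replicants;
    }

    /** Same key, same Replicant, as long as the Replicant list is unchanged. */
    public String route(String itemKey) {
        // floorMod keeps the index non-negative even for negative hash codes.
        int idx = Math.floorMod(itemKey.hashCode(), replicants.size());
        return replicants.get(idx);
    }

    public static void main(String[] args) {
        ReplicantRouter router = new ReplicantRouter(
                Arrays.asList("replicant-0", "replicant-1", "replicant-2"));
        for (String key : Arrays.asList("a", "b", "c", "a")) {
            // a -> replicant-1, b -> replicant-2, c -> replicant-0, a -> replicant-1
            System.out.println(key + " -> " + router.route(key));
        }
    }
}
```

A plain modulo spreads keys evenly but remaps most of them if the number of Replicants changes, so whether it is good enough depends on how often a Partition Cluster's membership changes.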

I plan to do some more posting about how BDB-JE/HA is integrated into the AURA Data Store, so stay tuned!


Jeff Alexander is a member of the Information Retrieval and Machine Learning group in Oracle Labs.

