Handling Transactions in BDB-JE 4

The Replicants (data storage nodes) in Aura are composed of a search engine and a database. While we are closely tied to using our own search engine, the implementation of the database layer is entirely contained in a wrapper class called BerkeleyDataWrapper. This class does all the handling around repeating failed transactions, making sure transactions commit or abort, closing cursors, etc in case of success or failure. In my first iteration of the code, I had this handling code repeated in every method around every transaction. There were frequently small differences in how things were handled so the code got repeated a lot with minor tweaks here and there. Obviously there was a maintenance problem when I needed to change something in the error handling for all methods.

When the newest version of BDB-JE came out, adding in support for High Availability, this further complicated the set of things that could go wrong and needed to be handled with each transaction. Now we could have a whole slew of errors related to networking and other processes that the previously monolithic database was unaware of. Since the error handling code was about to get even more complicated, this seemed like a good time to refactor the way the BerkeleyDataWrapper was constructed.

In the latest version, I create command objects in each of the database methods and evaluate them all in a single method that handles all the failure scenarios. Now, this pattern isn't exactly earth shattering, but I think it works fairly well here so I thought I'd document it. After looking at the variables in my transaction execution, I constructed the following command object:

public abstract class DBCommand<R> { /\*\* \* Returns a configuration for the transaction that should be \* used when running this command. \* \* @return null by default \*/ public TransactionConfig getTransactionConfig() { return null; } /\*\* \* Runs the command within the given transaction. The transaction is \* committed by the invoker method so commit should not be called here. \* @param txn the transaction within which the code should be run \* @return the result of the command \*/ public abstract R run(Transaction txn) throws AuraException; /\*\* \* Gets a message that should be included upon failure that should \* include the command name and any extra data (item key, etc) \* \* @return the status message \*/ public abstract String getDescription(); }
The DBCommand allows you to specify: the configuration for the transaction in which you'd like to run (e.g. defining what the read/write semantics are); the actual code to run within the configured transaction; and a human readable description that could appear in log messages. When creating a concrete instance of DBCommand, you simply override run and use the transaction passed in to perform your application logic. DBCommand is parameterized, allowing you to specify an optional return value from the run method. Here's a simple example of how this is used to put a set of Attention objects (associations between users and items) into the database. Nothing is returned from this, so a type of Void is given.

/\*\* \* Puts attention into the entry store. Attentions should never be \* overwritten. \* \* @param pas the attention \*/ public void putAttention(final List<PersistentAttention> pas) throws AuraException { DBCommand<Void> cmd = new DBCommand<Void>() { @Override public Void run(Transaction txn) throws AuraException { for (PersistentAttention pa : pas) { allAttn.putNoOverwrite(txn, pa); } return null; } @Override public TransactionConfig getTransactionConfig() { // // We want putting attentions to be fast and not necessarily // durable or consistent across replicas. TransactionConfig txConf = new TransactionConfig(); Durability dur = new Durability( Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.WRITE_NO_SYNC, Durability.ReplicaAckPolicy.NONE ); txConf.setDurability(dur); return txConf; } @Override public String getDescription() { return "putAttention(" + pas.size() + " attns)"; } }; invokeCommand(cmd); }
In Aura, we may be collecting Attention objects at a good clip (e.g. "user X played song Y", or "user X starred article Z"), so we want to relax the transaction semantics as far as we can to keep the speed up while recording Attentions. If we lose some attention data due to a crash, it doesn't hurt that much, so this is a good trade-off. In order to modify the semantics, you can see that I've overridden getTransactionConfig to return a configuration with very little durability. A transaction with that config is created in the command invoker and passed into the run method. Since error handling is done externally, the run method only needs to have the basic application logic in it.

After creating the command object above, the last line of putAttention invokes the command, with the call going into the generic command invocation code.

/\*\* \* Invoke a database command, passing in a transaction to use for \* operating on the database. The configuration from the transaction \* is obtained from the command that is passed in. \* \* @param cmd the command object to invoke \* @param commit whether we should commit after running this transaction \* @param useCurrentTxn if the thread-local transaction should be used instead \* of a new transaction \* @return the result of the run method \* @throws AuraException in the event of a failure \*/ protected <R> R invokeCommand(DBCommand<R> cmd, boolean commit, boolean useCurrentTxn) throws AuraException { int numRetries = 0; int sleepTime = 0; while(numRetries < MAX_RETRIES) { Transaction txn = null; CurrentTransaction currTxn = null; try { // // If the write lock on quiesce isn't held, we can continue quiesce.readLock().lock(); // // Fetch the transaction config and create a transaction or // get the CurrentTransaction TransactionConfig tconf = cmd.getTransactionConfig(); if (useCurrentTxn) { currTxn = CurrentTransaction.getInstance(dbEnv); txn = currTxn.beginTransaction(tconf); } else { txn = dbEnv.beginTransaction(null, tconf); } // // Now run the command and commit if necessary. R result = cmd.run(txn); if (commit) { if (useCurrentTxn) { currTxn.commitTransaction(); } else { txn.commit(); } } return result; } catch (InsufficientReplicasException e) { // // In the event of a write operation that couldn't be sent // to a quorum of replicas, wait a bit and try again sleepTime = 2 \* 1000; } catch (InsufficientAcksException e) { // // We didn't get confirmation from other replicas that the // write was accepted. This likely happens when a replica // is going down (and when we are requiring acks). For us, // this is okay. } catch (ReplicaWriteException e) { // // We tried to write to this node, but this node is a replica. throw new AuraReplicantWriteException( "Cannot modify a replica: " + cmd.getDescription()); } catch (ReplicaConsistencyException e) { // // We require a higher level of consistency that is currently // available on this replica. Wait a bit and try again. sleepTime = 1000; } catch (LockConflictException e) { try { if (useCurrentTxn) { currTxn.abortTransaction(); } else { txn.abort(); } log.finest("Deadlock detected in command " + cmd.getDescription() + ": " + e.getMessage()); numRetries++; } catch (DatabaseException ex) { throw new AuraException("Txn abort failed", ex); } } catch (Throwable t) { try { if(txn != null) { if (useCurrentTxn) { currTxn.abortTransaction(); } else { txn.abort(); } } } catch (DatabaseException ex) { // // Not much that can be done at this point } throw new AuraException("Command failed: " + cmd.getDescription(), t); } finally { quiesce.readLock().unlock(); } // // Do we need to sleep before trying again? if (sleepTime > 0) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { // Nothing we can do about it. } } } throw new AuraException(String.format( "Command failed after %d retries: %s", numRetries, cmd.getDescription())); }
Scrolling through the invokeCommand method, you'll see that we use a structure fairly close to the invocation code that is given as an example in the BDB documentation. It all runs in a loop in case we need to retry due to conflicts, and it handles, in different ways, a number of the exceptions that might be thrown. Near the top of the loop is the logic that gets the transaction configuration from the command and uses it either to create a new transaction, or to start the thread's CurrentTransaction (used for BDB methods that don't otherwise take a transaction as a parameter). If all goes well, the code is run and committed and the result is returned. If not, the error handling kicks in.

One of the nice things about running the transactions this way is that it is very easy to modify the semantics around every transaction that runs. For example, I realized that there may be occasions that we'd like to quiesce the database, temporarily pausing all activity. In the previous iteration of the code, I would have needed to modify every method to check if it was okay to run, but since I had refactored everything, I could add the simple quiesce lock into the invoker. The quiesce instance variable in the code above is a ReentrantReadWriteLock, so if anybody came in and called the quiesce method, they'd request the write lock and no further database commands could run until the lock was released. I could see this being very useful for keeping track of failure rates and logging in general. Finally, if it turned out that I need to add a new kind of command that needs to further parameterize its execution environment, it would be easy to add another method to the DBCommand interface to get at its values. So all told, I think this works well, and maybe it'll be useful for somebody else too.


Post a Comment:
  • HTML Syntax: NOT allowed

Jeff Alexander is a member of the Information Retrieval and Machine Learning group in Oracle Labs.


« June 2016