Cluster/J is Oracle’s NoSQL Java development environment for MySQL NDB Cluster, and MySQL Release 9.4.0 brings the largest set of updates to Cluster/J in many years. My other post “Introducing MySQL Cluster/J 9.4” provides an overview of many of these. This article will focus on some of the thornier problems in database application development: How does an application behave when an expected table does not exist? What happens if tables are created, dropped, and altered while the application is running?
Database applications all depend on schema metadata, and frameworks like Cluster/J usually cache that metadata for better performance, but the caches further complicate the situation when a schema changes. The approach used in Cluster/J has been to throw an exception, ClusterJDatastoreException, whenever an operation has been disrupted by a schema change. The application code can catch the exception, call session.unloadSchema() to refresh the needed metadata, and then keep running. This broad description of the issue remains true in release 9.4, but the enhancements lie in the details. ClusterJDatastoreException now provides methods to help multiple threads coordinate their response, rather than racing against each other to solve the problem. A new subclass, ClusterJTableException, addresses the particular case of an expected table that does not exist.
In this post I will take a hands-on approach using plenty of runnable sample code. Following along with the demo should require just a binary distribution of MySQL NDB Cluster (as found at dev.mysql.com) and a recent JDK. To run the demo, create an empty directory and begin with this small env-demo.sh script, which helps set up the demo environment. The script simply sets CLASSPATH and JDK_JAVA_OPTIONS to point the JVM to the Cluster/J jar files and the ndbclient native object file.
# env-demo.sh
# Source this file to set up the demo environment
# MYSQL_HOME should point to an install tree
export MYSQL_HOME=/opt/mysql/9.4.0
# LIBPATH contains the libndbclient dynamic object (.so, .dll, or .dylib)
LIBPATH=$MYSQL_HOME/lib
# CLASSPATH contains clusterj.jar
export CLASSPATH="$MYSQL_HOME/share/java/*"
# Set JDK_JAVA_OPTIONS
export JDK_JAVA_OPTIONS="-Djava.library.path=$LIBPATH"
Running a Cluster
In order to run the examples, you will need to start an NDB cluster. There are several ways to do this: using MySQL Cluster Manager; using the NDB Operator with Kubernetes; using mysql-test-run, if you happen to have built NDB from source. Absent any of those, a configuration template and sandbox script will do the job. The config template should be saved in a file called ndb-template.ini.
# ndb-template.ini
[ndbd default]
DataMemory=30M
SharedGlobalMemory=20M
[ndb_mgmd]
NodeId=49
Hostname=localhost
DataDir=_WORK_PATH_/ndb-data
[ndbd]
NodeId=1
Hostname=localhost
DataDir=_WORK_PATH_/ndb-data
[ndbd]
NodeId=2
Hostname=localhost
DataDir=_WORK_PATH_/ndb-data
[api]
[api]
[api]
[api]
[api]
This template describes a cluster with two data nodes and up to five API nodes, all running on the local machine. The MySQL server will use one of these API slots, and the Cluster/J application will use another. The example tables will be stored in memory, so the total size of the database is limited to about 30 MB. The sandbox script is called sandbox.sh; it understands six management commands, enough to take care of configuring, starting, and stopping the cluster.
# sandbox.sh
source env-demo.sh
WORK_TREE=Workspace
mkdir -p $WORK_TREE
export WORK_PATH=`cd $WORK_TREE && pwd`
export NDB_TLS_SEARCH_PATH=$WORK_PATH/ndb-certs
MYSQL_BIN=$MYSQL_HOME/bin
OPT_TLS="--ndb-tls-search-path=$NDB_TLS_SEARCH_PATH"
MGM_OPTS="$OPT_TLS --connect-retries=1"
init() {
# Create MySQL server with its configuration and directory structure
mkdir -p $WORK_PATH/mysqld/data
# Initialize mysql server
$MYSQL_BIN/mysqld --datadir=$WORK_PATH/mysqld/data \
--skip-log-bin --initialize-insecure
# Create a 2-node NDB cluster with its config and directory structure
mkdir $WORK_TREE/ndb-data $WORK_TREE/ndb-certs
# Create the NDB config file
sed s:_WORK_PATH_:$WORK_PATH: < ndb-template.ini > $WORK_PATH/CFG_NDB.ini
# Create the CA and node certificates
$MYSQL_BIN/ndb_sign_keys --create-CA $OPT_TLS --passphrase=Demo
$MYSQL_BIN/ndb_sign_keys --create-key $OPT_TLS --passphrase=Demo \
-f $WORK_PATH/CFG_NDB.ini
}
start() {
$MYSQL_BIN/ndb_mgmd --configdir=$WORK_PATH $OPT_TLS \
-f $WORK_PATH/CFG_NDB.ini
sleep 1
$MYSQL_BIN/ndbmtd --ndb-nodeid=1 $OPT_TLS
$MYSQL_BIN/ndbmtd --ndb-nodeid=2 $OPT_TLS
$MYSQL_BIN/mysqld --skip-log-bin --ndbcluster $OPT_TLS \
--datadir=$WORK_PATH/mysqld/data --pid-file=$WORK_PATH/mysqld/pid \
--log-error=$WORK_PATH/mysqld/log &
echo Starting MySQLD ...
}
stop() {
$MYSQL_BIN/mysqladmin -u root shutdown
sleep 1
$MYSQL_BIN/ndb_mgm $MGM_OPTS -e shutdown
sleep 1
}
case "$1" in
"init")
init
;;
"start")
start
;;
"test")
$MYSQL_BIN/ndb_mgm $MGM_OPTS -e "all status"
$MYSQL_BIN/mysql -u root -e "select 1"
;;
"demo")
java demo/TableDemo2.java
;;
"stop")
stop
;;
"clear")
test -f "$WORK_PATH/ndb-data/ndb_49.pid" && stop
rm -rf $WORK_PATH
;;
*)
echo "sandbox.sh init | start | test | demo | stop | clear"
esac
After saving the template and the sandbox script, it should take only a few steps to get MySQL NDB Cluster up and running:
sh sandbox.sh init
sh sandbox.sh start
sh sandbox.sh test
“Table Not Found” Conditions and ClusterJTableException
Now I can illustrate how ClusterJTableException improves the application lifecycle. Make a directory called “demo” for Java code. A Cluster/J application establishes a mapping between a set of domain classes, used to implement the logic of the application, and a set of tables in the database. We can begin with a file demo/Customer.java that defines a mapping from a Customer (in Java) to a customers table (in SQL), by way of a Cluster/J annotation API known as PersistenceCapable.
// demo/Customer.java
package demo;
import com.mysql.clusterj.annotation.PersistenceCapable;
import com.mysql.clusterj.annotation.PrimaryKey;
@PersistenceCapable(table="customers")
public interface Customer {
@PrimaryKey
int getId();
void setId(int id);
String getName();
void setName(String name);
}
This file defines Customer as an interface. A subsequent call to session.newInstance(Customer.class) should create a new, empty instance of an object that implements the interface; a call to session.find() could fetch a customers row from the database and use it to populate a fully-fledged Customer instance. Both of these methods require Cluster/J to be connected to the database, and require the customers table to exist. The first noteworthy difference between Cluster/J 9.4 and previous versions concerns how Cluster/J behaves when the customers table does not exist in the database. I can illustrate this using the simple application in the file demo/TableDemo1.java.
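To make the idea of instantiating an interface more concrete, here is a minimal sketch built on java.lang.reflect.Proxy from the JDK. It only illustrates the general technique of backing a getter/setter interface with a generated object; it is not a description of Cluster/J's internals, and the names ProxySketch, Person, and the map-backed handler are hypothetical, invented for this example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

// Illustration only: a JDK dynamic proxy that stores property values in a map.
// ProxySketch and Person are hypothetical names, not part of Cluster/J.
class ProxySketch {
    interface Person {
        int getId();
        void setId(int id);
        String getName();
        void setName(String name);
    }

    // Create an object implementing the given getter/setter interface.
    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> iface) {
        Map<String, Object> values = new HashMap<>();
        InvocationHandler handler = (proxy, method, args) -> {
            String name = method.getName();
            if (name.startsWith("set") && args != null && args.length == 1) {
                values.put(name.substring(3), args[0]);   // setX(v) stores "X" -> v
                return null;
            }
            if (name.startsWith("get")) {
                Object value = values.get(name.substring(3));
                // An unset int property cannot be returned as null.
                if (value == null && method.getReturnType() == int.class) return 0;
                return value;
            }
            throw new UnsupportedOperationException(name);
        };
        return (T) Proxy.newProxyInstance(
            iface.getClassLoader(), new Class<?>[] { iface }, handler);
    }

    public static void main(String[] args) {
        Person p = newInstance(Person.class);
        p.setId(1);
        p.setName("Lucy");
        System.out.println(p.getId() + " : " + p.getName());   // prints "1 : Lucy"
    }
}
```

In the sketch the values live in a HashMap; in a real persistence framework they would be tied to a database row, which is why the table has to exist before an instance can be created.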
// demo/TableDemo1.java
package demo;
import java.util.HashMap;
import com.mysql.clusterj.ClusterJHelper;
import com.mysql.clusterj.SessionFactory;
import com.mysql.clusterj.Session;
public class TableDemo1 {
public static void main(String[] args) {
HashMap<String, String> properties = new HashMap<String, String>();
properties.put("com.mysql.clusterj.database", "demo");
properties.put("com.mysql.clusterj.connectstring", "localhost:1186");
SessionFactory factory = ClusterJHelper.getSessionFactory(properties);
Session session = factory.getSession();
/* If the customers table does not exist in the database,
older releases of Cluster/J will throw ClusterJUserException.
Cluster/J 9.4 and later will throw ClusterJTableException.
*/
session.newInstance(Customer.class);
}
}
If you have Java 22 or later available locally, then JEP 458 allows you to launch multi-file source code programs, so you should be able to run the example code directly without stopping to compile anything, like this:
java demo/TableDemo1.java
With older versions of Java, you will have to compile both Customer.java and TableDemo1.java before running the demo. Here is the output we are looking for:
at com.mysql.clusterj.tie.DictionaryImpl.getTable(DictionaryImpl.java:133)
…
at com.mysql.clusterj.core.SessionImpl.getDomainTypeHandler(SessionImpl.java:1219)
at com.mysql.clusterj.core.SessionImpl.newInstance(SessionImpl.java:308)
at demo.TableDemo1.main(TableDemo1.java:24)
ClusterJTableException is a newly introduced class in Cluster/J 9.4. When an application tries to use a table that does not exist, previous versions of Cluster/J would throw ClusterJUserException, like this:
Exception in thread "main" com.mysql.clusterj.ClusterJUserException: Failure getting NdbTable for class demo.Customer, table customers. Verify that the table is defined with ENGINE=NDB.
at com.mysql.clusterj.core.metadata.DomainTypeHandlerImpl.<init>(DomainTypeHandlerImpl.java:149)
…
at com.mysql.clusterj.core.SessionImpl.newInstance(SessionImpl.java:306)
at demo.TableDemo1.main(TableDemo1.java:24)
The ClusterJUserException treats the table not found condition as a “user programming error” — a way of saying that you, the developer, should have thought to create the table. ClusterJTableException treats it as something more like an expected step in the application lifecycle, where sometimes an application might start running before the database is ready for it.
There is also a behavioral difference between the two exceptions: when the older Cluster/J encountered the “table not found” condition, the ClusterJUserException was thrown immediately. The newer version here actually entered a sleep-and-retry loop, tried to open the table again, and then gave up after 50 milliseconds. This 50 msec of wait time is the default value for a new connection property, com.mysql.clusterj.table.wait.msec. It is designed to allow an application to gracefully handle “table not found” conditions. The allowable values are from 0 to 1000, where 0 disables the wait loop altogether and 1000 enables a wait of one second. An application can wait longer than one second by setting com.mysql.clusterj.table.wait.msec to 1000 and then looping around the call to session.newInstance(). ClusterJTableException provides a method getTableName() to return the name of the missing table, and a set of methods (elapsedNanos(), elapsedMicros(), and elapsedMillis()) to report the accumulated wait time.
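The idea of looping to wait longer than the per-call limit can be sketched as follows. So that it runs without a cluster, the sketch uses a hypothetical stand-in exception, TableMissingException, in place of ClusterJTableException, and a Supplier in place of the call to session.newInstance(). The overall time budget, maxWaitMillis, is also my own assumption rather than a Cluster/J setting.

```java
import java.util.function.Supplier;

// Sketch of waiting longer than the per-call limit by looping.
// TableMissingException is a hypothetical stand-in for ClusterJTableException.
class WaitLoopSketch {
    static class TableMissingException extends RuntimeException {
        private final long elapsedMillis;
        TableMissingException(String table, long elapsedMillis) {
            super("table not found: " + table);
            this.elapsedMillis = elapsedMillis;
        }
        long elapsedMillis() { return elapsedMillis; }
    }

    // Retry 'open' until it succeeds, or until the wait time reported by the
    // exceptions adds up to maxWaitMillis; then rethrow the last exception.
    static <T> T openWithDeadline(Supplier<T> open, long maxWaitMillis) {
        long waited = 0;
        while (true) {
            try {
                return open.get();
            } catch (TableMissingException ex) {
                waited += ex.elapsedMillis();
                if (waited >= maxWaitMillis) throw ex;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated open: fails twice, each time reporting a 1000 ms wait,
        // then succeeds on the third call.
        String result = openWithDeadline(() -> {
            if (++calls[0] < 3) throw new TableMissingException("customers", 1000);
            return "opened";
        }, 5000);
        System.out.println(result + " after " + calls[0] + " attempts");
        // prints "opened after 3 attempts"
    }
}
```

One caveat with this shape: if the reported wait is always zero (for example, with the wait loop disabled), the accumulated total never reaches the budget, so a real application would want a wall-clock check or its own sleep inside the loop.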
“ALTER TABLE” and ClusterJDatastoreException
The second improvement concerns the behavior inside a Cluster/J application when a schema is changed while the application is running. A schema change such as an ALTER TABLE statement will often cause an application to receive ClusterJDatastoreException. If the exception is due to a schema change, the application can call session.unloadSchema() to discard its old metadata, obtain fresh metadata, and try again.
This has been true for a long time, but this interface can fall short of requirements in a few key ways. Firstly, there was no simple way to distinguish schema errors from other types of errors. Secondly, session.unloadSchema() should be called just once by a single thread, while any other threads needing the same table should wait for it to complete. This was difficult to manage.
Cluster/J 9.4 introduces some new methods on ClusterJDatastoreException to improve this situation: isStaleMetadata(), isSchemaChangePending(), and awaitSchemaChange().
- boolean method isStaleMetadata() returns true if the exception was caused by stale metadata encountered while attempting to perform a data operation. This condition is the clue to tip us off on the fact that some schema has changed in the database, and the application is not yet aware of it. When this method returns true, the user should call session.unloadSchema() to refresh the metadata, then retry.
- boolean method isSchemaChangePending() returns true when some other thread has already called session.unloadSchema() to initiate schema change handling, but the handling had not yet completed at the time the exception was thrown. In this case, the user can call awaitSchemaChange() to pause until handling completes.
- void method awaitSchemaChange() will block the caller’s thread until schema change handling, already in progress in some other thread, has completed.
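The coordination these three methods enable (one thread refreshes, the others wait) can be modeled in plain Java. The sketch below is only a model of the pattern and makes no claim about how Cluster/J implements it; SchemaGateSketch, handleStale(), and the version counter are hypothetical names invented for this illustration.

```java
// Model of "one thread refreshes, the others wait". All names are
// hypothetical; this is not Cluster/J code.
class SchemaGateSketch {
    private final Object lock = new Object();
    private int version = 0;            // bumped after each completed refresh
    private boolean refreshing = false; // true while some thread is refreshing
    private int refreshCount = 0;

    int currentVersion() { synchronized (lock) { return version; } }
    int refreshCount()   { synchronized (lock) { return refreshCount; } }

    // Called by a thread whose operation failed against metadata 'seenVersion'.
    // Returns true only for the thread that actually ran the refresh.
    boolean handleStale(int seenVersion, Runnable refresh) {
        synchronized (lock) {
            if (version != seenVersion) return false;  // already refreshed
            if (refreshing) {                          // refresh is pending
                try {
                    while (refreshing) lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return false;
            }
            refreshing = true;                         // this thread does the work
        }
        try {
            refresh.run();                             // stand-in for unloadSchema()
        } finally {
            synchronized (lock) {
                version++;
                refreshCount++;
                refreshing = false;
                lock.notifyAll();                      // release the waiting threads
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        SchemaGateSketch gate = new SchemaGateSketch();
        int seen = gate.currentVersion();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> gate.handleStale(seen, () -> {}));
            workers[i].start();
        }
        for (Thread t : workers) t.join();
        System.out.println("refreshes performed: " + gate.refreshCount());
        // prints "refreshes performed: 1"
    }
}
```

The three branches in handleStale() correspond loosely to the three situations described above: the metadata is stale and this thread should act, a refresh is already pending and this thread should wait, or the problem has already been dealt with.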
The next listing contains a utility class, Customers, illustrating some use of the new features.
// demo/Customers.java
package demo;
import com.mysql.clusterj.ClusterJDatastoreException;
import com.mysql.clusterj.ClusterJTableException;
import com.mysql.clusterj.Session;
import com.mysql.clusterj.Query;
import java.util.function.Supplier;
import java.util.List;
class Customers {
static final Class<Customer> cls = Customer.class;
static void ensureMappedTable(Session session) {
Customer instance = null;
int wait_msec = 0;
while(instance == null) {
try {
instance = session.newInstance(cls);
} catch (ClusterJTableException ex) {
wait_msec += ex.elapsedMillis();
System.out.println(
"Waiting for table: " + ex.getTableName());
}
}
if (wait_msec > 0)
System.out.println("Waited " + wait_msec +
" msec for table to become available.");
}
static void handleException(Session session,
ClusterJDatastoreException ex) {
if (ex.isStaleMetadata()) {
// This thread should do the work
session.unloadSchema(cls);
}
else if(ex.isSchemaChangePending()) {
// Wait for some other thread to finish handling the problem
ex.awaitSchemaChange();
}
else throw ex;
}
static <T> T runChecked(Session session, Supplier<T> function) {
while(true) {
try {
return function.get();
} catch(ClusterJDatastoreException ex) {
handleException(session, ex);
}
}
}
static Customer put(Session session, int id, String name) {
return runChecked(session, () ->
{ Customer customer = session.newInstance(cls);
customer.setId(id);
customer.setName(name);
session.savePersistent(customer);
return customer;
});
}
static Customer get(Session session, int id) {
return runChecked(session, () -> session.find(cls, id));
}
static List<Customer> getAll(Session session) {
return runChecked(session, () ->
{ Query<Customer> query = session.createQuery(
session.getQueryBuilder().createQueryDefinition(cls));
query.setOrdering(Query.Ordering.ASCENDING, "id");
return query.getResultList();
});
}
}
This code presents several concepts to explore.
- ensureMappedTable() calls session.newInstance(), catches ClusterJTableException, and loops until the expected table becomes available. It can be called at application startup time to ensure that the table exists before the application continues.
- handleException() illustrates the correct way to handle schema changes in a multi-threaded application, with one thread calling into session.unloadSchema(), and other threads waiting for it to finish. If the supplied ClusterJDatastoreException is some other error condition, not related to schema change, it will be rethrown.
- runChecked() is a functional wrapper for any Cluster/J operation that might fail when a schema change is in progress. It tries to execute the operation, and on catching ClusterJDatastoreException it will call handleException().
- The next three methods are wrappers that use runChecked() to perform common Cluster/J operations. The put() method calls session.newInstance() to create an instance of the domain object, fills in the supplied values, and then uses session.savePersistent() to save it in the database. The get() method is a simple wrapper over session.find(). getAll() uses a Cluster/J Query to fetch the whole customers table in primary key order.
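The runChecked() wrapper in the listing retries for as long as schema changes keep arriving. Where an upper bound is preferable, one possible variation caps the number of attempts. The sketch below is self-contained and uses a hypothetical stand-in exception, DatastoreProblem, in place of ClusterJDatastoreException; the maxAttempts parameter and the Consumer-based handler are my own additions and are not part of Cluster/J.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Variation on the retry wrapper with a cap on attempts.
// DatastoreProblem is a hypothetical stand-in for ClusterJDatastoreException.
class BoundedRetrySketch {
    static class DatastoreProblem extends RuntimeException {
        DatastoreProblem(String message) { super(message); }
    }

    // Run 'operation'; on failure let 'handler' react (it may rethrow errors
    // it does not recognize), then retry, up to maxAttempts in total.
    static <T> T runChecked(Supplier<T> operation,
                            Consumer<DatastoreProblem> handler,
                            int maxAttempts) {
        if (maxAttempts < 1)
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        DatastoreProblem last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (DatastoreProblem ex) {
                handler.accept(ex);
                last = ex;
            }
        }
        throw last;   // every attempt failed
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated operation: fails twice with stale metadata, then succeeds.
        String value = runChecked(() -> {
            if (++calls[0] < 3) throw new DatastoreProblem("stale metadata");
            return "ok";
        }, ex -> System.out.println("handling: " + ex.getMessage()), 5);
        System.out.println(value + " after " + calls[0] + " attempts");
        // prints "handling: stale metadata" twice, then "ok after 3 attempts"
    }
}
```

Whether a cap is appropriate depends on the application: an interactive tool may prefer to report a failure after a few attempts, while a long-running service may be content to keep retrying.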
A Complete Application
The overview of schema change handling is now complete, and I will conclude with a simple database application allowing someone to view and modify the data in the customers table. After saving the previous files (demo/Customer.java and demo/Customers.java), and then saving this final one as demo/TableDemo2.java, you will have a simple runnable database application.
// demo/TableDemo2.java
// This demo requires Cluster/J 9.4.0 or later.
package demo;
import com.mysql.clusterj.ClusterJHelper;
import com.mysql.clusterj.SessionFactory;
import com.mysql.clusterj.Session;
import java.io.Console;
import java.util.HashMap;
import java.util.Scanner;
public class TableDemo2 {
static final java.io.Console console = System.console();
static void display(Customer c) {
if(c == null) console.writer().println("[null]");
else console.writer().println(c.getId() + " : " + c.getName());
}
// user types "get all" and system prints list of customers
// user types "get [id]" and system responds with name
// user types "set [id] [name]" and system stores id and name in database
static void repl(Session session) {
Customer customer = null;
while(true) {
String line = console.readLine("> ");
if (line == null) break;
Scanner scanner = new Scanner(line);
if (scanner.findWithinHorizon("get", 0) != null) {
if(scanner.hasNextInt()) {
display(Customers.get(session, scanner.nextInt()));
} else if(scanner.hasNext("all")) {
for(Customer result : Customers.getAll(session))
display(result);
}
} else if (scanner.findWithinHorizon("set", 0) != null &&
scanner.hasNextInt()) {
display(Customers.put(session, scanner.nextInt(),
line.substring(scanner.match().end())));
} else console.writer().println("Error");
}
}
public static void main(String[] args) {
HashMap<String, String> properties = new HashMap<String, String>();
properties.put("com.mysql.clusterj.database", "demo");
properties.put("com.mysql.clusterj.connectstring", "localhost:1186");
properties.put("com.mysql.clusterj.table.wait.msec", "500");
properties.put("com.mysql.clusterj.tls.path",
System.getenv("NDB_TLS_SEARCH_PATH"));
SessionFactory factory = ClusterJHelper.getSessionFactory(properties);
Session session = factory.getSession();
Customers.ensureMappedTable(session);
repl(session);
}
}
The application is a simple client that understands two commands, set and get. It should be possible to run this from the sandbox script:
sh sandbox.sh demo
After the application starts up and connects to the database, it will print:
Waiting for table: customers
In another window, you can connect to the database and finally create the customers table.
mysql> create database demo;
Query OK, 1 row affected (0.051 sec)
mysql> use demo;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> CREATE TABLE customers ( id int not null primary key, name varchar(200) ) engine = ndb;
Query OK, 0 rows affected (0.199 sec)
Once the table exists, the demo will report finding it, and provide a prompt that allows you to edit data.
Waited 19100 msec for table to become available.
> set 1 Lucy
1 : Lucy
> set 2 Desi
2 : Desi
> get all
1 : Lucy
2 : Desi
>
The data you enter from the Cluster/J application can be confirmed from the MySQL client. Of course, the MySQL client also allows you to change the data structure – for instance, to change the length and character set of the name column.
mysql> select * from customers;
+----+-------+
| id | name |
+----+-------+
| 1 | Lucy |
| 2 | Desi |
+----+-------+
2 rows in set (0.011 sec)
mysql> ALTER TABLE customers MODIFY COLUMN name varchar(250) character set "Latin1";
Query OK, 2 rows affected (0.229 sec)
Records: 2 Duplicates: 0 Warnings: 0
mysql>
Now you can see Cluster/J’s schema change handling in action. The next time you issue a command from the Cluster/J demo, the data operation will fail, and it will notice the schema change, refresh its metadata, and then run the operation again.
> get all
Jul 03, 2025 12:39:22 PM com.mysql.clusterj.core.SessionFactoryImpl unloadSchema
INFO: Schema change - replaced DomainTypeHandler for customers version 2.0 with version 2.1
1 : Lucy
2 : Desi
>
Conclusion
MySQL NDB Cluster provides high reliability, high performance, and fault tolerance. It is a distributed database that can keep running despite power failures, natural disasters, and disrupted networks. Cluster/J is a development environment that makes it easy to harness that power in a Java application. But the years of engineering effort we on the NDB team put into making the database survive component failures don’t count for as much if the application then trips over predictable lifecycle events like schema deployment and schema change. We believe that the world’s most important database applications deserve the reliability of MySQL NDB Cluster and one of the easiest, best-designed application frameworks ever built, and we keep improving Cluster/J towards that goal.
