Introduction
The list of new features provided by Oracle Database 23c is very long. Some of them relate strictly to database administration, while others relate to application development. The latter category, of course, also covers Java and JDBC. This article describes two main new features of the Oracle JDBC driver 23c related to the reactive approach to application development, which may make back-end developers’ lives easier:
- JDBC Reactive Extensions
- JDBC Reactive Streams Ingestion
This article also contains sample code snippets, which assume that the following script, creating the objects used by the demo, has been executed in the database:
drop table if exists test;
create table test ( id number(10) primary key,
timestamp# date,
val varchar2(2000));
drop sequence if exists test_seq;
create sequence test_seq;
create or replace trigger test_tr
before insert on test for each row
begin
select test_seq.nextval
into :new.id;
:new.timestamp# := sysdate;
end;
/
insert into test(val) values (DBMS_RANDOM.STRING('a',2000));
commit;
Note:
The complete executable code of the application demonstrating the features described in this article, including the above script, is available in my GitHub repo.
JDBC Reactive Extensions
Support for reactive programming in Java was introduced in JDK 9 (the java.util.concurrent.Flow API), so it is not a new feature. As in other programming languages providing this capability, it allows us to write code that reacts to particular events, which may happen at any moment during normal, linear code execution. However, the JDBC interface does not support this style of programming. Some back-end applications, such as application servers or ETL systems, must react to events (like the arrival of new data in an input stream) or, in general, cannot just wait for a SQL statement to complete. For them, the lack of this functionality may lead to complex code, which usually relies on separate threads and a pool of database connections. The resulting application is usually difficult to maintain and may suffer from performance issues.
Thanks to the support for reactive programming introduced in the Oracle JDBC driver 23c, the development of such systems may be, at least in some cases, easier. However, we have to remember that this feature is not part of the JDBC standard. It is one of the (multiple) extensions provided by this particular JDBC driver, so instead of the standard JDBC interfaces, like Connection, PreparedStatement and ResultSet, we need to use their Oracle implementations explicitly. So let’s see how it works. For the purpose of this demonstration we will use the following classes:
- DMLSubscriber: it implements the Flow.Subscriber interface, as required by the reactive Java API. An object of this class will be used to asynchronously consume data coming from a prepared statement executing a simple INSERT…SELECT statement.
- QuerySubscriber: it implements the same Flow.Subscriber interface as DMLSubscriber (with OracleResultSet instead of Long as the item type), but is used to consume data coming from another prepared statement executing a simple SELECT statement.
- Main: the main application class, which uses reactive programming to call the SELECT and INSERT statements in NOWAIT mode.
We also need to import the following packages:
- java.util.concurrent.* to use the reactive API (Flow)
- oracle.jdbc.* to use the Oracle implementations of the JDBC interfaces
Our sample subscribers (as required by the reactive API) react to the following events, which may happen during the execution of a SQL statement:
- subscription to a publisher: the subscriber’s onSubscribe() method is called
- propagation of new data by the publisher: the subscriber’s onNext() method is called; this is, in fact, the most important method of SQL-related subscribers, as it is responsible for consuming the incoming data, so the whole application logic must be executed here
- an error: the subscriber’s onError() method is called
- completion of the SQL statement execution: the subscriber’s onComplete() method is called
All of the above methods are called by the publisher asynchronously with respect to the main execution sequence, so the program can continue without waiting for the SQL statements to complete. Of course, the two subscribers consume different data.
DMLSubscriber receives only the number of rows processed by a DML statement, so its onNext() method consumes a single Long value:
public void onNext(Long item) {
    System.out.println("DMLSubscriber.onNext: Number of rows processed : " + item);
}
whereas QuerySubscriber needs to consume the whole OracleResultSet returned by a SELECT statement:
public void onNext(OracleResultSet item) {
    try {
        // OracleResultSet extends java.sql.ResultSet, so no cast is needed;
        // the demo query returns a single row holding the number of rows in TEST
        if (item.next()) {
            System.out.println("QuerySubscriber.onNext: Number of rows in TEST table : " +
                item.getLong(1));
        }
    }
    catch (SQLException e) {
        e.printStackTrace();
    }
}
Of course, in our case the business logic is trivial and limited to printing information to standard output; the code has been written purely for demonstration. The other methods required by the Java reactive API look similar in both classes and, for the same reason, implement trivial behaviour; they can be reviewed in the GitHub repo containing the complete working code.
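For reference, here is a minimal sketch of what a complete DMLSubscriber could look like. It is not the code from the repo: the constructor taking a CountDownLatch is inferred from the new DMLSubscriber(latch) call in Main, and the rowsProcessed field is a hypothetical addition used only for checking the result. To run it without a database, a java.util.concurrent.SubmissionPublisher stands in for the publisher returned by executeUpdateAsyncOracle().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical reconstruction of DMLSubscriber; the real class is in the author's repo
class DMLSubscriber implements Flow.Subscriber<Long> {

    private final CountDownLatch latch;   // shared with Main; released when this subscriber is done
    private Flow.Subscription subscription;
    volatile long rowsProcessed = -1;     // hypothetical field: last value received, for checking

    DMLSubscriber(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // called once when attaching to the publisher; nothing arrives until we request it
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Long item) {
        rowsProcessed = item;
        System.out.println("DMLSubscriber.onNext: Number of rows processed : " + item);
        subscription.request(1);          // ready for another item, if the publisher has one
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("DMLSubscriber.onError: " + throwable);
        latch.countDown();                // release the waiting thread on failure too
    }

    @Override
    public void onComplete() {
        System.out.println("DMLSubscriber.onComplete: DML statement completed successfully.");
        latch.countDown();
    }

    // Stand-alone usage: SubmissionPublisher stands in for executeUpdateAsyncOracle()
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        DMLSubscriber subscriber = new DMLSubscriber(latch);
        try (SubmissionPublisher<Long> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit(184L);
        } // close() makes the publisher call onComplete after delivering the buffered item
        latch.await(5, TimeUnit.SECONDS);
        System.out.println("rows processed: " + subscriber.rowsProcessed);
    }
}
```

Counting the latch down in onError() as well as in onComplete() is deliberate: otherwise a failed statement would leave the main thread waiting forever (or until a timeout).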
Once the required Subscriber classes have been implemented, we can write the code in the Main class that calls the SQL statements in NOWAIT mode:
// database connection used by the psDML and psQuery prepared statements
Connection con;
// prepared statements used to call the INSERT and SELECT SQL statements in NOWAIT mode
PreparedStatement psDML, psQuery;
// of course we need to create the database connection and prepare the statements,
// but for this we use the normal JDBC functionality
...
// once the connection and prepared statements are created, we can call the SQL statements
// in NOWAIT mode by calling the execute*AsyncOracle() methods defined in OraclePreparedStatement;
// these methods return publishers, to which we subscribe the appropriate subscribers.
// As execute*AsyncOracle() are defined in OraclePreparedStatement, while psDML and psQuery
// are declared as standard JDBC PreparedStatement, we need to access their underlying Oracle
// implementations; according to the Oracle documentation, plain Java casting is NOT RECOMMENDED
// for this purpose, and the unwrap() method (defined in java.sql.Wrapper) should be used instead.
// latch is the CountDownLatch shared by both subscribers (described below)
Flow.Publisher<OracleResultSet> fpQuery =
    psQuery.unwrap(OraclePreparedStatement.class).executeQueryAsyncOracle();
fpQuery.subscribe(new QuerySubscriber(latch));
Flow.Publisher<Long> fpDML =
    psDML.unwrap(OraclePreparedStatement.class).executeUpdateAsyncOracle();
fpDML.subscribe(new DMLSubscriber(latch));
And… that’s all! Well, almost. In our case the execution of the whole Main.main() method can be very quick: quicker than connecting to the database and executing the SQL statements in the background. To prevent the program from ending before the SQL statements complete (remember that they are executed in the background, and the main processing does not wait for their completion!), we can use any available synchronization mechanism, for example the CountDownLatch class. We can also add some System.out.println() calls reporting which part is being executed at which moment, and check the messages printed during the execution of our program:
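The waiting part of Main can be sketched as follows, again with SubmissionPublisher standing in for the Oracle publishers so that it runs without a database. Instead of full subscriber classes it uses SubmissionPublisher.consume(), which returns a CompletableFuture completed when the publisher signals completion (or an error). The class and method names (LatchDemo, runDemo) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class LatchDemo {

    // Returns true if both simulated statements completed within the timeout
    static boolean runDemo(long timeoutSeconds) throws InterruptedException {
        // one count per subscriber (query + DML)
        CountDownLatch latch = new CountDownLatch(2);

        // stand-ins for the publishers returned by executeQueryAsyncOracle()/executeUpdateAsyncOracle()
        SubmissionPublisher<Long> queryPublisher = new SubmissionPublisher<>();
        SubmissionPublisher<Long> dmlPublisher = new SubmissionPublisher<>();

        // consume() subscribes a simple consumer; its future completes on onComplete or onError
        queryPublisher.consume(n -> System.out.println("QuerySubscriber.onNext: " + n))
                .whenComplete((ignored, error) -> latch.countDown());
        dmlPublisher.consume(n -> System.out.println("DMLSubscriber.onNext: " + n))
                .whenComplete((ignored, error) -> latch.countDown());

        // the "statements" finish later, on another thread
        CompletableFuture.runAsync(() -> {
            dmlPublisher.submit(184L);
            dmlPublisher.close();
            queryPublisher.submit(184L);
            queryPublisher.close();
        });

        System.out.println("Main.main: waiting for DML and Query subscribers");
        // a timeout keeps a lost completion signal from hanging the program forever
        return latch.await(timeoutSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("both subscribers finished: " + runDemo(5));
    }
}
```

Using await() with a timeout rather than the plain await() is a choice made in this sketch: if a subscriber never counts down, the program ends instead of hanging.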
Main.main: begin of the demonstration
Main.reactiveCallsDemo: connecting to the database.
Main.reactiveeCallsDemo: Connected to the database
Main.reactiveCallsDemo: SELECT started in NOWAIT mode
Main.reactiveCallsDemo: INSERT started in NOWAIT mode
Main.reactiveCallsDemo: end of method and return to Main.main.
Main.main: waiting for DML and Query subscribers
DMLSubscriber.onNext: Number of rows processed : 184
DMLSubscriber.onComplete: DML statement completed succesfully.
QuerySubscriber.onNext: Number of rows in TEST table : 184
QuerySubscriber.onComplete: SELECT statement completed succesfully.
Main.main: end of the demonstration
Process finished with exit code 0
Some points worth noting:
- After connecting to the database, the program calls the SELECT statement (“Main.reactiveCallsDemo: SELECT started in NOWAIT mode” message).
- Before the SELECT execution is completed, the program calls the INSERT statement (“Main.reactiveCallsDemo: INSERT started in NOWAIT mode” message).
- After calling these statements, the Main.main() method waits for their completion, but only because it uses the CountDownLatch class (“Main.main: waiting for DML and Query subscribers” message). Both SQL statements are executed in the background.
- The results of the SQL statements are displayed by the subscribers’ onNext() methods.
From the point of view of the application, these calls have been executed asynchronously. However, is this a real asynchronous mode at the database level? The answer provided by the Oracle official documentation is clear: NO. The database connection still works in normal, synchronous mode, and the reason is obvious: the database must ensure the consistency and atomicity of each single SQL operation. This is also why I avoid describing this feature as an asynchronous mode of SQL statement execution; I prefer to call it what it actually is: Reactive API support. Of course, using it can be very useful in some cases, especially as it supports not only queries and DMLs, but also:
- connection creation
- transaction management
- ResultSet processing
- LOB data processing
and from the application code’s point of view it looks like real asynchrony. But from the database layer’s perspective, it still uses the traditional, synchronous API.
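The distinction between "reactive for the application" and "synchronous for the database" can be illustrated with a small analogy, which is not the driver’s actual implementation: a single-threaded executor plays the role of one database connection. The caller submits two statements without waiting, yet they run strictly one after the other. All names are hypothetical and the sleeps only simulate work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SerializedConnectionDemo {

    // Two "statements" are asynchronous for the caller, but share one worker thread
    // that models a single synchronous database connection.
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService connection = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> select = CompletableFuture.runAsync(() -> {
                events.add("SELECT start");
                sleep(100);                       // pretend the query takes a while
                events.add("SELECT end");
            }, connection);
            CompletableFuture<Void> insert = CompletableFuture.runAsync(() -> {
                events.add("INSERT start");
                sleep(50);
                events.add("INSERT end");
            }, connection);
            events.add("caller continues");       // the caller did not wait for either statement
            CompletableFuture.allOf(select, insert).join();
        } finally {
            connection.shutdown();
        }
        return new ArrayList<>(events);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The position of "caller continues" in the printed list may vary between runs, but the SELECT events always finish before the INSERT events start, because the single worker thread serializes them.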
JDBC Reactive Streams Ingestion
Another nice feature implemented in the Oracle JDBC driver 23c is Reactive Streams Ingestion (RSI). It provides a very interesting way of implementing pipeline-based SQL processing. In the background it uses the reactive Java API and provides its own implementation of the Publisher interface (PushPublisher), which is used to automate the SQL processing. In contrast to traditional JDBC processing, which can use a single Connection object to execute multiple different SQL commands against multiple different tables, this API uses its own ReactiveStreamsIngestion object to handle the database connection, limited to a single table with a fixed set of columns. It also uses direct path load internally to speed up data insertion into the specified table. Possible use cases include:
- data stream processing
- IOT systems
- support for time series data
- call detail records (CDRs)
- social media
and many more… This part of the article shows how to use it and how it works.
First of all, we need to add some libraries required by this API in addition to ojdbc11.jar: rsi.jar, ucp.jar and ons.jar. Also, in addition to the packages used in the first part of this article, we need to import the oracle.rsi.* package.
After adding the appropriate jar archives to the project, we can start writing the code that demonstrates how to use the RSI API:
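Missing archives may only show up at runtime, for example as a NoClassDefFoundError, so a quick check can help. The sketch below (hypothetical class ClasspathCheck) probes one representative class per archive with Class.forName, without initializing it. ons.jar is not probed, since the article does not name any of its classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ClasspathCheck {

    // Returns true if the class can be loaded; 'false' means static initializers are not run
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // one representative class per archive
        Map<String, String> required = new LinkedHashMap<>();
        required.put("ojdbc11.jar", "oracle.jdbc.OracleConnection");
        required.put("rsi.jar", "oracle.rsi.ReactiveStreamsIngestion");
        required.put("ucp.jar", "oracle.ucp.jdbc.PoolDataSource");
        required.forEach((jar, cls) ->
                System.out.println(jar + " (" + cls + "): " + (isPresent(cls) ? "found" : "MISSING")));
    }
}
```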
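// besides java.sql.*, this snippet uses java.time.Duration, java.util.UUID,
// java.util.concurrent.ExecutorService, java.util.concurrent.Executors
// and oracle.rsi.ReactiveStreamsIngestion / oracle.rsi.PushPublisher
// random value pushed into the database in each iteration
String value;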
// auxiliary, traditional database connection used to check number of rows in TEST table
// this connection is NOT used by RSI API - we use it just to check how TEST table looks during the RSI
// processing
Connection conAux = DriverManager.getConnection(
"<database connection string>",
"<username>",
"<password>");
// auxiliary PreparedStatement object used to check number of rows in TEST table
// this object is also NOT USED by RSI directly
PreparedStatement psAux = conAux.prepareStatement("SELECT COUNT(*) FROM TEST");
// auxiliary ResultSet object used to check number of rows in TEST table
ResultSet rsAux;
int countInt;
/* In addition to traditional JDBC parameters, like the
connection string, username and password, Streams Ingestion requires the following data:
1. ExecutorService: pool of threads responsible for execution
2. buffer size
3. interval (in seconds) between pushing the data into the database
4. names of target schema, table and columns
*/
System.out.println("Main.streamsIngestDemo: connecting to the database.");
// first we need to create a pool of threads (executor), which will push the data to the TEST table
ExecutorService es = Executors.newFixedThreadPool(5);
// now we can create appropriate ReactiveStreamsIngestion object
ReactiveStreamsIngestion rs = ReactiveStreamsIngestion
.builder()
.url("<database connection string>")
.username("<username>")
.password("<password>")
.executor(es) // in this place we're assigning the executor to the RSI object
.bufferRows(60) // in this place we set the size of the buffer for incoming data
.bufferInterval(Duration.ofSeconds(2)) // in this place we set the frequency of the push operations
.schema("<username which owns TEST table>")
.table("TEST")
.columns(new String[] {"VAL"})
.build();
System.out.println("Main.streamsIngestiondemo: connected to the database");
// Streams Ingestion uses reactive calls and provides its own publisher and subscriber
PushPublisher<Object[]> pushPublisher = ReactiveStreamsIngestion.pushPublisher();
pushPublisher.subscribe(rs.subscriber());
System.out.println("Main.streamsIngestionDemo: ReactiveStreamsIngestion configured.");
// now - let's start to generate the data, which will be pushed to the database
for (int i = 1; i <= 60; i++) {
// generation of a random value
value = UUID.randomUUID().toString();
// pushing the data into the database
pushPublisher.accept(new Object[] {value});
System.out.println("Main.streamsIngestionDemo: A random string #"+i+
" has been generated and pushed into the database stream.");
/* the auxiliary prepared statement, obtained from the auxiliary, separate database connection,
checks the number of rows in the TEST table between subsequent push operations.
*/
rsAux = psAux.executeQuery();
rsAux.next();
countInt = rsAux.getInt(1);
System.out.println("Main.streamsIngestionDemo: Number of rows in TEST table: "+countInt);
Thread.sleep(200);
}
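// at the end we need to close the publisher and the RSI object, and shut down the executor
pushPublisher.close();
rs.close();
es.shutdown();
// release the auxiliary JDBC resources as well
psAux.close(); // also closes the last ResultSet obtained from it
conAux.close();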
System.out.println("Main.streamsIngestionDemo: end of method and return to Main.main.");
}
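The two buffering parameters, bufferRows(60) and bufferInterval(Duration.ofSeconds(2)), can be understood with a simplified model: the buffer is pushed either when it holds bufferRows rows or when bufferInterval has elapsed since the last push, whichever comes first. This is an assumed model for illustration, not Oracle’s implementation; in particular, the interval is checked only when a row arrives (a real implementation would use a timer), and the time is passed in explicitly to keep the sketch deterministic. RowBuffer is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of the RSI buffering parameters (not Oracle's implementation)
class RowBuffer {
    private final int bufferRows;                    // like .bufferRows(60)
    private final Duration bufferInterval;           // like .bufferInterval(Duration.ofSeconds(2))
    private final Consumer<List<Object[]>> sink;     // stands in for the batched insert into TEST
    private final List<Object[]> rows = new ArrayList<>();
    private Instant lastPush;

    RowBuffer(int bufferRows, Duration bufferInterval, Consumer<List<Object[]>> sink, Instant start) {
        this.bufferRows = bufferRows;
        this.bufferInterval = bufferInterval;
        this.sink = sink;
        this.lastPush = start;
    }

    // Adds a row; pushes the buffer if it is full or the interval has elapsed.
    // The interval is only checked here, when a row arrives (a simplification).
    void accept(Object[] row, Instant now) {
        rows.add(row);
        boolean full = rows.size() >= bufferRows;
        boolean due = !now.isBefore(lastPush.plus(bufferInterval));
        if (full || due) {
            push(now);
        }
    }

    // Sends whatever is buffered (also used to model close())
    void push(Instant now) {
        if (!rows.isEmpty()) {
            sink.accept(List.copyOf(rows));
            rows.clear();
        }
        lastPush = now;
    }

    public static void main(String[] args) {
        List<Integer> pushedBatchSizes = new ArrayList<>();
        Instant t0 = Instant.EPOCH;
        RowBuffer buffer = new RowBuffer(60, Duration.ofSeconds(2),
                batch -> pushedBatchSizes.add(batch.size()), t0);
        // the article's loop: 60 rows, one every 200 ms
        for (int i = 1; i <= 60; i++) {
            buffer.accept(new Object[] {"value #" + i}, t0.plusMillis(200L * i));
        }
        buffer.push(t0.plusMillis(12_000));            // models close(): push any leftovers
        System.out.println(pushedBatchSizes);          // [10, 10, 10, 10, 10, 10]
    }
}
```

In this model, with one row every 200 ms the 60-row limit is never reached, so the interval drives every push.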
And how does it work? Let’s see…
Main.main: begin of the demonstration
Main.streamsIngestDemo: connecting to the database.
...
Main.streamsIngestiondemo: connected to the database
Main.streamsIngestionDemo: ReactiveStreamsIngestion configured.
Main.streamsIngestionDemo: A random string #1 has been generated and pushed into the database stream.
Main.streamsIngestionDemo: Number of rows in TEST table: 368
...
Main.streamsIngestionDemo: Number of rows in TEST table: 377
Main.streamsIngestionDemo: A random string #11 has been generated and pushed into the database stream.
...
Main.streamsIngestionDemo: A random string #20 has been generated and pushed into the database stream.
Main.streamsIngestionDemo: Number of rows in TEST table: 386
...
Main.streamsIngestionDemo: A random string #37 has been generated and pushed into the database stream.
Main.streamsIngestionDemo: Number of rows in TEST table: 403
...
Main.streamsIngestionDemo: A random string #60 has been generated and pushed into the database stream.
Main.streamsIngestionDemo: Number of rows in TEST table: 411
Main.streamsIngestionDemo: end of method and return to Main.main.
As we can see, the number of rows in the TEST table, checked independently over a separate connection, grows periodically: every 2 seconds (as we set the push interval to 2 seconds) a new set of rows is inserted. These push operations are executed asynchronously with respect to the main application thread, so it is possible (in this case) to check the number of rows in the target table in the meantime, or (in general) to implement any application logic that provides new sets of data to push…
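A quick back-of-the-envelope check of these numbers, assuming nothing else writes to TEST during the demo. With one row pushed every 200 ms plus the time t_loop spent per iteration on the COUNT(*) query and printing, at most 10 rows arrive per 2-second interval, far below bufferRows(60), so the pushes are driven by the interval rather than by the buffer size:

$$
\frac{2000\ \text{ms}}{200\ \text{ms} + t_{\text{loop}}} \;\le\; 10\ \text{rows per push} \;<\; 60 = \texttt{bufferRows}
$$

The starting count is consistent with the first demo, whose INSERT…SELECT doubled the 184 rows; once all 60 rows have been pushed, the table should hold

$$
2 \times 184 + 60 = 368 + 60 = 428\ \text{rows},
$$

so the last sample (411, taken before close()) means that 428 - 411 = 17 rows were not yet visible to the auxiliary connection at that moment.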
Summary
Both APIs described in this article have their own use cases. It is, of course, technically possible to use them in any type of Java program; however, because of their specifics, they will mostly be used in back-end systems that need to execute application logic and process data at the same time. At least in some cases, they can be used to avoid hand-coding asynchronous processing, thereby reducing the complexity (and, at the same time, the cost) of application development.
Additional Resources
Oracle Database 23c JDBC Developer’s Guide: JDBC Reactive Extensions
Oracle Database 23c JDBC Developer’s Guide: Reactive Streams Ingestion
GitHub repository with the working code of examples provided in this article
