※ 本記事は、Witold Swierzyによる”Oracle 23c JDBC driver: support for reactive programming“を翻訳したものです。

2023年4月28日


目次

概要

Oracle Database 23cで提供される新機能のリストは非常に長くなっています。データベース管理に厳密に関連するものと、アプリケーション開発に関連するものがあります。もちろん、後者のカテゴリにはJavaとJDBCも含まれています。この記事では、アプリケーション開発のリアクティブ・アプローチに関連するOracle JDBCドライバ23cの主な2つの新機能について説明します。これにより、バックエンド開発者の作業が簡単になります。: 

  • JDBCリアクティブ拡張
  • JDBCリアクティブ・ストリーム収集

この記事には、サンプル・スクリプトも含まれています。このスクリプトは、データベース・レイヤーで、デモで使用されるオブジェクトを作成する次のスクリプトが実行されていることを前提としています。:

drop table if exists test;
create table test ( id         number(10) primary key,
                    timestamp# date,
                    val        varchar2(2000));

drop sequence if exists test_seq;                
create sequence test_seq;

create or replace trigger test_tr
before insert on test for each row
begin
    select test_seq.nextval
    into :new.id;

    :new.timestamp# := sysdate;
end;
/

insert into test(val) values (DBMS_RANDOM.STRING('a',2000));

commit;

ノート:
この記事で説明している機能(上記のスクリプトを含む)を示すアプリケーションの完全な実行可能コードは、私のgithubリポジトリで使用できます

JDBCリアクティブ拡張

Javaでのリアクティブ・プログラミングのサポートは、JDK v.9.0で初めて実装されているため、新機能ではありません。この機能を提供する他のプログラミング言語と同様に、通常の線形コードの実行中にいつでも発生する可能性のある特定の状況に対応するコードを作成できます。ただし、JDBCインタフェースではこのプログラミング方法をサポートしていません。アプリケーション・サーバー、ETLシステムなどの一部のバックエンド・アプリケーションが、一部のイベント(入力ストリームへの新規データの到着など)に対応する必要がある場合、または一般に、SQL文の完了を待機するだけではいけません。この機能がないと複雑なコードになり、通常は個別のスレッドとデータベース接続のプールが使用されます。このような開発の最終結果であるアプリケーションは、通常、保守が困難であり、パフォーマンスの問題を引き起こす可能性があります。このようなシステムのOracle JDBCドライバ23c開発で導入されたリアクティブ・プログラミングのサポートのおかげで、少なくとも場合によっては簡単に行うことができます。ただし、この機能はJDBC標準の一部ではないことを覚えておく必要があります。これは、この特定のJDBCドライバによって提供される(複数の)拡張機能の1つであるため、Connection、PreparedStatement、ResultSetなどの標準のJDBCインタフェースを使用するかわりに、Oracle実装を明示的に使用する必要があります。では、その仕組みを見てみましょう。このデモンストレーションのために、次のクラスを使用します。:

  • DMLSubscriber: リアクティブJava APIの必要に応じて、Flow.Subscriberインタフェースを実装します。このクラスのオブジェクトは、単純なINSERT…SELECT文を実行する準備された文から取得されるデータを非同期に消費するために使用されます。
  • QuerySubscriber: DMLSubscriberとまったく同じインタフェースを実装しますが、単純なSELECT文を実行する別の準備済文から取得されるデータを消費するために使用されます。
  • Main: これは主要なアプリケーション・クラスで、リアクティブ・プログラミングを使用してSELECT文およびINSERT文をNOWAITモードでコールします。

次のパッケージもインポートする必要があります。:

  • リアクティブAPIを使用するためのjava.util.concurrent.*
  • JDBCインタフェースのOracle JDBC実装を使用するためのoracle.jdbc.*

サンプル・サブスクライバ(リアクティブAPIで必要とされる)は、次のイベントに対応し、SQL文の実行中に発生する可能性があります。

  • パブリッシャへのサブスクリプション: この場合、サブスクライバのonSubscribe()メソッドがコールされます
  • パブリッシャが実行する新しいデータ伝播: この場合、サブスクライバのonNext()メソッドがコールされます。つまり、実際には、受信データの消費を担当するSQL関連サブスクライバの最も重要なメソッドです。その結果、アプリケーション・ロジック全体を実行する必要があります。
  • エラーが発生した場合: この場合、サブスクライバのonError()メソッドがコールされます。
  • SQL文の実行が完了: この場合、サブスクライバのonComplete()メソッドがコールされています

前述のすべてのメソッドは、発行元によってメイン実行シーケンスに非同期でコールされているため、SQL文の実行が終了するのを待たずにプログラムを実行できます。もちろん、両方のサブスクライバが異なるデータを消費する必要があります。

DMLSubscriberはDML文で処理された行数のみを取得するため、onNextメソッドは単一のLong値を消費できる必要があります。:

public void onNext(Long item) {
    System.out.println("DMLSubscriber.onNext: Number of rows processed : " + item);
}

QuerySubscriberは、SELECT文によって返されたOracleResultSet全体を消費する必要があります。:

public void onNext(OracleResultSet item) {
   try {
      ((ResultSet) item).next();
      System.out.println("QuerySubscriber.onNext: Number of rows in TEST table : "+
                         ((ResultSet)item).getNString(1));
   }
   catch (Exception e) {
      e.printStackTrace();
   }
}

当社の場合、ビジネス・ロジックは簡単で、標準出力でのみ適切な情報を表示するように制限されています。コードはデモンストレーションの目的でのみ作成されています。JavaリアクティブAPIで必要とされる他のメソッドは、両方のクラスと実装(同じ理由 – デモンストレーション)の簡単な動作でも同様で、作業コード全体を含むgithubリポジトリで確認できます。

必要なサブスクライバ・クラスが実装されているため、MainクラスでNOWAITモードでSQL文をコールするコードの記述を開始できます。:

// database connection used by psDML and PSQuery prepared statements
Connection con;

// prepared statements used to call INSERT and SELECT SQL statements in NOWAIT mode
PreparedStatement psDML, psQuery;

// of course we need to create the darabase connection and prepare statetements, but for this we use normal 
// JDBC functionalities
...
// after connection and prepare statements are created we can call appropriate SQL statements in NOWAIT mode
// it is being done by calling execute*AsyncOracle() methods defined in OraclePrepareStatements// these methods // return publishers, to which we need to subscribe appropriate subscribers
// as execute*AsyncOracle() methods are defined in OraclePreparedStatement, while psDML and psQuery 
// have been defined as standard JDBC PreparedStatement we need to cast them to their Oracle real 
// implementations; due to Oracle official documentation it is NOT RECOMMENDED to use normal Java casting
// for this purpose - instead of this Oracle recommends using Class.unwrap method

Flow.Publisher<OracleResultSet> fpQuery = psQuery.unwrap(OraclePreparedStatement.class).executeQueryAsyncOracle();
fpQuery.subscribe(new QuerySubscriber(latch));

Flow.Publisher<Long> fpDML = psDML.unwrap(OraclePreparedStatement.class).executeUpdateAsyncOracle();
fpDML.subscribe(new DMLSubscriber(latch));

そして … それがすべてです! まあ、ほとんどです。この場合、Main.main()メソッド全体の実行は非常に高速です。データベースへの接続、SQL文のバックグラウンド実行などよりも高速です。SQL文の実行が終了する前にプログラムが終了する(つまり、バックグラウンドで実行され、メイン処理が完了するまで待機しない)状況を回避するために、使用可能な任意のメソッド(CountDownLatchクラスなど)を使用できます。また、一部の System.out.println()呼び出しを追加して、どの部分が実行されているかについての情報を出力し、プログラムの実行中に出力されたメッセージを確認することもできます。:

Main.main: begin of the demonstration
Main.reactiveCallsDemo: connecting to the database.
Main.reactiveeCallsDemo: Connected to the database
Main.reactiveCallsDemo: SELECT started in NOWAIT mode
Main.reactiveCallsDemo: INSERT started in NOWAIT mode
Main.reactiveCallsDemo: end of method and return to Main.main.
Main.main: waiting for DML and Query subscribers
DMLSubscriber.onNext: Number of rows processed : 184
DMLSubscriber.onComplete: DML statement completed succesfully.
QuerySubscriber.onNext: Number of rows in TEST table : 184
QuerySubscriber.onComplete: SELECT statement completed succesfully.
Main.main: end of the demonstration

Process finished with exit code 0

いくつかのポイント

  1. データベース・プログラムに接続した後、SELECT文をコールします( “Main.reactiveCallsDemo: SELECT started in NOWAIT mode”メッセージ)
  2. SELECT実行が完了する前に、プログラムがINSERT文をコールします( “Main.reactiveCallsDemo: INSERT started in NOWAIT mode”メッセージ)
  3. これらの文をコールした後、Main.main()メソッドは完了を待機します。ただし、CountDownLatchクラス( “Main.main: waiting for DML and Query Subscriber”メッセージ)の使用が原因です。両方のSQL文がバックグラウンドで実行されます。
  4. SQL文の結果は、サブスクライバのonNext()メソッドによって表示されます。

アプリケーションの観点から、これらのコールは非同期で実行されました。ただし、実際のデータベースは非同期モードですか。Oracle公式ドキュメントによって提供される回答は明確です: いいえ。実際、データベース接続は通常の同期モードで動作します。理由は明らかです。1つのSQL操作の一貫性と原子性を確保する必要性を考慮する必要があります。これは、この機能をSQL文実行の非同期モードとして記述しないようにする理由でもあります。個人的には、リアクティブAPIサポートとして呼ぶことを好みます。この機能が実際に何であるかです。もちろん、問合せやDMLのみでなく、以下もサポートしているため、場合によっては非常に役立ちます。

  • connection creation
  • transaction management
  • ResultSet processing
  • LOB data processing

アプリケーション・コードからは、実際の非同期とみなされます。ただし、データベース・レイヤーの観点からは、従来の同期APIが引き続き使用されます。

JDBCリアクティブ・ストリーム収集

Oracle JDBCドライバ23cに実装されているもう1つの便利な機能は、Streamsの取込みです。パイプラインベースのSQL処理を実装する非常に複雑な方法を提供します。バックグラウンドでは、リアクティブJava APIを使用し、SQL処理の自動化に使用されるPublisherインタフェース(PushPublisher)を独自に実装します。1つのConnectionオブジェクトを使用して複数の異なるSQLコマンドを複数回実行できる従来のJDBC処理とは異なり、このAPIは独自のReactiveStreamsIngestionオブジェクトを使用してデータベース接続を処理し、固定列セットを持つ1つの表に制限します。また、内部ダイレクト・パス・ロードを使用して、指定した表へのデータ挿入を高速化します。考えられるユース・ケースは次のとおりです。:  

  • data stream processing
  • IOT systems
  • support for time series data
  • call detail records (CDRs)
  • social media

その他多数… 記事のこの部分は、その使用方法と仕組みを示しています。

まず、このAPIに必要なojdbc11.jarライブラリの外部rsi.jarucp.jarおよびons.jarを追加する必要があります。また、この記事の最初の部分で使用されるパッケージに加えて、この機能を使用するには、oracle.rsi.*パッケージをインポートする必要があります。

適切なjarアーカイブをプロジェクトに追加した後、RSI APIの使用方法を示すコードの作成を開始できます。

String value;

// auxiliary, traditional database connection used to check number of rows in TEST table
// this connection is NOT used by RSI API - we use it just to check how TEST table looks during the RSI 
// processing

Connection conAux = DriverManager.getConnection(
"<database connection string>",
"<username>",
"<password>");

// auxiliary PreparedStatement object used to check number of rows in TEST table
// this object is also NOT USED by RSI directly
PreparedStatement psAux = conAux.prepareStatement("SELECT COUNT(*) FROM TEST");
// auxiliary ResultSet object used to check number of rows in TEST table
ResultSet rsAux;
int countInt;

/* Streams Ingestion additionally to traditional JDBC parameters, like
connection string, username and password, requires providing the following data
1. ExecutorService: pool of threads responsible for execution
2. buffer size
3. interval (in seconds) between pushing the data into the database
4. names of target schema, table and columns
*/

System.out.println("Main.streamsIngestDemo: connecting to the database.");
// first we need to create a pool of executors, which will pushing the data to the TEST table
ExecutorService es = Executors.newFixedThreadPool(5);
// now we can create appropriate ReactiveStreamsIngestion object
ReactiveStreamsIngestion rs = ReactiveStreamsIngestion
   .builder()
   .url("<database connection string>")
   .username("<username>")
   .password("<password>")
   .executor(es)                             // in this place we're assiging executor to the RSI object
   .bufferRows(60)                           // in this place we set the size of the buffer for incoming data
   .bufferInterval(Duration.ofSeconds(2))    // in this place we set the frequency of the push operations
   .schema("<username which owns TEST table>")
   .table("TEST")
   .columns(new String[] {"VAL"})
   .build();
System.out.println("Main.streamsIngestiondemo: connected to the database");
// Streams Ingestion uses reactive calls and provides its own publisher and subscriber
PushPublisher<Object[]> pushPublisher = ReactiveStreamsIngestion.pushPublisher();
pushPublisher.subscribe(rs.subscriber());
System.out.println("Main.streamsIngestionDemo: ReactiveStreamsIngestion configured.");

// now - let's start to generate the data, which will be pushed to the database
for (int i = 1; i <= 60; i++) {
   // generation of a random value
   value = UUID.randomUUID().toString();
   // pushing the data into the database
   pushPublisher.accept(new Object[] {value});
   System.out.println("Main.streamsIngestionDemo: A random string #"+i+
                      " has been generated and pushed into the    database stream.");

   /* auxiliary prepared statement got from the auxiliary, separate database connection,
     checks the number of rows in the TEST table between subsequential push operations.
   */ 
   rsAux = psAux.executeQuery();
   rsAux.next();
   countInt = rsAux.getInt(1);
   System.out.println("Main.streamsIngestionDemo: Number of rows in TEST table: "+countInt);
   Thread.sleep(200);
}

// at the end we need to close the publisher and shut down the executor
pushPublisher.close();
rs.close();
es.shutdown();
System.out.println("Main.streamsIngestionDemo: end of method and return to Main.main.");
}

どのように動作しますか。見てみましょう…

Main.main: begin of the demonstration
Main.streamsIngestDemo: connecting to the database.
...
Main.streamsIngestiondemo: connected to the database
Main.streamsIngestionDemo: ReactiveStreamsIngestion configured.
Main.streamsIngestionDemo: A random string #1 has been generated and pushed into the database stream.
Main.streamsIngestionDemo: Number of rows in TEST table: 368
...
Main.streamsIngestionDemo: Number of rows in TEST table: 377
Main.streamsIngestionDemo: A random string #11 has been generated and pushed into the database stream.
...
Main.streamsIngestionDemo: A random string #20 has been generated and pushed into the database stream.
Main.streamsIngestionDemo: Number of rows in TEST table: 386
...
Main.streamsIngestionDemo: A random string #37 has been generated and pushed into the database stream.
Main.streamsIngestionDemo: Number of rows in TEST table: 403
...
Main.streamsIngestionDemo: A random string #60 has been generated and pushed into the database stream.
Main.streamsIngestionDemo: Number of rows in TEST table: 411
Main.streamsIngestionDemo: end of method and return to Main.main.

TEST表の行数は、個別の接続で独立してチェックされ、定期的に増加します。2秒ごとに(プッシュ操作の頻度を2秒に設定するため)、新しい行のセットが挿入されます。これらのプッシュ操作は、メイン・アプリケーション・スレッドに対して非同期で実行されるため、(この場合は)その間、ターゲット表の行数をチェックしたり、(通常は)プッシュするデータの新しいセットを提供するアプリケーション・ロジックを実装できます。…

まとめ

この記事で説明する両方のAPIには独自のユースケースがあります。もちろん、技術的には任意のタイプのJavaプログラムでそれらを使用できますが、それらの具体的な内容は、アプリケーション・ロジックを実行してデータを同時に処理する必要があるバックエンド・システムの一部でほとんど使用されます。これらは、少なくとも時として、非同期処理の手動コーディングを避けるために使用でき、この方法で、アプリケーション開発の複雑さ(および同時にコスト)を削減できます。

その他のリソース

Oracle JDBC Driver 23c Developer’s Guide: JDBC Reactive Extensions

Oracle 23c JDBC Developers Guide: Reactive Streams Ingestion

この記事に記載されている例の作業コードを含むGitHubリポジトリ