X

Technologie - Trends - Tipps&Tricks
in deutscher Sprache

Alles im Flow mit JDBC 21c - der asynchrone Treiber ist da!

Marcel Boermann-Pfeifer
Principal Solution Engineer

Jeder Java Programmierer kennt das sicherlich: Datenbankzugriffe kosten Zeit, Aufrufe an Klassen im JDBC Treiber und Netzwerkzugriffe blockieren die Aufrufer-Threads teilweise für viele Millisekunden pro Aufruf. Die Zahl der Datenbank-Aufrufe läßt sich zwar durch Batch Operationen und Puffer wie Array-Fetch Größen reduzieren, aber dennoch könnte durch Parallelverarbeitung innerhalb eines Java Programms noch viel erreicht und verbessert werden. Jedoch war es mit hohem Aufwand verbunden, parallel zu laufenden Datenbank-Calls weiterzuarbeiten. Der Aufwand bestand darin, eigene Threads zu erzeugen und zu verwalten, um z.B. bei laufenden Inserts weiter aus Dateien zu lesen oder ein schnelles, weil asynchrones "ich bin fertig" an den Client zurückzumelden während die Datenbank-Operation noch lief. Viele Datenbank-Interaktionen wie Insert, Update oder Delete müßten eigentlich gar nicht auf das Ergebnis des Zugriffs, womöglich noch verbunden mit einem trägen commit, warten. Es hätte doch genügt, auf die Sicherungs-Mechanismen von Treiber und Datenbank zu vertrauen wie z.B. Transparent Application Continuity (kurz TAC) und Database Replay. Dann könnten immer wieder asynchrone Arbeitspakete an die Datenbank versandt werden um sie so besser unter "back pressure" zu setzen, um ein aktuelles Schlagwort zu verwenden.

Diese Problemstellung existiert nicht nur im Bereich von Datenbanken, sondern eigentlich überall - bei REST Service Zugriffen, bei Dateiverarbeitung, für schnellere User Interfaces, bei Messaging Diensten und vielem mehr. Die JavaScript-Welt hat es einst vorgelebt und sämtliche Operationen stets asynchron ausgeführt. Und so entstanden sehr bald auch in Java Ansätze für lightweight-Threads, Stream-Verarbeitung und Ereignis-Steuerung. Diese mündeten in das aktuelle und allseits anerkannte "Reactive Programming" Paradigma auf Basis einer Streaming-API in aktuellen JDKs und darauf aufsetzenden Frameworks zur leichtgewichtigen Parallelverarbeitung wie RxJava, Reactor, akka usw.

Es fehlte jetzt noch ein parallel verarbeitender, nicht-blockierender Datenbank-Treiber, der beispielsweise auf bestehende JDBC Treiber aufsetzt und herkömmliche Operationen in parallele Threads transparent verpackt. Oder noch besser: Es gäbe optimierte JDBC Treiber, deren Innerstes bereits non-blocking, parallel und in Streams "denkt". Ein erster Ansatz in dieser Richtung von Oracle hieß ADBA (Asynchronous DataBase Access), ein OpenSource Projekt. Dieser wurde allerdings aufgrund des anders strukturierten Projekts "R2DBC" (reactive relational database connectivity) aufgegeben, welches eine Unterstützung durch weitere Datenbank-Hersteller fand. Und an diesem Punkt stehen wir aktuell: dem neuen Oracle JDBC Treiber 21c - der übrigens auch mit Datenbank Version 19c schon erfolgreich getestet wurde.
 

Der Oracle JDBC Treiber 21c bietet:

  • Eine Reactive Streams API für beliebige Oracle Datenbank Zugriffe für asynchrone, nicht-blockierende Datenbank-IOs.
  • Eine noch performantere Reactive Streams Ingestion API für besonders schnelle und nicht-blockierende Daten-Inserts.
  • Eine Basis für Reactive Programming Frameworks wie RxJava , Reactor, akka und Co.
  • Und seit wenigen Tagen ein auf dem neuen JDBC Treiber sowie auf Reactor aufsetzender R2DBC Treiber als OpenSource von Oracle. Nur ein dünner Layer, denn glücklicherweise bietet bereits der JDBC Treiber die meisten Features an, sie müssen lediglich durchgereicht werden.

Der Treiber benötigt mindestens Java Version 11 für die Nutzung der Reactive Streams APIs.

 

Wie sieht die Reactive Streams API im neuen JDBC Treiber aus ?

Kurz gesagt bietet die in JDBC altbekannte Klasse "PreparedStatement" im Oracle Treiber nun die zusätzlichen Methoden wie

executeQueryAsyncOracle (für SELECTs), 
executeBatchAsyncOracle (für Batch DML Operationen), 
executeUpdateAsyncOracle (für einzelne DML Operationen) und
executeAsyncOracle (meist für DDL Operationen).

Die Methoden liefern statt der typischen Ergebniswerte einen java.concurrency.Flow.Publisher zurück, an den man sich im eigenen Code als java.concurrency.Flow.Subscriber anhängen kann um auf Ereignisse daraus (wie z.B. onNext() beim Lesen von Datensätzen) zu reagieren. Um die neuen Methoden nutzen zu können, sollte man aus einem generischen "PreparedStatement" ein "OraclePreparedStatement" zaubern. Doch ein simpler "cast" (zauber) reicht nicht aus, es muß schon ein "unwrap" sein.

Ein einfaches Beispiel aus unserer Dokumentation zeigt eine Methode, die eine Query ausführt und die Ergebnismenge einem Subscriber zur Verfügung stellt:

  Flow.Publisher<OracleResultSet> readData(Connection connection)
    throws SQLException {

    PreparedStatement queryStatement = connection.prepareStatement(
      "SELECT id, first_name, last_name FROM employee_names");

    Flow.Publisher<OracleResultSet> queryPublisher =
      queryStatement.unwrap(OraclePreparedStatement.class)
        .executeQueryAsyncOracle();

    // Close the PreparedStatement after the result set is consumed.
    queryStatement.closeOnCompletion();

    return queryPublisher;
  }

An anderer Stelle könnte ein Java Code die Datensätze wie nachfolgend auslesen; ein vorformulierter Subscriber räumt sowohl im Fehlerfall als auch am Ende des Ereignis-Streams auf und schließt dann das Statement. Weitere Subscriber könnten die Employee-Objekte z.B. auf dem Bildschirm ausgeben.

    Flow.Publisher<Employee> fetchData(ResultSet resultSet)
    throws SQLException {
        Statement resultSetStatement = resultSet.getStatement();

    Flow.Publisher<Employee> employeePublisher =
      resultSet.unwrap(OracleResultSet.class)
        .publisherOracle(oracleRow -> {
          try {
            return new Employee(
              oracleRow.getObject("id", Long.class),
              oracleRow.getObject("first_name", String.class),
              oracleRow.getObject("last_name", String.class));
          }
          catch (SQLException getObjectException) {
            // Unchecked exceptions thrown by a row mapping function will be
            // emitted to each Subscriber's onError method.
            throw new RuntimeException(getObjectException);
          }
        });

    employeePublisher.subscribe(
      // This subscriber will close the ResultSet's Statement
      new Flow.Subscriber<Employee>() {
        public void onSubscribe(Flow.Subscription subscription) {
          subscription.request(Long.MAX_VALUE);
        }
        public void onNext(Employee item) { }
        public void onError(Throwable throwable) { closeStatement(); }
        public void onComplete() { closeStatement(); }
        void closeStatement() {
          try { resultSetStatement.close(); }
          catch (SQLException closeException) { log(closeException); }
        }
      });

    return employeePublisher;
  }

 

Was hat es mit der schnellen Ingestion API auf sich ?

Prinzipiell könnte man auch die neuen asynchronen Batch-Statements verwenden, um Daten in die Datenbank zu schreiben.
Die Reactive Streams Ingestion API bietet neben einer asynchronen Komfort-Funktion (PushPublisher genannt) den Direct Path Insert an, der neue Daten direkt und deutlich schneller in die Datenbank-Dateien einfügt. Dabei ist zu beachten, daß ein Direct Path Insert keine Datenbank-Trigger ausführt oder Constraints prüft sowie Daten immer am Ende einer Tabelle einfügt ohne auf freien Speicher zu achten. Somit ist der Mechanismus am besten auf reine "Sammel-Tabellen" anzuwenden, z.B. in einem Staging Bereich. Um die "Datenpumpe" noch weiter zu beschleunigen könnten dieselben Tabellen auf NOLOGGING gesetzt werden. Das muß jedoch manuell erfolgen und im Einzelfall entschieden werden weil nicht gerade unkritisch, fehlen dabei Log-Informationen für die Wiederherstellung der Tabelle bei einem Recovery. Für Datenbankentwickler ist dies allerdings nichts Neues.

 
Um die neue Ingestion API zu nutzen wird zusätzlich zum JDBC Treiber das "rsi.jar" im CLASSPATH benötigt.
Ein Video mit Demo zur neuen Ingestion API liegt auf Youtube - mit Hinweis zu einem Docker Container mit enthaltener Demo.
Hierzu ein kleines Code Beispiel aus unserer Dokumentation, es fügt auf die Schnelle drei Datensätze ein:
 

package oracle.rsi.demos;
import java.sql.SQLException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import oracle.rsi.ReactiveStreamsIngestion;
import oracle.rsi.PushPublisher;

public class SimplePushPublisher {

  public static void main(String[] args) throws SQLException {

    ExecutorService workerThreadPool = Executors.newFixedThreadPool(2);

    ReactiveStreamsIngestion rsi = ReactiveStreamsIngestion
        .builder()
        .url(
            "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)
(HOST=example.com)(PORT=5521))(CONNECT_DATA=(SERVICE_NAME=myservice.com)))")
        .username(<username>)
        .password(<password>)
        .schema(<schema>)
        .executor(workerThreadPool)
        .bufferRows(10)
        .bufferInterval(Duration.ofSeconds(20))
        .table("customers")
        .columns(new String[] { "id", "name", "region" })
        .build();

    PushPublisher <Object[]> pushPublisher = ReactiveStreamsIngestion.pushPublisher();
    pushPublisher.subscribe(rsi.subscriber());

    //Ingests byte arrays using the accept method
    pushPublisher.accept(new Object[] { 1, "John Doe", "North" });
    pushPublisher.accept(new Object[] { 2, "Jane Doe", "North" });
    pushPublisher.accept(new Object[] { 3, "John Smith", "South" });

    try {
      pushPublisher.close();
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }

    rsi.close();

    workerThreadPool.shutdown();

  }

}

 

 

Wie sieht Code aus, der den R2DBC Treiber nutzt ?

Der R2DBC Code für Oracle nutzt zusätzlich den Reactor-Framework. Asynchrone Operationen liefern entweder Reactor's "Mono" oder "Flux" zurück, je nachdem ob Einzelwerte als Ergebnis zu erwarten sind oder Ergebnismengen. Einfache Mappings von Datenbank-Rows nach Java Objekten sind ebenfalls enthalten und ermöglichen ein zumindest unidirektionales, leichtes Binding an Java Objekte. Auch Metadaten-Informationen sind als Java Objekte enthalten und vorbefüllt. Mit R2DBC und "hübschen" Lambda-Expressions sieht der nötige Code für Datenbank-Zugriffe noch wesentlich schlanker und lesbarer aus als mit den Standard JDK "concurrency" Klassen, die der JDBC Treiber nutzt. Der Oracle R2DBC Treiber ist mit Anleitung und Beispielen auf github erhältlich.
Auch das "Spring Data" Projekt hat sein Augenmerk bereits auf den Oracle R2DBC Treiber gelegt und prüft aktuell dessen Einbindung in den Spring Framework.

ConnectionFactory connectionFactory = ConnectionFactories.get(
  "r2dbc:oracle://db.example.com:1521/db.service.name");

Mono.from(connectionFactory.create())
  .flatMapMany(connection ->
    Flux.from(connection.createStatement(
      "SELECT 'Hello, Oracle' FROM sys.dual")
      .execute())
      .flatMap(result ->
        result.map((row, metadata) -> row.get(0, String.class)))
      .doOnNext(System.out::println)
      .thenMany(connection.close()))
  .subscribe();

 

Wie immer viel Spaß beim Testen und Ausprobieren !

Be the first to comment

Comments ( 0 )
Please enter your name.Please provide a valid email address.Please enter a comment.CAPTCHA challenge response provided was incorrect. Please try again.