X

Technologie - Trends - Tipps&Tricks
in deutscher Sprache

JSON nach Oracle Database pumpen mit REST und Kafka

Marcel Boermann-Pfeifer
Principal Solution Engineer

Wir von Oracle werden nicht müde zu erzählen, daß man mit unserer "Converged Database", der ganz normalen Oracle Datenbank mit ihren allumfassenden Möglichkeiten, auch JSON Dokumente verarbeiten kann. Und dies ohne SQL Kenntnisse über eine einfache API, SODA genannt, Simple Oracle Document Access. Es ist lediglich eine unterstützte Programmiersprache zu wählen wie Python oder Java, und der entsprechende Oracle Datenbank-Treiber Oracle JDBC oder cx_Oracle enthält auch gleich die SODA API. Doch nebenbei hat sich eine kleine Perle eingeschlichen, denn man kommt sogar gänzlich ohne Datenbank-Treiber und sprach-abhängige APIs aus, wenn man alternativ die SODA REST API verwendet. Umso leichter kommen dann die JSON Daten z.B. aus einem Kafka Cluster in die Datenbank hinein oder wieder heraus. Ein gut denkbarer Anwendungsfall, denn die Oracle Datenbank ist bekannt dafür, JSON besonders schnell zu verarbeiten und zusätzliche Unterstützung durch Verschlüsselung, Backup, Partitionierung und Kompression der Daten zu bieten.

Dieser Beitrag ist sinnvollerweise in drei Bereiche aufgeteilt -
Die SODA REST Schnittstelle der Oracle Datenbank,
Dem Aufbau eines Kafka Clusters und der Ablage von Dokumenten darin,
und der Erstellung eines kleinen Konnektors zwischen Kafka und der Datenbank, der die Dokumente aus Kafka an die Datenbank zur Ablage weiterreicht.
 

Die SODA REST API

ORDS logo

Widmen wir uns zunächst der SODA REST API. Sie steht jeder Datenbank-Edition kostenlos zur Verfügung und ist fester Bestandteil einer jeden ORDS-Installation. Mit ORDS sind die "Oracle REST Data Services" gemeint, mit Hilfe derer man sowohl das Low-Code Tool "Application Express" betreibt als auch aus relationalen Tabellen, Views und PL/SQL  REST Services erzeugen kann.

Ist ORDS einmal installiert und eingerichtet , z.B. als Docker image wie hier beschrieben oder als fester Bestandteil jeder Autonomous Datenbank in unserer Cloud, ist es nur noch ein kurzer Weg um auch SODA zu aktivieren. Dazu genügt per SQL die Freischaltung eines Datenbank-Benutzers für REST Zugriffe und die Zugriffsberechtigung per SODA auf denselben Datenbank-Benutzer:

Angemeldet in der Oracle DB als Benutzer SYS, SYSTEM oder ADMIN:

GRANT SODA_APP to <dbuser>;
Angemeldet als betroffener <dbuser>:
EXEC ORDS.ENABLE_SCHEMA;
COMMIT;

Ab jetzt kann mit SODA REST gearbeitet werden. Der allererste Schritt wird immer sein, eine neue Collection anzulegen, ein Ablageort in der Datenbak für JSON Dokumente. Das geschieht in typischer REST-Manier mit einer PUT Operation auf eine in der URL benannten collection:

curl --request PUT 'http://<hostname>:<port>/ords/<dbuser>/soda/latest/<collection>'
oder als volles aber verfälschtes Beispiel mit Basic Authentication gegen eine Autonomous Datenbank. Den Authorization-Header hat "Postman" generiert aus einem Benutzernamen und Kennwort, genauso wie den gesamten curl-Aufruf. Ein sehr hilfreiches Tool!
curl --location --request PUT 'https://n7pmwsc8te8fjty-meinedb.adb.eu-frankfurt-1.oraclecloudapps.com/ords/json/soda/latest/kafkasink' \
--header 'Authorization: Basic SlNPTjpyadayadasZGVaMzIjIw=='

Diese Collection wird in der Datenbank repräsentiert als gleichnamige Tabelle mit einer BLOB (bzw. JSON ab Version 20c) -Spalte für die JSON Dokumente und einigen Metadaten wie automatisch erzeugter ID-Spalte usw. Es ist empfohlen, aber nicht unbedingt nötig, für jeden JSON-Dokumenttyp (Bestellung, Artikel, Adresse, IoT-Komponente...) eine eigene Collection anzulegen. Sonst wird es später in Abfragen etwas aufwändiger, diese auseinanderzuhalten.

describe kafkasink
Name          Null?    Typ           
------------- -------- ------------- 
ID            NOT NULL VARCHAR2(255) 
CREATED_ON    NOT NULL TIMESTAMP(6)  
LAST_MODIFIED NOT NULL TIMESTAMP(6)  
VERSION       NOT NULL VARCHAR2(255) 
JSON_DOCUMENT          BLOB          

Ob das Anlegen der Collection erfolgreich war, können Sie ebenso via REST überprüfen. Ein kleiner SODA GET Aufruf liefert eine Liste aller Collections im Zugriff:

curl http://<hostname>:<port>/ords/<dbuser>/soda/latest/

Oder wie in meinem Beispiel mit einer Autonomous Datenbank:

curl 'https://n7pmwsc8te8fjty-meinedb.adb.eu-frankfurt-1.oraclecloudapps.com/ords/json/soda/latest/' \
--header 'Authorization: Basic SlNPTjpyadayadasZGVaMzIjIw'
Selbstverständlich gelangen Sie an die abgelegten JSON Dokumente mittels SQL und können Operationen darauf mit klassischen (objekt-)relationalen Tabellen verknüpfen und auch bestehende SQL-Erweiterungen für JSON Dokumente darauf anwenden. Das ist ein echter Mehrwert und guter Grund, warum man überhaupt JSON in der Oracle Datenbank ablegen sollte.

Die SODA REST API unterstützt typische CRUD- und komplexe, schachtelbare Abfrage-Operationen (sogar speziell auf Geodaten). Queries werden in JSON-Dokumenten formuliert, auch Updates bestehender JSON Dokumente werden als Patching-Instruktionen (was kommt wohin, wird wie geändert) in JSON formuliert. Die meisten Operationen werden per POST request der Datenbank übergeben.

Ein neues, beliebiges JSON Dokument wird wie nachfolgend eingefügt und als Resultat der Operation wird ein Antwort-Dokument zurückgegeben mit einer neuen ID darin:

curl --request POST 'https://n7pmwsc8te8fjty-meinedb.adb.eu-frankfurt-1.oraclecloudapps.com/ords/json/soda/latest/kafkasink' \
--header 'Authorization: Basic SlNPTjpyadayadasZGVaMzIjIw==' \
--header 'Content-Type: application/json' \
--data '{"test": "testing"}'

Das knappe Ergebnis-Dokument als Beispiel:

{"items":[{"id":"A5D07365A2AB4887990120FE4868C5B9","etag":"D1CABE4B743B4806A72D40D7490BAA7B","lastModified":"2020-11-05T15:24:57.874076000Z","created":"2020-11-05T15:24:57.874076000Z"}],"hasMore":false,"count":1}

Haben Sie nun eines oder mehrere Dokumente in der Datenbank, können Sie sie per REST (und SQL) abfragen. Ein kleines Beispiel für eine Abfrage per REST, die das eben eingefügte Dokument wiederfindet, sieht wie folgt aus:

curl --request POST 'https://n7pmwsc8te8fjty-meinedb.adb.eu-frankfurt-1.oraclecloudapps.com/ords/json/soda/latest/custom-actions/query/kafkasink' \
--header 'Authorization: Basic SlNPTjpyadayadasZGVaMzIjIw==' \
--header 'Content-Type: application/json' \
--data '{"test" : { "$startsWith" : "tes"}}

Als Ergebnis kommt ein JSON Dokument mit einer Ergebnisliste aus Metadaten und den eigentlichen gefundenen Dokumenten. Möchten Sie nur die IDs der gefundenen Dokumente erhalten oder nur die Dokumente selbst ohne Metadaten, können Sie der URL entsprechende Parameter anhängen:

?action=query&fields=value
oder
?action=query&fields=id

Eine umfassende Syntax-Beschreibung der Queries per REST findet sich in unserer Dokumentation:Overview of SODA Filter Specifications

Es ist auch möglich, gleich mehrere Dokumente auf einmal einzufügen in einer Art von "Micro-Batch" Operation. Das ist sinnvoll, wenn z.B. pro Sekunde sehr viele Dokumente erzeugt und dann gesammelt regelmäßig zur Datenbank geschickt werden. Ein typischer Fall also für den Kafka Messaging und Streaming-Framework als Vermittler und Sammler solcher Dokumente.
Um mehrere Dokumente auf einmal einzuspielen sind diese in ein übergeordnetes JSON Dokument einzubetten, ein simpler Array bestehend aus einzelnen JSON Dokumenten. Der Größe halber verweise ich hier auf unsere Beispiele, die jeder ORDS Installation beiliegen: unterhalb des ORDS Verzeichnisses liegt bei examples/soda/getting-started ein ganzer Stapel an JSON Dokumenten für Queries, Indexing, Inserts und auch Bulk inserts. "POList.json" ist ein solches bulk insert Dokument, das genau 70 Bestellungen enthält. Mit folgendem beispielhaften Aufruf gelangen alle 70 Bestellungen auf einmal in die Datenbank:

curl --location --request POST 'https://n7pmwsc8te8fjty-meinedb.adb.eu-frankfurt-1.oraclecloudapps.com/ords/json/soda/latest/custom-actions/insert/kafkasink' \
--header 'Authorization: Basic SlNPTjpyadayadasZGVaMzIjIw==' \
--header 'Content-Type: application/json' \
--data-binary '@/tmp/ords-19.2.0/examples/soda/getting-started/POList.json'

Ich möchte gar nicht näher auf weitere Möglichkeiten und Aspekte der SODA REST API eingehen, denn die beiden wichtigsten Operationen für das Pumpen von JSON Dokumenten von Kafka via REST in die Datenbank sind nun bereits bekannt: das Anlegen von Collections und das Hochladen einzelner oder mehrerer Dokumente.

Nun alle SODA REST Operationen noch als kurzer tabellarischer Überblick - und ein Hinweis auf unsere Dokumentation zu deren Parametrisierung, sprich der nötigen Syntax im JSON Format: Using SODA for REST

Liste aller Collections GET /ords/<dbuser>/soda/latest
Collection erstellen PUT /ords/<dbuser>/soda/latest/<collection>
Dokument hinzufügen POST /ords/<dbuser>/soda/latest/<collection>
Dokumente hinzufügen POST /ords/<dbuser>/soda/latest/custom-actions/insert/<collection>
Collection abfragen POST /ords/<dbuser>/soda/latest/custom-actions/query/<collection>
Dokument abrufen GET /ords/<dbuser>/soda/latest/<collection>/<id>
Dokument überschreiben PUT /ords/<dbuser>/soda/latest/<collection>/<id>
Dokument löschen DELETE /ords/<dbuser>/soda/latest/<collection>/<id>
Dokumente löschen POST /ords/<dbuser>/soda/latest/custom-actions/delete/<collection>
Alle Dokumente löschen POST /ords/<dbuser>/soda/latest/custom-actions/truncate/<collection>
Dokument ändern / patchen PATCH /ords/<dbuser>/soda/latest/<collection>
Index erstellen / löschen POST /ords/<dbuser>/soda/latest/custom-actions/index/<collection>
Collection löschen DELETE /ords/<dbuser>/soda/latest

 

 

 

 

 

 

 

 

 

 

 

 

Der Apache Kafka Messaging bzw. Streaming Framework

Kafka Logo

Nun ist es nicht mehr weit, bis Nachrichten im JSON Format aus Kafka in die Datenbank gelangen. Es gibt mit dem optionalen bzw. zusätzlichen KafkaConnect Framework vorgefertigte Konnektoren, die beispielsweise JSON Dokumente in relationale Strukturen überführen (mapping genannt) und in die Datenbank per JDBC einfügen bzw. auslesen können. Falls die Zielstruktur nicht bekannt sein sollte (oder darf) und einfach beliebiges JSON abzulegen sei ist der Weg jedoch nicht geeignet. Das Mapping kostet obendrein Zeit und Performance, was auch nicht immer wünschenswert ist. Zum Glück existieren weitere Konnektoren z.B. für Dateien, aber eben auch für REST Services bzw. HTTP "sinks" und "sources" in der Nomenklatur von KafkaConnect. Kurze lauffähige Beispiele konnte ich nur in der immer proprietärer werdenden Welt von confluent finden, einer Organisation die sich dem Aufbau und Vertrieb einer Kafka- und Docker-basierten Laufzeitumgebung widmet. Andere vorgefertigte Konnektoren unterstützten einfach keine HTTP Authentisierung oder ließen andere Features aus. Also entschied ich mich kurzerhand selbst ein wenig Java Code zu schreiben und zunächst ohne die KafkaConnect API  auszukommen. Vorerst.

Bauen wir uns zunächst zum Test ein kleines Kafka-Cluster auf, um Nachrichten dorthin zu schicken und später in die Datenbank weiterzuleiten. Zwei möglichst einfache Varianten sind zum Einen die von confluent vorgefertigten Docker-Images, die mit wenigen Zeilen in einer Shell zum Leben erweckt werden, und zum Anderen die Oracle Cloud mit ihrem Streaming-Dienst, die mit wenigen Mausklicks ein Produktives Cluster entstehen läßt, das man auf Wunsch über das Internet erreichen kann.

Eine einfache Kafka-Umgebung können Sie auf einer Linux Maschine mit folgenden Kommandos ins Leben rufen:

docker run -d \
    --net=host \
    --name=zookeeper \
    -e ZOOKEEPER_CLIENT_PORT=32181 \
    -e ZOOKEEPER_TICK_TIME=2000 \
    confluentinc/cp-zookeeper:5.0.0

docker run -d \
    --net=host \
    --name=kafka \
    -e KAFKA_ZOOKEEPER_CONNECT=localhost:32181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092 \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka:5.0.0

docker run -d \
  --net=host \
  --name=schema-registry \
  -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=localhost:32181 \
  -e SCHEMA_REGISTRY_HOST_NAME=localhost \
  -e SCHEMA_REGISTRY_LISTENERS=http://localhost:8081 \
  confluentinc/cp-schema-registry:5.0.0

Eigentlich ist es ausreichend, nur die mit "kafka" und "zookeeper" benannten Container zu erzeugen. Vielleicht ist Ihnen aufgefallen, daß in dem Beispiel Docker Images verwendet werden, die von confluent stammen ...

Nun wird ein sogenanntes "Topic" in Kafka angelegt, in dem die JSON Dokumente abgelegt werden können. Kafka ist den altbekannten Messaging Systemen sehr ähnlich, nur schlanker und vereinfacht, somit können bereits bekannte Konzepte leicht übernommen werden. Das Anlegen des Topics geschieht ohne eine zusätzliche Kafka-Installation, indem temporär ein weiterer Kafka Docker Container erzeugt wird, der über seine Kommandozeile ein entsprechendes Kommando absetzt:

docker run \
  --net=host \
  --rm \
  confluentinc/cp-kafka:5.0.0 \
  kafka-topics --create --topic SODAtopic --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:32181

Und auf gleichem Wege können kleine Test-Nachrichten in das eben erzeugte Topic abgelegt werden, um sie später auszulesen und zu versenden:

docker run \
  --net=host \
  --rm \
  confluentinc/cp-kafka:5.0.0 \
  bash -c 'echo {\"testmessage\":\"testing\"} | kafka-console-producer --topic SODAtopic --request-required-acks 1  --broker-list localhost:29092'

 

Alternativ kann in der Oracle Cloud als "Streaming Service" ein produktives Kafka-Cluster erzeugt werden mit Verschlüsselung, Authentisierung, Verfügbarkeit und Patching von Oracle. Ein kleines Terraform-Skript könnte einen neuen Stream-Pool erzeugen oder Sie klicken ein neues Cluster schnell und einfach mit der Maus:
Über das Menü "Analytics"->"Streaming" kommen Sie an den Button "Stream erstellen".

Dort geben Sie den Namen eines Streams an, z.B. SODAtopic und wählen zusätzlich "Neuen StreamPool erstellen", und "Öffentlicher Endpunkt", damit sofort ein Zugang per Internet zur Verfügung steht. Alle anderen Angaben lassen Sie vorab auf den default-Werten.


 

Ist der Stream einmal angelegt, bekommen Sie Zugangsdaten für diesen Kafka-Cluster über den Klick auf den StreamPool und den Link "Kafka-Verbindungseistellungen":

Um mit Ihrem Cloud-Benutzernamen auf den Cluster zuzugreifen, muß Ihr Cloud Benutzer ein Auth-Token besitzen. Das erzeugen Sie über die Benutzerverwaltung
in der Oracle Cloud (Identity -> User -> Ihr Benutzer, dann "Authentifizierungstoken", "Token generieren"). Bitte merken Sie sich das Token gut, Sie können es nachträglich nicht mehr einsehen oder ändern. Im Notfall erzeugen Sie einfach ein neues Token.

Mit dem Browser in der Oracle Cloud unterwegs, können Sie auch gleich eine Test-Nachricht in das eben erzeugte Topic einfügen durch Klick auf Analytics -> Streaming -> SODAtopic -> Testnachricht erzeugen

 

Nun sind wir bei beiden Umgebungen, Docker oder Cloud, an gleicher Stelle angelangt und könnten mit demselben recht kleinen Java Programm sowohl aus einer lokalen als auch einer Cloud Umgebung Nachrichten auslesen und an eine Datebank per HTTP / REST Aufruf senden. Das Programm wird hier funktional erklärt, steht aber voll ausformuliert auch auf github zur Verfügung.

 

Die Weiterleitung der Kafka-Nachrichten per SODA

Das kleine Programm, das Nachrichten aus Kafka an die Datenbank weiterleiten soll, ist aus Sicht von Kafka ein "Consumer". Der lauscht am Cluster, ob neue Nachrichten für das abonnierte Topic zur Verfügug stehen. In Java formuliert sieht das wie folgt aus:


        //verbinde Dich mit dem Kafka Cluster
        KafkaConsumer<string, string%gt; consumer = new KafkaConsumer<>(connectProps);;
        //registriere Dich als Consumer am Topic
        consumer.subscribe(Collections.singletonList("SODAtopic"));
        //wiederhole für immer
        while (true) {
            //warte max. 1 Sekunde oder bis neue Nachrichten kommen
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            //sende jede in diesem Durchgang erhaltene Nachricht an die Datenbank
            for (ConsumerRecord<String, String> record : records) {
                postToDatabase(record.value());
            }
            //bestätige den Erhalt der Nachrichten
            pusher.consumer.commitSync();
        }

 

Viel ist glücklicherweise und offensichtlich nicht zu tun. Es ist lediglich noch zu klären, wie der Datensatz/Record/Nachricht in Java an die Datenbank gesandt wird - und wie die Connect-Informationen für den Kafka-Cluster aussehen. Der Versand an die Datenbank erfolgt über einen HTTP Zugriff genau wie mit den obigen Beispielen, nur nicht mit "curl" sondern über einen Java "HTTP/REST Client" mit vielen Features wie Verschlüsselung, Authentisierung usw.

        Client httpClient = ClientBuilder.newClient();
        WebTarget target = httpClient.target(url);
        Invocation.Builder invocationBuilder = target
                .request(MediaType.APPLICATION_JSON)
                .header("Authorization", authorizationHeaderValue)
                .accept("application/json");
        Response response = invocationBuilder.post(Entity.json(kafkaRecord));

Das sieht doch recht simpel aus - kein Mapping zwischen JSON und relationalen Stukturen, kein JDBC oder gar Persistenz-Framework, einfach nur valides JSON weiterleiten und den Rest macht später die Datenbank z.B. beim Auswerten.

Zwischen Cloud Umgebug und lokaler Docker Umgebung sieht der Code auch identisch aus. Es sind lediglich andere Verbindungs-Informationen zu verwenden, weil der Zugriff auf ein produktives Cluster in der Cloud z.B. verschlüsselt sein sollte, genauso wie eine Authentisierung überhaupt erfolgen sollte.

Für den Zugriff auf ein lokales Docker-basiertes Kafka-Cluster genügen beispielhaft folgende Einträge:

 

        props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:29092");
        props.setProperty("group.id", "sodaGroup");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);

Und für den Zugrff auf den Cluster in der Oracle Cloud sind die Informationen nötig, die beim Anlegen des Streampools einsehbar waren. Zusätzlich noch der in der Cloud Oberfläche erzugte Authentisierungs Token.

        props.setProperty("bootstrap.servers", "cell-1.streaming.eu-amsterdam-1.oci.oraclecloud.com:9092");
        String authToken = "U.76NGNraByadayadar3";
        String tenancyName = "oracleemea";
        String username = "marcel.pfeifer@oracle.com";
        String streamPoolId = "ocid1.streampool.oc1.eu-amsterdam-1.amaaaaaaop3l36yardp4hio2jsjbyadayadavspenpxjd67a";
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/"
                + username + "/"
                + streamPoolId + "\" "
                + "password=\""
                + authToken + "\";");
        props.setProperty("group.id", "oracleGroup");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Der prinzipiell lauffähige und getestete Code mit zugehörigem Maven POM liegt zum Download bereit auf github.com. Um Testnachrichten zu generieren liegt dem Projekt ein kleiner Producer bei, falls der Kommandozeilen-Zugriff per Docker unangenehm erscheint.

Eine fertige Lösung könnte man ebenso in einen Docker Container verpacken und z.B. in einer VM in der Oracle Cloud betreiben oder in einem Managed Kubernetes Cluster ebenso dort, genannt OKE. Dann wären alle Komponenten wie Kafka, Daten-Pumpe und die (Autonomous) Datenbank nahe beieinander und schön performant.

Ich wünsche viel Spaß beim Testen und Benchmarking!

 

Be the first to comment

Comments ( 0 )
Please enter your name.Please provide a valid email address.Please enter a comment.CAPTCHA challenge response provided was incorrect. Please try again.