※本記事は、Todd Sharp による Producing and Consuming Messages in Node.JS with Oracle Advanced Queuing (AQ) を自動翻訳したものです。

2021年 11月 19日


最近のブログ記事をお読みになった方なら、このところメッセージングに少々入れ込んでいることにお気づきかもしれません。しばらく前に、JavaアプリケーションでOracle Advanced Queuing(Oracle AQ)とMicronautを使用する方法についてお話しし、もう少し最近ではREST APIからOracle AQを操作する方法について考察しました。ただし、まだ取り上げていない、Oracle AQをネイティブに操作する別の方法があります。 oracledb モジュールとNode.JSを使用する方法です。Nodeの人気についてはわざわざお話する必要はありませんし、Nodeを使ってOracle AQを操作する方法はとても簡単です。ですから、メッセージのエンキューとデキューが可能なExpressアプリケーションの作成に取り組みましょう。この例からさらに話を広げていく予定なので、近々掲載されるフォローアップ記事を楽しみにお待ちください。途中、わからない点が出てきたり、この機能をさらに深く追求したい場合は、Node oracledbモジュールによるOracle AQの使用方法に関するドキュメントをチェックしてみてください。

この記事では、次のタスクを実行します。

ヒント! Oracle AQ はAutonomous DB に無償で同梱されています。また、2つの無償のAutonomous DB インスタンスを Oracle Cloud の”Always free tier” で起動できるため、このデモを実行 (またはこのコードを本番環境で使用)しても、料金は一切発生しません!!

キューの作成

まずは、適切な権限とロールを持つユーザーがいることを確認します。Oracle AQの操作が初めての場合は、SQLエディタを開き、 admin ユーザーとして以下を実行します。

次に、新しいSQLエディタを開き、新たに作成した aqdemouser ユーザーとして接続します。キュー表、キューを作成してから、そのキューを起動する必要があります。キュー名は AQDEMOUSER.MQTT_BRIDGE_QUEUE になります(JavaScriptコードで後で使用する必要があるため、すぐ使えるようにしておいてください)。

キューはこの時点でメッセージの enqueue (送信) と dequeue (受信) ができます。キューをテストして、想定どおりに動作するかを確認したい場合は、以下を実行します。

JSON 文字列をメッセージとしてエンキューします。

次の出力が返されます。

PL/SQL procedure successfully completed.

JSON 文字列メッセージをデキューし、オブジェクトを解析し、 JSON オブジェクトから要素を取得します。

次の出力が返されます。

どのくらいかかりましたか、2 分ですか?そう長くはかかりませんでしたよね。

アプリケーションの作成

デモ・キューで使用するExpressアプリケーションを作成しましょう。このアプリケーションは、メッセージのエンキューとデキューを行うための、いくつかのHTTPエンドポイントを提供してくれます。もちろん、メッセージングは通常、ユーザーの操作や他のビジネス・ルールに反応してマイクロサービス(またはモノリス)で”目立たず”に実行されます。しかしこのデモでは、キューをテストする良い方法になってくれます。次のコマンドを実行してExpressアプリケーションを素早く作成します(Expressアプリケーション生成プログラムがインストールされていることを前提とします)。ついでに、 oracledb モジュールと (オプションで) debug モジュール(Pretty Debugメッセージ用)もインストールします。

アプリケーションがキューに接続するには、いくつかの機密値(ユーザー名、パスワードなど)が最終的に必要になります。それらの資格証明を環境変数に保存できるように dotenv を使用します。

次に .env というファイルをプロジェクト・ルート内に作成して資格証明を移入します。  

Heads Up! Node の oracledb モジュールを作成するには、少し構成する必要があります ( Instant Client と自律型 DB ウォレットが必要になります)。 使用しているオペレーティング・システムによっては、 モジュールのインストール方法をガイドしてくれる便利なドキュメント・オンライン があります。最初にインストールを終えてから、以下の .env ファイルでInstant Client とウォレットへのパスを使用します。

接続文字列とは? 接続文字列が何なのかよくわからない人のために説明すると、これは [dbname]_[type] の形式を使用する事前定義された文字列です。アプリケーションに必要なパフォーマンスのレベルと同時実行性に応じて [type] を選択できます。これらの値のリストはウォレット内の tnsnames.ora ファイルにあります。

キューイング・アクティビティのすべてをカプセル化するサービス・クラスの作成に移りましょう。このサービスは後でルーター(およびアプリケーションの他の場所)に入れることができます。

サービスの作成

QueueService は、キュー関連のデータベース処理をカプセル化する基本クラスです。そのため、 oracledb モジュールを含めて、資格証明をクラスに格納し、今すぐ追加するメソッドから使えるようにします。クラスを作成し、 init() メソッドを追加します(後でこのクラスをインスタンス化するときに値を渡します)。

大量のコールバックでクラスが乱雑にならないように、このクラスではたくさんのasync/awaitコードを使用します。また、各メソッドで接続プールを取得しなくて済むように、接続プールを取得するための”helper”メソッドを追加すると便利です。そこで、 getPool() というasyncメソッドを QueueService に追加しましょう。

完了したらすべてが確実に仕上がるように、処理をクローズするメソッドを追加します。

エンキュー (1つのメッセージ) メソッドの追加

1つのメッセージをキュー内に生成できるメソッドを追加します。それには、接続プールを取得し (this.getPool())、DBへの接続をそのプールから取得した後 (pool.getConnection())、キューを接続から取得します (connection.getQueue(this.queueName))。キューを取得したら、メッセージを生成して (queue.enqOne())、メッセージを含んだ文字列に渡します(この場合、JSON文字列に変換されたオブジェクト)。次に、トランザクションを実行して (connection.commit()) 、接続をクローズします (connection.close())。

必要な場合は queue.enqMany() メソッドを使って、一連のメッセージを一度にエンキューすることもできます。詳しくは、ドキュメントを参照してください。

デキュー (1つのメッセージ) メソッドの追加

次に、1つのメッセージをデキューするメソッドを追加しましょう。このプロセスはエンキューのプロセスと似ています。プール、接続、キューを取得し、 queue.deqOne() を呼び出し、実行してクローズします。 queue.deqOptions.wait を使って、 oracledb.AQ_DEQ_NO_WAIT  の値をキューのオプションに設定しています。このオプションを設定しないと、使用可能なメッセージをキューが待機してから出力が返ることになります。このデモではそれは避けたいところです。

デキュー (複数のメッセージ) メソッドの追加

エンキューの場合と同様に、複数のメッセージを同時にデキューすることができます。一連のメッセージをデキューするメソッドを追加しましょう。方法は上記の dequeueOne() メソッドとほぼ同じですが、 deqOne() の代わりに  deqMany() をキュー上で呼び出しています。整数を deqMany() メソッドに渡すと、各呼び出しでデキューするメッセージ数を制限することができます。

QueueService での作業はこれで十分です。あとは実行するだけです。

キュー・サービスの初期化

QueueService を使用する前に、資格証明をサービスに設定する必要があります。思い出していただきたいのですが、資格証明は環境変数として設定してあるので、init() メソッドを介して実行時にサービスに渡すだけで済みます。 app.js ファイルを開いて、次のように指定します。

せっかくなので、アプリの終了時にプールが適切にクローズすることを確認しておきましょう。

キュー/デキューへの HTTP エンドポイントの追加

アプリを構成し、サービスを作成して初期化しました!これで、いくつかのエンドポイントを公開して操作できるようになります。初期化した QueueService をindex.js に注入します。

エンキュー用エンドポイントの追加

メッセージをキューにPOST(送信)するためのエンドポイントを作成します。

このPOSTの動作は、cURLを介した2、3のメッセージでテストできます。

必要な場合は、 -i をcURLリクエストに追加して、レスポンス・ヘッダーを確認できます ( 201 ステータス応答の確認)。

デキュー用 (1つのメッセージ) エンドポイントの追加

1つのメッセージをGET(取得)するためのエンドポイントを作成します。

クイックcURLで動作をテストします。

デキュー用 (複数のメッセージ) エンドポイントの追加

一連のメッセージをGET(取得)するためのエンドポイントを追加します。

(残り2つのエンキューされたメッセージを含んだ)配列の取得を確認するもう1つのテストを実行します。

ボーナス情報: メッセージをストリームでデキュー!

特別ボーナスとして、メッセージの定期的なストリームを返すServer-Sent Eventを使用する /dequeueStream エンドポイントも追加できます。ここではただ、1秒につき1つのメッセージのデキューを試行する時間隔を作成します。メッセージが存在する場合は、オープン・ストリームに書き込みます。クライアントが切断されたら、時間隔をクリアし、res.end() を呼び出します。

この動作もcURLでテストできます。次のようにリクエストして数秒間様子を見ます。

データがまったく入ってこないことに留意してください(キューに保留中のメッセージがないため)。cURLウィンドウを別に開いて、2、3のメッセージを送信してみてください。

ストリームを観察していると、着信メッセージが表われます!

まとめ

この記事では、AQキュー、そのキューからメッセージを生成および使用するExpressアプリケーションを作成しました。今後の記事にご注目ください。このアプリケーションをさらに活用して、Oracle AQと他のメッセージング・プロトコル間の”ブリッジ”として動作させます。

Image by jplenio from Pixabay