※本記事は、Todd Sharp による “Producing and Consuming Messages in Node.JS with Oracle Advanced Queuing (AQ)“ を自動翻訳したものです。
2021年 11月 19日
最近のブログ記事をお読みになった方なら、このところメッセージングに少々入れ込んでいることにお気づきかもしれません。しばらく前に、JavaアプリケーションでOracle Advanced Queuing(Oracle AQ)とMicronautを使用する方法についてお話しし、もう少し最近ではREST APIからOracle AQを操作する方法について考察しました。ただし、まだ取り上げていない、Oracle AQをネイティブに操作する別の方法があります。 oracledb モジュールとNode.JSを使用する方法です。Nodeの人気についてはわざわざお話する必要はありませんし、Nodeを使ってOracle AQを操作する方法はとても簡単です。ですから、メッセージのエンキューとデキューが可能なExpressアプリケーションの作成に取り組みましょう。この例からさらに話を広げていく予定なので、近々掲載されるフォローアップ記事を楽しみにお待ちください。途中、わからない点が出てきたり、この機能をさらに深く追求したい場合は、Node oracledbモジュールによるOracle AQの使用方法に関するドキュメントをチェックしてみてください。
この記事では、次のタスクを実行します。
ヒント! Oracle AQ はAutonomous DB に無償で同梱されています。また、2つの無償のAutonomous DB インスタンスを Oracle Cloud の”Always free tier” で起動できるため、このデモを実行 (またはこのコードを本番環境で使用)しても、料金は一切発生しません!!
キューの作成
まずは、適切な権限とロールを持つユーザーがいることを確認します。Oracle AQの操作が初めての場合は、SQLエディタを開き、 admin ユーザーとして以下を実行します。
次に、新しいSQLエディタを開き、新たに作成した aqdemouser ユーザーとして接続します。キュー表、キューを作成してから、そのキューを起動する必要があります。キュー名は AQDEMOUSER.MQTT_BRIDGE_QUEUE になります(JavaScriptコードで後で使用する必要があるため、すぐ使えるようにしておいてください)。
キューはこの時点でメッセージの enqueue (送信) と dequeue (受信) ができます。キューをテストして、想定どおりに動作するかを確認したい場合は、以下を実行します。
JSON 文字列をメッセージとしてエンキューします。
次の出力が返されます。
PL/SQL procedure successfully completed.
JSON 文字列メッセージをデキューし、オブジェクトを解析し、 JSON オブジェクトから要素を取得します。
次の出力が返されます。
どのくらいかかりましたか、2 分ですか?そう長くはかかりませんでしたよね。
アプリケーションの作成
デモ・キューで使用するExpressアプリケーションを作成しましょう。このアプリケーションは、メッセージのエンキューとデキューを行うための、いくつかのHTTPエンドポイントを提供してくれます。もちろん、メッセージングは通常、ユーザーの操作や他のビジネス・ルールに反応してマイクロサービス(またはモノリス)で”目立たず”に実行されます。しかしこのデモでは、キューをテストする良い方法になってくれます。次のコマンドを実行してExpressアプリケーションを素早く作成します(Expressアプリケーション生成プログラムがインストールされていることを前提とします)。ついでに、 oracledb モジュールと (オプションで) debug モジュール(Pretty Debugメッセージ用)もインストールします。
アプリケーションがキューに接続するには、いくつかの機密値(ユーザー名、パスワードなど)が最終的に必要になります。それらの資格証明を環境変数に保存できるように dotenv を使用します。
次に .env というファイルをプロジェクト・ルート内に作成して資格証明を移入します。
Heads Up! Node の oracledb モジュールを作成するには、少し構成する必要があります ( Instant Client と自律型 DB ウォレットが必要になります)。 使用しているオペレーティング・システムによっては、 モジュールのインストール方法をガイドしてくれる便利なドキュメント・オンライン があります。最初にインストールを終えてから、以下の .env ファイルでInstant Client とウォレットへのパスを使用します。
接続文字列とは? 接続文字列が何なのかよくわからない人のために説明すると、これは [dbname]_[type] の形式を使用する事前定義された文字列です。アプリケーションに必要なパフォーマンスのレベルと同時実行性に応じて [type] を選択できます。これらの値のリストはウォレット内の tnsnames.ora ファイルにあります。
キューイング・アクティビティのすべてをカプセル化するサービス・クラスの作成に移りましょう。このサービスは後でルーター(およびアプリケーションの他の場所)に入れることができます。
サービスの作成
QueueService は、キュー関連のデータベース処理をカプセル化する基本クラスです。そのため、 oracledb モジュールを含めて、資格証明をクラスに格納し、今すぐ追加するメソッドから使えるようにします。クラスを作成し、 init() メソッドを追加します(後でこのクラスをインスタンス化するときに値を渡します)。
大量のコールバックでクラスが乱雑にならないように、このクラスではたくさんのasync/awaitコードを使用します。また、各メソッドで接続プールを取得しなくて済むように、接続プールを取得するための”helper”メソッドを追加すると便利です。そこで、 getPool() というasyncメソッドを QueueService に追加しましょう。
完了したらすべてが確実に仕上がるように、処理をクローズするメソッドを追加します。
エンキュー (1つのメッセージ) メソッドの追加
1つのメッセージをキュー内に生成できるメソッドを追加します。それには、接続プールを取得し (this.getPool())、DBへの接続をそのプールから取得した後 (pool.getConnection())、キューを接続から取得します (connection.getQueue(this.queueName))。キューを取得したら、メッセージを生成して (queue.enqOne())、メッセージを含んだ文字列に渡します(この場合、JSON文字列に変換されたオブジェクト)。次に、トランザクションを実行して (connection.commit()) 、接続をクローズします (connection.close())。
必要な場合は queue.enqMany() メソッドを使って、一連のメッセージを一度にエンキューすることもできます。詳しくは、ドキュメントを参照してください。
デキュー (1つのメッセージ) メソッドの追加
次に、1つのメッセージをデキューするメソッドを追加しましょう。このプロセスはエンキューのプロセスと似ています。プール、接続、キューを取得し、 queue.deqOne() を呼び出し、実行してクローズします。 queue.deqOptions.wait を使って、 oracledb.AQ_DEQ_NO_WAIT の値をキューのオプションに設定しています。このオプションを設定しないと、使用可能なメッセージをキューが待機してから出力が返ることになります。このデモではそれは避けたいところです。
デキュー (複数のメッセージ) メソッドの追加
エンキューの場合と同様に、複数のメッセージを同時にデキューすることができます。一連のメッセージをデキューするメソッドを追加しましょう。方法は上記の dequeueOne() メソッドとほぼ同じですが、 deqOne() の代わりに deqMany() をキュー上で呼び出しています。整数を deqMany() メソッドに渡すと、各呼び出しでデキューするメッセージ数を制限することができます。
QueueService での作業はこれで十分です。あとは実行するだけです。
キュー・サービスの初期化
QueueService を使用する前に、資格証明をサービスに設定する必要があります。思い出していただきたいのですが、資格証明は環境変数として設定してあるので、init() メソッドを介して実行時にサービスに渡すだけで済みます。 app.js ファイルを開いて、次のように指定します。
せっかくなので、アプリの終了時にプールが適切にクローズすることを確認しておきましょう。
キュー/デキューへの HTTP エンドポイントの追加
アプリを構成し、サービスを作成して初期化しました!これで、いくつかのエンドポイントを公開して操作できるようになります。初期化した QueueService をindex.js に注入します。
エンキュー用エンドポイントの追加
メッセージをキューにPOST(送信)するためのエンドポイントを作成します。
このPOSTの動作は、cURLを介した2、3のメッセージでテストできます。
必要な場合は、 -i をcURLリクエストに追加して、レスポンス・ヘッダーを確認できます ( 201 ステータス応答の確認)。
デキュー用 (1つのメッセージ) エンドポイントの追加
1つのメッセージをGET(取得)するためのエンドポイントを作成します。
クイックcURLで動作をテストします。
デキュー用 (複数のメッセージ) エンドポイントの追加
一連のメッセージをGET(取得)するためのエンドポイントを追加します。
(残り2つのエンキューされたメッセージを含んだ)配列の取得を確認するもう1つのテストを実行します。
ボーナス情報: メッセージをストリームでデキュー!
特別ボーナスとして、メッセージの定期的なストリームを返すServer-Sent Eventを使用する /dequeueStream エンドポイントも追加できます。ここではただ、1秒につき1つのメッセージのデキューを試行する時間隔を作成します。メッセージが存在する場合は、オープン・ストリームに書き込みます。クライアントが切断されたら、時間隔をクリアし、res.end() を呼び出します。
この動作もcURLでテストできます。次のようにリクエストして数秒間様子を見ます。
データがまったく入ってこないことに留意してください(キューに保留中のメッセージがないため)。cURLウィンドウを別に開いて、2、3のメッセージを送信してみてください。
ストリームを観察していると、着信メッセージが表われます!
まとめ
この記事では、AQキュー、そのキューからメッセージを生成および使用するExpressアプリケーションを作成しました。今後の記事にご注目ください。このアプリケーションをさらに活用して、Oracle AQと他のメッセージング・プロトコル間の”ブリッジ”として動作させます。
