※本記事は、Eric J. Brunoによる”HTML5 Server-Sent Events with Micronaut.io and Java“を翻訳したものです。


シンプルで信頼できるメッセージ・サービスを構築する

著者:Eric J. Bruno

2020年4月6日

 

筆者は最近、マイクロサービス・フレームワークのクラウド側実装としてMicronautを選択した、エンド・ツー・エンドIoTプロジェクトに関わりました。Micronautでは、KafkaとRabbitMQ、そして2つのHTML5メッセージ・パラダイムであるServer-Sent Events(SSE)とWebSocketを組込みでサポートしています。しかし、パブリッシュ/サブスクライブやキューベースのメッセージングなどを目的に、これらを有効に使用するためには、多少の手間がかかります。本記事では、MicronautのSSEサポートを使用して構築する、シンプルで信頼できるメッセージ・システムについて考えます。

プロジェクトのWebサイトに記載されているように、MicronautはJVMをベースとした最新のフルスタック・フレームワークで、簡単にテストできるモジュール式のマイクロサービスやサーバーレス・アプリケーションを構築するためのものです。Micronautでは、依存性注入やアスペクト指向プログラミング、事前コンパイルを使用して、超高速な起動、優れたスループット、低メモリ・オーバーヘッドを実現しています。そのため、Micronautは、インスタンスが短時間でスピンアップおよびスピンダウンされるクラウドベースのマイクロサービスで、優れた選択肢となっています。

Micronautの紹介としては、まずJonas Havers氏の記事「Micronautでマイクロサービスを構築する」をご覧になってから、細かい索引が付いたユーザー・ガイドをざっとお読みください。

本記事のすべてのソース・コードは、こちらからダウンロードすることができます。このファイルには、次に示す3つの主要なプロジェクトが含まれています。

  • MessageServer:QueueクラスとTopicControllerクラスを含むMicronaut SSEサーバーで、Micronautの組込みSSEサポートを利用する
  • TemperatureSender:温度測定デバイスをシミュレートしたもので、Micronaut SSEを使って温度の測定値を送信する
  • Thermometer:JavaScriptを使用したWebアプリケーションで、Micronautサーバーをリスニングして最新の温度を受け取る

ダウンロード・ファイルには、独自のSSELibraryに加え、サンプル・クライアントであるQueueSenderおよびQueueReceiverの2つも含まれています。

それでは早速、SSEメッセージング・サポートの説明から始めます。

 

Server-Sent Eventsの概要

HTML5 SSEは、ブラウザ(または任意の実装アプリケーション)がHTTPまたはHTTPSでサーバーからアップデートを受け取れるようにするサーバー・プッシュ・テクノロジーです。SSEは、ブラウザの外部、つまり任意の言語で書かれたアプリケーション間でも動作します。SSEには別個のサーバーは必要ありません。HTTPおよびHTTPSで動作し、ファイアウォールも使用でき、シンプルです。

HTML5 SSEメッセージングでは、2つの主要なコンポーネントを使用しています。1つはテキストベースのメッセージをシンプルなプロトコルで送信するtext/event-stream MIMEタイプであり、もう1つはメッセージを受信するイベント・リスナーを持つEventSourceインタフェースです。

SSEの詳細については、W3CのHTML5仕様をご覧ください。サンプル実装に関しては、筆者による記事「HTML5 Server-Sent Events and Examples」もご覧いただけます。

 

Micronaut.ioによるSSEプログラミング

SSEの第一歩として、Micronautの@Controller属性を使ってMicronautのControllerクラス(実質的にはHTTPリスナー)を作ってみます。このプロジェクトでは、2つのControllerクラスを作りました。1つはキューベースのメッセージング用、もう1つはトピックベース(パブリッシュ/サブスクライブ)のメッセージング用です。

@Controller("/messageserver/api/") 
public class QueueController extends Messenger {
    ...
}

@Controller("/messageserver/api") 
public class TopicController extends Messenger {
    …
}

図1に示すように、Micronautベースの実装は送信アプリケーションと受信アプリケーションの間に位置します。

Micronaut’s location between the sender and receiver apps

 

図1:送信アプリと受信アプリの間に位置するMicronaut

各Controllerに、URIパス/messageserver/apiを指定しています。このパスはMicronautサーバーのベースURIと連結されます。Messengerベース・クラスについては、後ほど説明します。ここでは、イベントの送受信を行う一部のコードに注目します。リスト1に示すのは、イベントを受信して@Get RESTエンドポイントを設定するコードです。このコードでは、エンドポイントの名前(この場合はQueueまたはTopic)と、リスニングするキューの名前を指定しています。

リスト1:キューに格納されたSSEメッセージを受信するRESTエンドポイント

@Get("/queue/{name}")
public Publisher<Event<String>> index(Optional<String> name) { 
    // Determine queue to listen to
    Queue dest = getQueue( name.get() );

    return Flowable.generate(() -> 0, (i, emitter) -> { 
        // Get the message first...
        Message msg = dest.getNextMessage();
        String data = new String( msg.getData() );
            
        // Then deliver it…
        emitter.onNext( 
            Event.of(data)
        );
            
        // Finally delete it after delivery...
       dest.deleteMessage( msg.getId() );
    });
}

@Getアノテーションでは、以下がHTTP GET呼出しハンドラであること、そしてURLの一部としてqueueおよびキュー名が含まれることを示しています。エンドポイントが呼び出されたとき、getQueueでは、与えられた名前を使ってHashMap内にあるQueue宛先オブジェクトを検索します。見つからない場合は、新しいQueueオブジェクトを作成し、与えられた名前を使ってHashMapに挿入します。

次に、Flowableエミッタを使ってEventオブジェクトを生成するリアクティブ・ストリーム・パブリッシャが利用できるようになったときに、そこからメッセージが送信されます。メッセージは、最大1つのサーバーに配信されるまで、キュー・パラダイムによって保存されます。その結果、それぞれのメッセージが配信された後、キューに格納されたメッセージはエミッタによって削除されます。

アプリケーションから宛先にメッセージを送信する際には、HTTP POSTを使います(リスト2参照)。MicronautのPOSTハンドラ(@Postアノテーションの付加により示されています)では、最初に宛先を名前で検索します。

リスト2:配信のために宛先キューに向けてメッセージを送信するHTTP POSTメソッド

// Content-Type: text/event-stream
@Consumes(MediaType.TEXT_EVENT_STREAM) 
@Post("/queue/publish")  
public HttpResponse queue( Session session,  
                           HttpRequest<?> request,
                           @Body String data ) {
    try {
        HttpParameters params = request.getParameters();
        String queueName = params.getFirst("name").orElse(null);
        Queue dest = getQueue(queueName);

        return processSend(dest, data);
    }
    catch ( Exception e ) {
        e.printStackTrace();
    }
    
    return HttpResponse.status(HttpStatus.UNAUTHORIZED, 
                               "Not authenticated");
}

@Consumesアノテーションでは、POSTがContent-Type HTTPヘッダー・フィールドとしてHTTP MIMEタイプtext/event-streamを受け取れることを示しています。宛先の名前は、HTTPパラメータとして渡すことが想定されています。宛先オブジェクトを取得したら、メッセージは、ライブの受信者に向けてルーティングされます。受信者が存在しない場合、メッセージは永続化されます。

 

SSEメッセージの送信(クライアントのコード)

アプリケーションからHTML5 Server-Sent Eventsを送信するために、SSEDataPublisherヘルパー・クラス(ダウンロードのsselibraryパッケージに含まれています)を使うことができます。このクラスは、キューベースのメッセージでもトピックベースのメッセージでも等しく良好に機能します。ここに含まれている実質的なメソッドは1つだけで、sendMessageという名前です。このメソッドでは、メッセージの永続化と配信を処理するメッセージ・サーバーのURL、メッセージのペイロード、セキュリティ向上のための認証コード(省略可能)を受け取ります。

メッセージのペイロードは、HTML5 SSE仕様に従い、次に示す3つのフィールドを含むテキスト文字列の形式とする必要があります。

  • イベント・タイプ(ハートビートやメッセージなど)、例:event: message
  • ミリ秒単位でのリトライ間隔、例:retry:30000
  • データ本体、例:data: “actual data here…”

リスト3(簡潔さを優先し、一部のコードは割愛しています)に示すように、各フィールドのテキストは改行文字\nで終了し、HTTP POSTメッセージの一部として送信されなければなりません。また、データ・フィールドの最後には、改行文字をもう1つ追加します。

リスト3:HTML5 SSE仕様に従ってHTTP POSTを送信するsendMessageメソッド

String event = "event: message\n";
String retry = "retry: 300000\n";
data = "data: " + data + "\n\n";

URL url = new URL( uri.toASCIIString() );
HttpURLConnection urlConn = 
    (HttpURLConnection)url.openConnection();
urlConn.setFixedLengthStreamingMode(
    event.length() + retry.length() + data.length());
urlConn.setDoOutput(true);
urlConn.setDoInput(true);
urlConn.setRequestMethod("POST");
urlConn.addRequestProperty("Content-Type", "text/event-stream");
urlConn.addRequestProperty("Authorization-Info", authCode);

PrintWriter out = new PrintWriter( urlConn.getOutputStream() );
out.write(event);
out.write(retry);
out.write(data);

特に重要なのは、HttpURLConnection.setFixedLengthStreamingModeです。このメソッドは、HTTPリクエスト本体のストリーミングを内部バッファリングなしで行うことができるようにするためのもので、合計メッセージ・ペイロード長(改行文字を含む)をここに設定する必要があります。次に、HTTP Content-TypeとAuthorization-Info(省略可能)の各ヘッダー・フィールドを設定します。最後に、データをPOST本体に書き込みます。このメッセージは、メッセージ・サーバーで受信されて処理されます(後述)。

 

SSEメッセージの受信(クライアントのコード)

sselibraryパッケージに含まれているSSEDataSubscriberヘルパー・クラスを使用すれば、アプリケーションでHTML5 Server-Sent Eventsを受信することは簡単です。このクラスを使うためには、まずSSECallbackインタフェースを実装します。このインタフェースでは、1つのメソッドonMessageが定義されており、このメソッドを通じてメッセージが配信されます(リスト4参照)。次に、SSEDataSubscriberクラスのインスタンスを作成し、コンストラクタにメッセージ・サーバーのURL、サーバーのタイプ(TopicまたはQueue)、および認証文字列(省略可能)を渡します。

リスト4:ヘルパー・クラスを使用してアプリケーションでSSEメッセージを受信する

SSEDataSubscriber sse = new SSEDataSubscriber( 
        serverURL, SSEDataSubscriber.DestinationType.QUEUE, auth);
sse.subscribe(destinationName, this);

// ...

@Override
public void onMessage(String queue, String data) {
    // ...
}

アプリケーションでリスナーを実装するために必要なものはこれだけです。それでは、SSEDataSubscriberヘルパー・クラスの内部を詳しく見てみます。

SSEDataSubscriberクラスの内部:SSEDataSubscriberクラスでは、SSEメッセージをリスニングするHTTPの仕組みを抽象化し、隠蔽しています。コンストラクタ(リスト5参照)では、宛先のタイプ(QueueまたはTopicか)に応じて、メッセージ・サーバーのRESTエンドポイントURLに適切なAPIパスを追加しています。

リスト5:適切なRESTエンドポイントURLを作成するコンストラクタ

public SSEDataSubscriber( String serverURI,
                          DestinationType type,
                          String authCode ) {
    this.authCode = authCode;
    if ( type == DestinationType.QUEUE ) {
        this.serverURL = serverURI + "/api/queue/";
    }
    else {
        this.serverURL = serverURI + "/api/topic/";
    }
}

次に、クライアント・アプリケーションがsubscribeを呼び出したときに、与えられた宛先の名前とコールバックが保存され、Threadが開始します。すると、リスト6に示すThread.runメソッドが実行されます。

リスト6:SSEDataSubscriberによって拡張されたThread.runメソッド実装

URL url = new URL(serverURL);
URLConnection conn = url.openConnection();
conn.setDoOutput(true);
conn.setConnectTimeout(0);

BufferedReader rd = 
    new BufferedReader(
        new InputStreamReader( conn.getInputStream() ) );

String line;
while ((line = rd.readLine()) != null) {
    if ( line != null && line.length() > 0 ) {
        // Did we get a heartbeat or useful data?
        if ( line.startsWith(":") ) {
            // heartbeat message...
        }
        else if ( line.startsWith("data:") ) {
            // Received data, send to the client's callback
            if ( callback != null ) {
                callback.onMessage(destination, line);
            }
        }
    }
}

このスレッドで、与えられた宛先に送信されるメッセージの処理専用として、メッセージ・サーバーへのコネクションが作成されます。HTTPメッセージを受信したら、ハートビート(空のメッセージ)であるか、実データを含むメッセージ(テキストdata:が存在する)であるかが判定されます。データは、与えられたコールバックのonMessageメソッドを使って非同期式にクライアントに配信されます。

それでは、メッセージ・サーバーのマイクロサービス(Micronaut.ioで実装されたもの)に戻り、それがどのようにメッセージの処理と配信を行っているのかを確認してみます。

 

Messengerベース・クラス

再びQueueControllerクラスとTopicControllerクラスに注目します。この2つはいずれもベース・クラスMessengerを継承していることに注意してください。TopicおよびQueueという両方の宛先タイプのprocessSendメソッド(前述のリスト2で参照されているもの)が実装されているのは、このクラスです(リスト7参照)。まず、データを改行文字で分割しています(SSEメッセージを送信した際、仕様に従って改行文字を追加したことを思い出してください)。

リスト7:メッセージ・サーバーのマイクロサービス内にあるMessenger.processSendメソッド

public HttpResponse processSend(Destination dest, String data) {
    String[] lines = data.split(System.getProperty("line.separator"));
    try { 
        for ( String line: lines) {
            if ( line.contains("event:")) { }
            else if ( line.contains("id:") ) { }
            else if ( line.contains("data:") ) {
                int start = line.indexOf("data:")+"data:".length();
                data = line.substring(start).trim();
                dest.addMessage(data);
            }
        }

        return HttpResponse.ok(dest.getName());
    } 
    catch ( Exception e ) {
        return HttpResponse.serverError(e.toString());
    }
}

メッセージのdata:フィールドをメッセージのテキストから取得しています。そして、宛先のaddMessageメソッドを呼び出してメッセージ・データを渡しています。このメソッドはDestination抽象ベース・クラスで定義されていますが、その拡張クラスであるQueueとTopicでは実装が異なっています。次は、その点に注目してみます。

Topicクラスの内部:宛先の1つであるTopicの仕組みは単純です。送信された各メッセージは、すべてのアクティブなリスナーに配信されます。つまり、1対多関係が成立しています(図2参照)。

With Topic-based publish/subscribe messaging, each message is delivered to every active subscriber

 

図2:トピックベースのパブリッシュ/サブスクライブ・メッセージングでは、各メッセージがすべてのアクティブなサブスクライバに配信される

メッセージが送信されたとき、addMessageメソッドでは、メッセージのペイロード(テキスト)をカプセル化するMessageオブジェクトを作成し、TopicオブジェクトのlastMessageメンバー変数にMessageを格納して、Topicのモニター・オブジェクトを待機しているすべてのスレッドに対してシグナルを送ります。この処理をリスト8に示します。

リスト8:着信したトピック・メッセージの処理

public boolean addMessage(String msgData) {
    Long messageId = System.currentTimeMillis();
    Message msg = new Message(messageId, msgData);
    lastMessage = msg;

    // Notify ALL listeners of the message
    synchronized (topicMonitor ) {
        topicMonitor.notifyAll();
    }
    return true;
}

メッセージ・サーバーのRESTエンドポイントを呼び出してトピック・メッセージを受け取るすべてのクライアントは、最終的に、TopicクラスのgetNextMessageを呼び出し、メッセージが利用できるようになってシグナルを受け取るまで、そこでモニターを待機することになります(リスト9)。

リスト9:メッセージが宛先に到着するまで待機するTopic.getNextMessageメソッド

public Message getNextMessage() throws InterruptedException {
    synchronized ( topicMonitor ) {
        topicMonitor.wait();
    }
    return lastMessage;
}

トピックについては以上です。一方のQueueクラスは、もう少し込み入っています。

Queueクラスの内部:TopicとQueueの最大の違いは、Queueには以下の特徴があることです。

  • メッセージは最大1つのリスナーに配信される必要がある(図3参照)
  • メッセージは、将来的に配信される可能性があるため、リスニングしているクライアントがなくても保存する必要がある
With a queue, each message sent is delivered to exactly one listener

 

図3:キューによって、各送信メッセージが厳密に1つのリスナーに配信される

Queue.getNextMessageは、クライアントがメッセージ・サーバーのRESTエンドポイントを呼び出したときに呼び出されます(前述のリスト1で示しています)。メッセージ自体がメモリに保持されることはなく、メッセージIDのみが保持されます(リスト10参照)。キューに格納されたメッセージは、キューに関連付けられたリスナーがメッセージを取り出そうとするまで、無期限にキューにとどまる可能性があります。そのため、メッセージすべてを保存した場合、メモリをすべて使い切ってしまう可能性があります。そこでその代わりに、メッセージのデータを永続化しています。

リスト10:同じ宛先のキューに格納された次のメッセージIDを待機する間、ブロックされるコール元

public Message getNextMessage() throws InterruptedException {
     // 呼出しのブロック
    Long messageId = messageIds.take();

    // IDを使ってメッセージ・データをロード
    Message message = persistance.getMessage(getName(), messageId);
    return message;
}

 

messageIdオブジェクトは、java.util.concurrent.ArrayBlockingQueueとして実装されています。takeの呼出しは、キューに格納されたエントリが利用できるようになるまでブロックされます。その際に、このコードではキューの先頭を削除し、ブロックされているコール元1つのみに返却しています。メッセージIDが手に入ったら、メッセージのペイロードを永続化ストアから取得しています(この点は、キューベースのメッセージングにおける信頼性の一部です)。最終的に、メッセージが配信されれば、そのメッセージIDと永続化されていたメッセージ本体は削除されます。

 

信頼性の高いメッセージングの実装

MessagePersistenceインタフェースは、実際にメッセージを永続化する仕組みの詳細を隠蔽するために定義されています。Queueオブジェクトでは、ファクトリ・パターンを使って永続化実装のインスタンスを取得しています(リスト11参照)。

リスト11:ファクトリ・パターンを使って永続化実装を取得するQueueオブジェクト

public class Queue extends Destination {
    final protected MessagePersistance persistance = 
MessagePersistanceFactory
            .getInstance().getMessagePersistance();

        //...
}

このファクトリ・パターンは、MessagePersistenceインタフェースを実装した任意の永続化実装をロードするように構成することができます(その際に、依存性注入、構成ファイル、環境変数などを使用できます)。例として、1つの実装を確認してみます。

NoSQLデータベースによる永続化:MessageNoSQLクラス(ダウンロード・パッケージに含まれています)では、MessagePersistenceインタフェースを実装しており、Oracle NoSQL Databaseを使ってメッセージIDによるメッセージの格納と取得を行います。QueueControllerクラスでは、ファクトリ・パターンを使っており、このインタフェースにのみ依存しているため、簡単にクラウドベースのNoSQLデータベースなどの実装と交換できます。

メッセージは、名前/値ペアを使って格納しています。キー(名前)は、宛先名とメッセージIDを組み合わせたものです。値はメッセージのペイロードで、バイト配列としてエンコードしています(リスト12参照)。

リスト12:メッセージをNoSQLデータストアに保存

public boolean saveMessage( String destinationName, 
                            Long messageId, 
                            String message) throws Exception {
    String idStr = messageId.toString();
    store.put( Key.createKey(destinationName, idStr),
               Value.createValue(message.getBytes()) );
    return true;
}

メッセージの取得も簡単です(リスト13参照)。まず、組み立てたキーを使用して、Valueオブジェクトを取得しています。このオブジェクトが見つかった場合、それを使って格納されたバイト配列を取得しています。この配列がメッセージのペイロードを表しています。結果は、Messageオブジェクトに変換して返却しています。

リスト13:キー(宛先名とメッセージID)からメッセージ・ペイロードを取得

public Message getMessage(String destinationName, Long messageId) {
    String idStr = messageId.toString();
    Key key = Key.createKey(destinationName, idStr);
    ValueVersion value = store.get(key);
    if ( value == null || value.getValue() == null ) {
        return null;
    }

    Value val = value.getValue();
    String data = new String( val.getValue() );
    return new Message( messageId, data);    
}

最初にメッセージ・サーバーが起動したときに、NoSQLデータベースに格納されているすべてのメッセージIDをメモリにロードします。これを行うため、まずQueueControllerで、永続化されているメッセージに関連付けられているすべての宛先名について反復処理を行っています(リスト14参照)。続いてgetQueueが呼ばれることで、各宛先名についてQueueオブジェクトが作成されます(キューのみが永続化されるため、これは安全です)。

リスト14:NoSQLデータベースからすべての宛先名を取得 

private void loadSavedMessages() {
    ArrayList<String> queueNames = messageDB.getStoredDestinations();
    for ( String queueName: queueNames ) {
        // Get the queue (loads all queued messages)
        Queue dest = getQueue(queueName);
    }
}

getQueueメソッド(ベース・クラスのMessengerで実装されています)では、与えられた宛先名を使ってQueueオブジェクトを作成しています。これにより、コンストラクタで指定されたデータベースから、そのキューに対応するすべてのメッセージIDがロードされます(リスト15参照)。

リスト15:キューに対応するすべてのメッセージIDをデータベースからロード

public Queue(String name) {
    // load message IDs
    ArrayList<Long> ids = persistance.getMessageIds(name);
    if ( ids != null ) {
        this.messageIds.addAll( ids );
    }
}

このサンプル実装では、kvstoreという名前のデータベースがローカル(127.0.0.1)のポート5000で実行されていることを仮定しています。これは構成ファイルでオーバーライドすることができます。

Oracle NoSQL Databaseの実行:本記事のSSEメッセージ・サーバーのコードを実行するためには、Oracle NoSQL Database Community Editionをダウンロードしてインストールします。データベースをインストールした後、config.xmlファイルを変更して、hostnameにお使いのコンピュータを、registryPortに5000を設定します。このデータベースを実行するために、次のコマンドを使用します。

> java -jar lib/kvstore.jar kvlite -secure-config disable

最後のパラメータでは、このサンプル実装を実行しやすくするためにセキュリティを無効化しています。しかし、本番環境ではこのパラメータを使うべきではありません。データベースが起動すると、次のように出力されます。

Opened existing kvlite store with config:
-root ./kvroot -store kvstore -host Dolce -port 5000 -secure-config disable -restore-from-snapshot null

以上で、SSEメッセージ・サーバーを実行できるようになります。SSEメッセージ・サーバーについては、次のセクションで説明します。

 

エンド・ツー・エンドのデモ

Micronautを使ったSSEメッセージ・サーバーは、次のコマンドで起動できます(最初にNoSQLデータベースを起動することを忘れないでください)。

> java -jar target/MessageServer-1.0-SNAPSHOT.jar

コマンドの実行に成功した場合、次のような行で終わるログが出力されます。

12:53:28.857 [main] INFO io.micronaut.runtime.Micronaut - Startup completed in 701ms. Server Running: http://localhost:8080

キュー・レシーバを実装するためには、ヘルパー・クラスSSEDataSubscriberを使用します。このクラスでは、コンストラクタのパラメータとして、MicronautサーバーURI、宛先のタイプ(QueueまたはTopic)、および認証コード(省略可能)を受け取ります。キュー・レシーバの作成後、subscribeメソッドを呼び出してキューをリスニングします(リスト16参照)。

リスト16:キューのサブスクライブ

SSEDataSubscriber sse = new SSEDataSubscriber(
        serverUrl, SSEDataSubscriber.DestinationType.QUEUE, authCode);
sse.subscribe( queueName, this );

この場合、呼び出す側のクラスでSSECallbackインタフェースを実装し、subscribeを呼び出す際に自身への参照を渡しています。メッセージがキューに着信したときに、オブジェクトのonMessageメソッドが呼び出されてペイロードが渡されます。この実現方法を確認したい方は、本記事のリスト6をご覧ください。

キューにメッセージを送信するためには、SSEDataPublisherクラス(前述のリスト3で説明しています)を使います。リスト17をご覧ください。

リスト17:キューへのメッセージ送信

String url = serverURI + "/api/queue/publish?name=" + queueName;
SSEDataPublisher.sendMessage(url, data, authCode);

必要なものはこれだけです。キューおよびトピックに対するメッセージの送受信に同じヘルパー・クラスを使えるため、いずれのタイプのアプリケーションでも、Javaコードは非常に似たものになります。次は、Webアプリケーション内でデータの更新を動的に表示するための、JavaScriptのリスナーを作成する方法を確認します。

JavaScript SSEリスナーの実装:この最後のサンプルでは、トピックを使って疑似的な最新の温度を送信し、SSEを使ってWebページ上で動的に更新します。Javaから温度を送信する部分は、先ほどのセクションで確認したキュー・センダーと同様のものです。しかし、リスナーはHTMLベースの単純なWebページに埋め込まれたJavaScriptコードです。

まず、リスト18に示すようにEventSourceインスタンスを作成します。

リスト18:EventSourceインスタンスの作成

var source = new EventSource(uri+"/messageserver/api/topic/temp1");

次に、リスト19に示すように、デバッグとコネクションのリセットに使用するエラー・ハンドラをセットアップします。

リスト19:エラー・ハンドリングのセットアップ

source.onerror = function(event) {
    console.log("SSE onerror " + event);

    // 1秒待って再接続する
    setTimeout(function() { setupEventSource(); }, 1000); 
}

 

最後に、リスト20に示すメッセージ・リスナー関数(ペイロードはここに配信されます)を実装します。

リスト20:メッセージ・リスナー関数の実装

source.onmessage = function(event) {
    var tempGauge = document.getElementById('temperature');
    tempGauge.innerHTML = event.data;
}

この例において、tempGaugeは、更新される温度値を表示するために使用する、HTMLのdiv要素です。SSEメッセージ・サーバーが実行中で、送信元からtemp1トピックにデータがパブリッシュされたとき、Webページは図4のようになります。

A web application with simple JavaScript to update the temperature reading

 

図4:温度の測定値を更新する単純なJavaScriptを含むWebアプリケーション

なお、クロスサイト・スクリプティング・セキュリティの関係で、Chromeなどのブラウザでは、Webページを公開しているURL以外からのデータがブロックされることに注意してください。これに対処する方法はいくつかありますが、開発目的の場合に簡単な方法は、次のパラメータを使ってコマンドラインからChromeを起動することです。

google-chrome --disable-web-security --user-data-dir=<User home dir>

Windowsでは、google-chromeをchrome.exeに置き換えます。

さらに簡単な例として、いつもどおりブラウザを開いてから、有効なSSEの宛先の適切なURLを入力することもできます。たとえば、http://localhost:8080/messageserver/api/topic/topic1などです。その結果、データが更新されるたびにブラウザのページに追加されますが、やがては最新の値を確認するためにスクロールが必要となります。

 

まとめ

本記事では、MicronautのSSEサポートを使用して、キューベースのメッセージングとトピックベース(パブリッシュ/サブスクライブ)のメッセージングに対応した、シンプルで信頼性の高いメッセージ・システムを構築する方法を説明しました。Micronautの起動時間は非常に短く、スループットに優れ、メモリのオーバーヘッドは少なくなっています。そのため、インスタンスが短時間でスピンアップおよびスピンダウンされるクラウドベースのマイクロサービスで、優れた選択肢となっています。


Eric J. Bruno
Dellのアドバンスト・リサーチ・グループに所属。主にエッジや5Gを扱う。大規模分散ソフトウェアの設計、リアルタイム・システム、エッジ/IoTを専門とするエンタープライズ・アーキテクト、開発者、アナリストであり、およそ30年にわたって情報テクノロジー・コミュニティで活躍している。Twitterのフォローは@ericjbrunoから。


Eric J. Bruno

Dellのアドバンスト・リサーチ・グループに所属。主にエッジや5Gを扱う。大規模分散ソフトウェアの設計、リアルタイム・システム、エッジ/IoTを専門とするエンタープライズ・アーキテクト、開発者、アナリストであり、およそ30年にわたって情報テクノロジー・コミュニティで活躍している。Twitterのフォローは@ericjbrunoから。