※本記事は、Daniel Keo による “Reactive streams programming over WebSockets with Helidon SE” を翻訳したものです。
Helidon SEを使用し、リモートのパブリッシャが一度に送信するデータの量をクライアントからパブリッシャに通知することで、クライアント・アプリケーションから非同期トラフィックを制御できる
September 11, 2020
リアクティブ・ストリーム・プログラミングは、非同期ストリーム処理を扱う方法として人気が上昇しており、リアクティブなアプローチを採用するAPIは増加を続けています。リアクティブなアプローチでは、すべてが非同期でノンブロッキングでなければなりません。また、パブリッシャのペースが速い場合にサブスクライバが飽和しないように、フィードバックを行ってデータ・フローを制御する仕組みを実装する必要があります。
しかし、最適なソリューションはReactive Streams APIを使うことだと筆者は考えます。このAPIでストリームによる双方向通信を行えば、アプリケーションがどのくらいの負荷に耐えられるかをパブリッシャに伝えることができます。つまり、必要なバックプレッシャを適用して、ニーズや要求に応じてストリームを開始したり停止したりすることができます。図1をご覧ください。
この非同期双方向通信というニーズがあることで、ネットワーク経由でデータをリモートでストリーミングするという開発者の選択肢が限られてしまう可能性があります。もちろん、この問題に対処できる高耐久性のメッセージ・システムは存在します。そういったシステムでは、ネットワークを流れる大量の非同期トラフィックを扱えますが、かなり大がかりなインフラストラクチャが必要な場合が多く、おいそれと使えるものではありません。しかし、サーバーからクライアントへのリアクティブ接続という単純なユースケースで、大量のデータが送信されるわけではなく、サーバーによる送信データ量がクライアントからサーバーに通知される場合はどうでしょうか。

図1:Reactive Streams APIのパブリッシュ/サブスクライブ・モデル
ネットワーク経由でパブリッシャとサブスクライバを接続するソリューションはすでに存在します。たとえば、RSocket、Reactive gRPC、ServiceTalkといったものです。これらのソリューションは仕様に準拠しており、すぐに使用できます。
しかし、すでにHelidon SEマイクロサービスのJavaライブラリでWebSocketなどの双方向ネットワーク・プロトコルを使っている方の場合、単純にWebSocketをそのまま使ってクライアント・サブスクライバをリモートのパブリッシャに接続するのはどれほど大変なことでしょうか。本記事では、その点に迫ります。
誤解のないように言っておきますと、Reactive Streams for JVM APIを自力で実装するのはかなり厄介な場合があり、通常は推奨されません。というのも、この一見単純そうに見えるAPIには、複雑な仕様があるからです。しかしそれと引き替えに、ストリームのアイテムのシリアル化を完全に制御でき、ネットワークの問題から回復する際の選択肢を広げることができます。
実際、WebSocketプロトコルはこのようなタスクに非常に適しています。Helidon SEでWebSocketを使う利点と欠点については、後ほど述べたいと思います。本記事では、Helidon SEを使用し、リモートのリアクティブ・サブスクライバにパブリッシュするためのWebSocketサーバーを作成します。そして、カスタムのリアクティブ・シグナルを定義し、そのシグナルをJSONにシリアライズし、JavaおよびJavaScriptベースのサブスクライバから(ストリーム経由で)サーバーに接続します。
そして、そのすべてがエンド・ツー・エンドでリアクティブです。
本記事のサンプルは、すべてGitHubに掲載しています。
先へと進む前に、Helidonでマイクロサービスを書く場合、2つのプログラミング・モデルがサポートされている点に注意してください。
- Helidon SEは、リアクティブ・プログラミング・モデルをサポートするマイクロフレームワークです。今回はこちらを使用します。
- Helidon MPは、Jakarta EEコミュニティがポータブルな形でマイクロサービスを実行できるようにするEclipse MicroProfileランタイムです。
サブスクライバとしての WebSocket サーバー・エンドポイント
アプリケーションからネットワーク経由でリアクティブ・ストリームに接続するには、サーバー側に中間リアクティブ・サブスクライバ、クライアント側にパブリッシャが必要です。実際にはサーバー側がパブリッシュするので、少しわかりにくいかもしれませんが、このWebSocketネットワーク・ブリッジをストリームの中間で処理を行うプロセッサだと考えてください。つまり、サーバー側は上流ストリームのサブスクライバとして振る舞う必要があります。一方、クライアント側は下流ストリームのパブリッシャとして振る舞います。図2をご覧ください。

図2:WebSocketネットワーク・ブリッジの上流ストリームと下流ストリーム
WebSocket APIでは、常にクライアント・エンドポイントからサーバー・エンドポイントに接続します。この2種類のエンドポイントにおける最大の違いは、サーバー・エンドポイントのインスタンスがWebSocketの実装によってすべてのクライアント接続に対してデフォルトで作成されることです。そのため、サーバー・エンドポイントをサブスクライバとして使うには、接続後にそのインスタンスにアクセスし、そこでリアクティブ・ストリームをサブスクライブする必要があります。
まず、このサンプル・アプリケーションのパブリッシャに対してStreamFactoryクラスを使い、クライアントが接続されるたびにパブリッシャを供給します。このファクトリでは、0.5秒間隔で10個の文字列アイテムを発行するというシンプルな新しいストリームを返します。
public class StreamFactory {
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
public Multi<String> createStream() {
return Multi.interval(500, TimeUnit.MILLISECONDS, scheduledExecutorService)
.limit(10)
.map(aLong -> "Message number: " + aLong);
}
}
次は、WebSocketで送信するシグナルを区別するための、シンプルなカスタム・ラッパーを作成します。この例では、文字列のストリームであると仮定します。簡素化するため、この例では、サブスクライブ・シグナル、すなわちonSubscribeシグナルは送りません。この簡素化の影響で、アプリケーションではWebSocket接続の準備ができるまで待機してからリクエスト・シグナルを送信しなければなりません。そのため、抽象化はさほど完璧ではありません。
public class ReactiveSignal {
public Type type;
public long requested;
public String item;
public Throwable error;
public enum Type {
REQUEST,
CANCEL,
ON_NEXT,
ON_ERROR,
ON_COMPLETE
}
public static ReactiveSignal request(long n) {
ReactiveSignal signal = new ReactiveSignal();
signal.type = Type.REQUEST;
signal.requested = n;
return signal;
}
public static ReactiveSignal cancel() {
ReactiveSignal signal = new ReactiveSignal();
signal.type = Type.CANCEL;
return signal;
}
public static ReactiveSignal next(String item) {
ReactiveSignal signal = new ReactiveSignal();
signal.type = Type.ON_NEXT;
signal.item = item;
return signal;
}
public static ReactiveSignal error(Throwable t) {
ReactiveSignal signal = new ReactiveSignal();
signal.type = Type.ON_ERROR;
signal.error = t;
return signal;
}
public static ReactiveSignal complete() {
ReactiveSignal signal = new ReactiveSignal();
signal.type = Type.ON_COMPLETE;
return signal;
}
}
この例では、WebSocketメッセージをJSON-Bでエンコードする方法が最適です。そうしておくと、後ほどJavaScriptからサーバーに接続する際に報われることになります。
public class ReactiveSignalEncoderDecoder
implements Encoder.TextStream<ReactiveSignal>, Decoder.TextStream<ReactiveSignal> {
private static final Jsonb jsonb = JsonbBuilder.create(new JsonbConfig().withAdapters(new ThrowableAdapter()));
@Override
public ReactiveSignal decode(final Reader reader) {
return jsonb.fromJson(reader, ReactiveSignal.class);
}
@Override
public void encode(final ReactiveSignal object, final Writer writer) throws IOException {
writer.write(jsonb.toJson(object));
}
@Override
public void init(final EndpointConfig config) {
}
@Override
public void destroy() {
}
}
次に、WebSocketエンドポイントを準備します。このエンドポイントもFlow.Subscriberとなるので、StreamFactoryで作成されたパブリッシャを直接サブスクライブすることができます。このコードでは、エンドポイントでWebSocketメッセージがインターセプトされるためには、前提としてサブスクリプションが実現している必要があるものとしています。
public class WebSocketServerEndpoint extends Endpoint implements Flow.Subscriber<String> {
private static final Logger LOGGER = Logger.getLogger(WebSocketServerEndpoint.class.getName());
private Session session;
private Flow.Subscription subscription;
@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
this.session = session;
System.out.println("Session " + session.getId());
session.addMessageHandler(new MessageHandler.Whole<ReactiveSignal>() {
@Override
public void onMessage(ReactiveSignal signal) {
System.out.println("Message " + signal);
switch (signal.type) {
case REQUEST:
subscription.request(signal.requested);
break;
case CANCEL:
subscription.cancel();
break;
default:
throw new IllegalStateException("Unexpected signal " + signal.type + " from upstream!");
}
}
});
}
@Override
public void onError(final Session session, final Throwable thr) {
LOGGER.log(Level.SEVERE, thr, () -> "WebSocket error.");
super.onError(session, thr);
}
@Override
public void onClose(final Session session, final CloseReason closeReason) {
super.onClose(session, closeReason);
subscription.cancel();
}
@Override
public void onSubscribe(final Flow.Subscription subscription) {
this.subscription = subscription;
}
@Override
public void onNext(final String item) {
sendSignal(ReactiveSignal.next(item));
}
@Override
public void onError(final Throwable throwable) {
sendSignal(ReactiveSignal.error(throwable));
try {
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage()));
} catch (IOException e) {
LOGGER.log(Level.SEVERE, e, () -> "Error when closing web socket.");
}
}
@Override
public void onComplete() {
sendSignal(ReactiveSignal.complete());
try {
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Completed"));
} catch (IOException e) {
LOGGER.log(Level.SEVERE, e, () -> "Error when closing web socket.");
}
}
private void sendSignal(ReactiveSignal signal) {
session.getAsyncRemote().sendObject(signal);
}
}
WebSocketのAsyncRemoteを使ってメッセージを非同期式に送信している点に注意してください。これが必要なのは、リアクティブ・パイプラインではスレッドをブロックすることが禁止されているからです。
ここで唯一足りないものは、WebSocketサーバーとなるHelidon SEの起動です。起動後は、クライアントが接続してきたときに、作成したすべてのエンドポイント/サブスクライバが、StreamFactoryによって供給される新しいパブリッシャをサブスクライブします。こうすれば、下流ストリームのクライアントから最初のリクエスト・シグナルがWebSocketで着信したときに、上流ストリームはサブスクリプションの準備ができた状態になっています。
StreamFactory streamFactory = new StreamFactory();
TyrusSupport tyrusSupport = TyrusSupport.builder()
.register(
ServerEndpointConfig.Builder.create(
WebSocketServerEndpoint.class, "/messages")
.encoders(List.of(ReactiveSignalEncoderDecoder.class))
.decoders(List.of(ReactiveSignalEncoderDecoder.class))
.configurator(new ServerEndpointConfig.Configurator() {
@Override
public <T> T getEndpointInstance(final Class<T> endpointClass)
throws InstantiationException {
T endpointInstance = super.getEndpointInstance(endpointClass);
if (endpointInstance instanceof WebSocketServerEndpoint) {
WebSocketServerEndpoint endpoint =
(WebSocketServerEndpoint) endpointInstance;
//Endpoint is instantiated for every connection; lets subscribe it to the upstream
streamFactory.createStream().subscribe(endpoint);
}
return endpointInstance;
}
})
.build())
.build();
Routing routing = Routing.builder()
.register("/ws", tyrusSupport)
.build();
WebServer.builder(routing)
.build()
.start();
サーバー側はこれで完成です。
パブリッシャとしての WebSocket クライアント・エンドポイント
次の手順は、クライアント側でFlow.Publisherを使い、アプリケーションでサブスクライブを行えるようにすることです。パブリッシャが従わなければならない仕様上のルールは多数ありますが、一番優先されるのは、サブスクライバのメソッドに逐次的にシグナルを送ること(JVMのReactive Streams仕様のルール1.3)と、下流ストリームからのシグナルをブロックしないこと(ルール3.4および3.5)です。ここでは、WebSocketで押し寄せてくる非同期シグナルからサブスクライバ自体を守れるように、Helidon SEのSequentialSubscriberクラスを実際のサブスクライバのラッパーとして利用します。リクエストやキャンセルのシグナルが、確実にノンブロッキングで処理を妨害しないようにするため、サーバー側と同じように、WebSocketのAsyncRemoteをそのまま使ってシグナルを上流ストリームに送信します。
public class WebSocketClientEndpoint extends Endpoint implements Flow.Publisher<String>, Flow.Subscription {
private static final Logger LOGGER = Logger.getLogger(WebSocketClientEndpoint.class.getName());
private Session session;
private Flow.Subscriber<? super String> subscriber;
@Override
public void onOpen(final Session session, final EndpointConfig endpointConfig) {
this.session = session;
session.addMessageHandler(new MessageHandler.Whole<ReactiveSignal>() {
@Override
public void onMessage(ReactiveSignal signal) {
switch (signal.type) {
case ON_NEXT:
subscriber.onNext(signal.item);
break;
case ON_ERROR:
subscriber.onError(signal.error);
break;
case ON_COMPLETE:
subscriber.onComplete();
break;
default:
subscriber.onError(new IllegalStateException("Unexpected signal " + signal.type + " from upstream!"));
}
}
});
}
@Override
public void onError(final Session session, final Throwable thr) {
Optional.ofNullable(subscriber).ifPresent(s -> s.onError(thr));
LOGGER.log(Level.SEVERE, thr, () -> "Connection error");
super.onError(session, thr);
}
@Override
public void onClose(final Session session, final CloseReason closeReason) {
subscriber.onComplete();
super.onClose(session, closeReason);
}
@Override
public void subscribe(final Flow.Subscriber<? super String> subscriber) {
Objects.requireNonNull(subscriber, "subscriber is null");
// Notice usage of Helidon's SequentialSubscriber as a wrapper
// to get around difficulties with specification rules 1.3, 1.7
this.subscriber = SequentialSubscriber.create(subscriber);
subscriber.onSubscribe(this);
}
@Override
public void request(final long n) {
sendAsyncSignal(ReactiveSignal.request(n));
}
@Override
public void cancel() {
sendAsyncSignal(ReactiveSignal.cancel());
}
private void sendAsyncSignal(ReactiveSignal signal) {
try {
//reactive means no blocking
session.getAsyncRemote().sendObject(signal);
} catch (Exception e) {
subscriber.onError(e);
}
}
}
あとは接続して何かをリクエストするだけです。その際には、同じエンコーダを再利用してメッセージをシリアライズします。今回接続するのはテスト・アプリケーション1つだけであるため、クライアント・エンドポイントは自分でインスタンス化することができます。
public class Client {
private static final Logger LOGGER = Logger.getLogger(Client.class.getName());
public static void main(String[] args)
throws URISyntaxException, DeploymentException, InterruptedException, ExecutionException {
ClientManager client = ClientManager.createClient();
WebSocketClientEndpoint endpoint = new WebSocketClientEndpoint();
Future<Session> sessionFuture = client.asyncConnectToServer(endpoint,
ClientEndpointConfig.Builder
.create()
.encoders(List.of(ReactiveSignalEncoderDecoder.class))
.decoders(List.of(ReactiveSignalEncoderDecoder.class))
.build(),
new URI("ws://localhost:8080/ws/messages"));
//Wait for the connection
sessionFuture.get();
//Subscribe to the publisher and wait for the stream to end
Multi.create(endpoint)
.onError(throwable -> LOGGER.log(Level.SEVERE, throwable, () -> "Error from upstream!"))
.onComplete(() -> LOGGER.log(Level.INFO, "Complete signal received!"))
.forEach(s -> System.out.println("Received item> " + s))
.await();
}
}
出力は次のようになります。0.5秒ごとに10個のアイテムが着信した後に、完了シグナルを受け取ります。
Received item> Message number: 0
Received item> Message number: 1
Received item> Message number: 2
Received item> Message number: 3
Received item> Message number: 4
Received item> Message number: 5
Received item> Message number: 6
Received item> Message number: 7
Received item> Message number: 8
Received item> Message number: 9
Jul 30, 2020 5:34:22 PM io.helidon.fs.reactive.Client lambda$main$2
INFO: Complete signal received!
おわかりのように、forEachでWebSocketのパブリッシャをサブスクライブしています。その際にLong.MAX_VALUEをリクエストしていますが、これはサブスクライバがアイテムをいくつでも確実に消費できるという意味です。ありがたいことに、上流ストリームはアイテムを10個だけ送信して完了しました。
Java によるエラー・シグナル処理
何か問題が発生したらどうすればよいでしょうか。それについて考えてみましょう。まず、クライアントのコードが確実にエラー・シグナルのログを出力し、問題を報告するようにします。
Multi.create(endpoint)
.onError(throwable -> LOGGER.log(Level.SEVERE, throwable, () -> "Error from upstream!"))
.onComplete(() -> LOGGER.log(Level.INFO, "Complete signal received!"))
.forEach(s -> System.out.println("Received item> " + s))
.await();
次に、障害を発生させるために、4つ目のアイテムとしてエラー・シグナルを生成するようにStreamFactoryに指示します。
public class StreamFactory {
public Multi<String> createStream() {
return Multi.concat(Multi.just(1, 2, 3), Single.error(new Throwable("BOOM!")))
.map(aLong -> "Message number: " + aLong);
}
}
コードを実行すると、次の結果が表示されます。スタック・トレースが生成されているのはJSON-Bアダプタのおかげです。
Received item> Message number: 1
Received item> Message number: 2
Received item> Message number: 3
Jul 31, 2020 4:30:11 PM io.helidon.fs.reactive.Client lambda$main$1
SEVERE: Error from upstream!
java.lang.Throwable: BOOM!
at app//io.helidon.fs.reactive.StreamFactory.createStream(StreamFactory.java:9)
at app//io.helidon.fs.reactive.Server$1.getEndpointInstance(Server.java:67)
at app//org.glassfish.tyrus.core.TyrusEndpointWrapper$1.getEndpointInstance(TyrusEndpointWrapper.java:225)
これで、Java同士をリアクティブ・ストリームでつなぐソリューションが完成しました。次に、多言語環境に少し足を踏み入れたらどうなるでしょうか。
JavaScript によるフルスタック・リアクティブ・コーディング
バックエンドにリアクティブWebSocketエンドポイントがある場合、そのエンドポイントをフロントエンドのリアクティブ・パイプラインに接続してみてはいかがでしょうか。
ここでは、アプリケーションをReactive Extensions for JavaScript(RxJS)ストリームに接続してみます。エンド・ツー・エンドでリアクティブ性を維持するため、リアクティブな命令を使ってカスタム・シグナルをRxJSストリームにマッピングします。以下のスニペットでは、takeWhile命令を利用して完全なシグナルを検出しています。ON_COMPLETEタイプが着信すると、RxJSストリームが完了します。
エラー・シグナルをマッピングするために最適なのは、flatMapと同じ機能をRxJSで持つmergeMapです。mergeMapを使ってON_ERRORタイプの任意のアイテムをストリーム・エラーにマッピングし、エラーをメイン・ストリームにフラット化します。エラーがON_NEXTだった場合は、アイテムを単にアンラップし、of(msg.item)を使ってフラット化します。
const { Observable, of, from, throwError} = rxjs;
const { map, takeWhile, mergeMap } = rxjs.operators;
const { WebSocketSubject } = rxjs.webSocket;
const subject = new WebSocketSubject('ws://127.0.0.1:8080/ws/messages');
// Now I have to map the custom signals to RxJS
subject.pipe(
// Map the custom ON_COMPLETE to RxJS complete signal
takeWhile(msg => msg.type !== 'ON_COMPLETE'),
// Map the custom ON_ERROR to RxJS error signal or unwrap next item
mergeMap(msg => msg.type === 'ON_ERROR' ? throwError(msg.error) : of(msg.item))
)
.subscribe(
// invoked for every item
msg => onNext(msg),
// invoked when error signal is intercepted
err => console.log(JSON.stringify(err, null, 2)),
// invoked when complete signal is intercepted
() => console.log('complete')
);
これを接続しても何も起こりません。その理由は、バックエンドがバックプレッシャに対応しており、アイテム数のリクエストが届くのを待っているからです。そこで、カスタムのリクエスト・シグナルを送信するボタンを追加します。
const input = $("#input");
const submit = $("#submit");
submit.on("click", onSubmit);
function onSubmit() {
subject.next({"requested":input.val(),"type":"REQUEST"});
}
実際に動作する完全なサンプルをGitHubで公開しています。このサンプルを使って任意の数のアイテムをリクエストし、結果を視覚的に確かめてみてください。ストリームは新しい接続のたびに初期化されるので、10個のアイテムがなくなってストリームが完了しても、ページをリロードするだけで再度開始されます。図3にユーザー・インタフェースを示します。

図3:JavaScriptフロントエンドのユーザー・インタフェース

JavaScript によるエラー・シグナル処理
もう一度StreamFactoryを変更し、4つ目のアイテムとしてエラー・シグナルを生成します。
public class StreamFactory {
public Multi<String> createStream() {
return Multi.concat(Multi.just(1, 2, 3), Single.error(new Throwable("BOOM!")))
.map(aLong -> "Message number: " + aLong);
}
}
カスタムのエラー・シグナルがJSONにエンコードされます。図4は、フロントエンドがコンソールに表示される様子を示しています。

図4:JavaScriptとJSON-Bによるリアクティブ・ストリームのエラー処理

おわかりのように、このアプリケーションではJavaスタック・トレース付きの完全な例外が出力されます。カスタム・シグナルをエンコードするためにJSON-Bが使われているからです。
まとめ
WebSocketは、リアクティブ・ストリームによる通信に対応するほど十分に強力です。この点は、比較的小規模なアプリケーションや、制約のあるアプリケーションのユースケースではメリットです。長時間続くことが想定されるストリームや、数百万件の項目が流れてくる可能性があるストリームでは、WebSocketよりも大がかりなツールが必要です。
このトピックに興味を覚えた方には、Helidon MPで利用できるMicroProfileリアクティブ・メッセージングをお勧めします。また、バージョン2.0.0以降のHelidon SEに導入されている非CDI APIもご覧ください。
さらに詳しく
- 「Helidonが飛び立つ」(英語)
- 「マイクロサービスの開発からデプロイまで(パート1):Helidonを使ってみる」(英語)
- Project Helidon(英語)
- リアクティブ・プログラミングの概要
- 「JAX-RSによるリアクティブ・プログラミング」
- 「JDK 9のFlow APIによるリアクティブ・プログラミング」(英語)
- 「Eclipse Vert.xとRxJavaでリアクティブ・プログラミング」
- WebSocket API(英語)
- GitHubのHelidon Full Stack Reactive(英語)
Daniel Kecはプラハのオラクルに勤務するJava開発者。Helidon Projectに注力している。
