※本記事は、“Curly Braces #4: Network data transmission and compression in Java” の翻訳記事です。

2022年5月7日 |10分読む

Eric J. Bruno


データ集約型アプリケーションがI/Oバウンドの場合、バイトの圧縮が必要になる可能性があります

メッセージング・ミドルウェアを自前で記述することで、堅牢な分散エンタープライズ・ソフトウェア・システムを構築できます。かつて私は独自のJava Message Service(JMS)実装をビルドしました。それによってJavaネットワークI/Oおよび関連するパフォーマンスについて多くを学習しました。

私が学んだ重要な教訓は、高パフォーマンスのメッセージング・ソフトウェアには、高速で効率的なコード以上のものが必要であることです。また、ネットワーキングに関する基本的な理解、および実行するシステムのI/Oの制限に関する知識も必要です。たとえば、分散コンポーネント間でデータが送信されると、I/Oバウンドと呼ばれるネットワークI/Oで最速のコードを待機するポイントもあります。図1は、まさにそれを示しています。

 

Illustration of an I/O-bound thread

図 1.  I/O バウンド・スレッド

I/Oの遅延に対処するための可能なソリューションとしてデータ圧縮に入る前に、Javaネットワーク・プログラミングの基本を確認します。

 

Java ネットワーク・プログラミング

Javaネットワーキングの基盤は、 java.net.Socket クラスと java.net.ServerSocket クラスの組合せです。つまり、ソケットはクライアント・コード用ですが、 ServerSocket はクライアントが接続するサーバー用です。

ほとんどのJava I/Oプログラミングと同様に、データは java.io.InputStream クラスと java.io.OutputStream クラスの組合せによって交換されます。サーバー・クラスとクライアント・クラスを使用しても、データの方向は決まらないことに注意してください。Javaサーバー・アプリケーションは、接続後にクライアントを待機してデータを送信したり、サーバーが単にリスニングするクライアントにデータを提供したりできます。また、データは、Webブラウザやサーバーと同様に、リクエスト・パラダイムとレスポンス・パラダイムの2つの間で双方向に移動できます。

クライアントの接続を待機する ServerSocket のサンプル実装を次に示します。クライアントは、実装言語に関係なく、正しいIPアドレスおよびポートに接続する任意のアプリケーションです。

    try {
        ServerSocket clientConnect = new ServerSocket(8080);
        Socket client = clientConnect.accept(); // 呼び出しをブロックしています
        InputStream instream = client.getInputStream();
        DataInputStream dis = new DataInputStream( instream );
        while ( true ) {
            String msg = dis.readUTF();
            System.out.println("Message: " + msg);
        }
    }
    catch ( Exception e ) {
        e.printStackTrace();
    }

前述のコードでは、実行しているホストのポート8080でリスニングする ServerSocketが作成されます。 accept() へのコールは、ネットワーク・クライアントがリスニング・ポートに接続するまでブロックして待機します。リスニング・ポートでは、クライアントへのソケット接続が返されます。

この実装では、サーバーは String メッセージをリスニングし、コマンドラインに出力します。これを行うには、リスナーをインスタンス化するために、クライアントの InputStreamDataInputStream コンストラクタに渡されます。 readUTF への後続の呼出しは、文字列メッセージが完全に到着するまでブロックされます。

サーバーに接続して String メッセージを送信する簡易クライアント・コードを次に示します。

    try {
        Socket sender = new Socket("localhost", 8080);
        if ( sender.isConnected() ) {
            DataOutputStream outputStream =
                    new DataOutputStream( conn.getOutputStream() );
            
            outputStream.writeUTF("I love Oracle Java Magazine!");
        }
    }
    catch ( Exception e ) {
        e.printStackTrace();
    }

この時点で、必要なアプリケーションレベルのプロトコルを理解することが重要です。前述の例では、Java文字列データがネットワーク経由で送信されます。ただし、その他のJavaオブジェクト・データは、次のように ObjectOutputStream クラスおよび ObjectInputStream クラスを使用してネットワーク上でシリアライズおよび送信できます。

    try {
        Socket sender = new Socket("localhost", 8080);
        if ( sender.isConnected() ) {
            ObjectOutputStream oos = 
              new ObjectOutputStream( 
                new BufferedOutputStream( sender.getOutputStream() ));
                    
            MyObject myObj = new MyObject();
            myObj.message = "I love Java!";
            myObj.messageId = getMessageId();
            // ...
            oos.writeObject( myObj );
            oos.flush();
        }
    }
    catch ( Exception e ) {
        e.printStackTrace();
    }

反対側のリスナーは、前述のように接続しますが、ブロッキング・コールは、シリアライズされたJavaオブジェクトが返されるまで待機します。

    ObjectInputStream ois =
    new ObjectInputStream( 
        new BufferedInputStream( client.getInputStream() ));
                
    MyObject myObject = (MyObject)ois.readObject();

ここでも、クライアントとサーバーの両方が、ネットワーク経由で MyObjectオブジェクトのシリアライズ・インスタンスを送信することに同意します。 BufferedOutputStream オブジェクトを使用してバッファI/Oを使用すると、JVMがオブジェクトへのバイトのアセンブリを内部的に効率的に処理するため、通常はパフォーマンスが向上します。

パフォーマンスについて深堀してみましょう。私の経験上、アプリケーションがネットワークを介してデータを送信する時間が長くなると、CPU使用率が減少し、つまり、より高速なサーバーでネットワーク・アプリケーションをチューニングしてもあまり効果がありません。代わりに、ネットワーク I/Oを改善する必要があります。より高速なI/O機能を備えたサーバーは役立ちますが、それも飽和状態になります。設計を改善する必要があり、コードを改善することを意味します。

改良の1つは、損失のないアルゴリズムを使用して送信される前にデータを圧縮することです(これにより、元のバイトが戻されます)。I/Oバウンド・サーバーがある場合、データの圧縮にCPU処理時間を費やす余裕があります。これにより、ネットワークI/Oが削減されます。

ちなみに、これはWebサーバーが通常、JPEGなどの圧縮形式でイメージを送信する理由の1つです。これは、画像の消費量がI/Oや帯域幅が少ないためです。ただし、JPEGを使用すると圧縮が失われるため、非圧縮イメージは元のイメージと正確には同じではありません。不十分な圧縮は、Webサイトを表示しても問題ありませんが、データ処理には使用できません。

 

バイトの圧縮方法

JDK java.util.zip パッケージには、データの圧縮と解凍、 .zip ファイルおよび .gzip ファイルの作成などのクラスが用意されています。このプロジェクトでは、適切なクラスはそれぞれバイトを圧縮および圧縮解除する Deflater および Inflaterです。まず、次の圧縮アルゴリズムを選択します。

Deflater compressor = new Deflater(Deflater.BEST_SPEED);

この圧縮アルゴリズムは、実行速度に優先順位を付けます。実行速度は最小限のCPUリソースを使用しますが、圧縮率も低下します。つまり、出力ファイルが大きくなります。可能なかぎり圧縮が必要で、バイトを圧縮するために処理時間が長くなる可能性がある場合は、 Deflater.BEST_COMPRESSIONを使用します。これらの圧縮オプションは、アプリケーション、データ型、データ・サイズまたはその他の要因に応じて、圧縮と速度比のバランスをとるために使用できる範囲に含まれています。「フィールド・サマリー」セクションでこれらすべてを確認できます

データ圧縮を使用する送信者のサンプルを次に示します。

DataOutputStream dos = 
    new DataOutputStream( conn.getOutputStream() );
byte[] bytes = messageTxt.getBytes("UTF-8");

// バイトを圧縮する
Deflater compressor = new Deflater(Deflater.BEST_SPEED);
compressor.setInput(bytes);
compressor.finish();
byte[] compressed = new byte[bytes.length];
length = compressor.deflate(compressed);

// 圧縮されたデータを送信する 
dos.write(compressed, 0, length);
dos.flush();

コードは、 DataOutputStream と一部のメッセージ・テキストを使用して簡単に開始します。メッセージが長いため、送信するバイト数が多いと想定します。

次に、最適な処理速度のための Deflater セットを作成します。前述の例では、入力を設定してバイトを追加し、 finish() メソッドをコールしています。このクラスは、データ・ストリームを操作することもできます。後続の deflate() のコールによって、指定された配列にバイトが圧縮され、新しい(より小さい)長さが返されます。最後に、圧縮されたバイトがネットワーク経由で送信されます。

1つのテスト・アプリケーションで、約100KBのメッセージを作成し、それぞれ500バイトを超えるまで圧縮しました。これは、ネットワークI/Oの時間と帯域幅の点で大きな節約になります。

次のコードは、受信側の終端のバイトを読み取って解凍します。

// バイトを読み取る
DataInputStream dis = new DataInputStream( instream );
byte[] compressed = new byte[ dis.available() ];
dis.readFully(compressed);

// バイトを解凍する
Inflater decompressor = new Inflater();
decompressor.setInput(compressed);
byte[] msgBytes = new byte[DEFAULT_SIZE];
decompressor.inflate(msgBytes);

String msg = new String(msgBytes);
System.out.println(msg);

最初に、受信バイトを格納するバイト配列が作成されます。次に、 Inflater クラスが使用されます。圧縮バイトを提供するために setInput()メソッドが呼び出され、指定された配列にバイトを解凍するために inflate() が呼び出されます。結果のバイトは、元の文字列を再作成するために使用されます。

 

柔軟性と予測可能性の追加

上記のプロセスはうまくいっていますが、改良点は2つあります。1つ目は、不要な場合ではなく、意味がある場合にのみデータを圧縮する柔軟性を追加することです。2つ目は、文字列の解凍に必要なバイト配列のサイズを送信することです。

getRemaining() およびその他の Inflater メソッドを使用してデータをチャンクで読み取ることは非効率的で複雑です。圧縮されていないデータ・サイズと圧縮されたデータ・サイズの両方をデータ・ストリーム自体の int 値として送信することをお薦めします。つまり、到着するビットは、表1に示されているビットのようになります。

表 1. 開始ビット・サイズおよび終了ビット・サイズ

Table of starting and ending bit sizes

サイズを決定することで、適切な条件下でのみデータを圧縮するという観点で、実行時の柔軟性を実現できます。たとえば、メッセージ・サイズに基づいて圧縮を決定できます。数バイトのみの場合は、問題になる必要はありません。

拡張送信コードは次のようになります。

DataOutputStream dos = 
    new DataOutputStream( conn.getOutputStream() );
byte[] bytes = messageTxt.getBytes("UTF-8");

// 元のメッセージの長さを書き込む
int length = bytes.length;
dos.writeInt(length);

if ( length > LENGTH_THRESHOLD ) {
    // バイトを圧縮する
    Deflater compressor = new Deflater(Deflater.BEST_SPEED);
    compressor.setInput(bytes);
    compressor.finish();
    byte[] compressed = new byte[bytes.length];
    length = compressor.deflate(compressed);
}
else {
    compressed = bytes;
}

// もう一度長さを書いてください。 圧縮されている場合は
// サイズが異なるので、受信者側で解凍する必要があることを示します。
// 受信者が解凍する必要があります。
dos.writeInt(length);

// データバイトの書き込み
dos.write(compressed, 0, length);
dos.flush();

もちろん、受信者も変える必要があります。更新されたコードを次に示します。

DataInputStream dis = new DataInputStream( instream );

// 次のメッセージの長さを取得します
int msgSize = dis.readInt();

// 圧縮されたサイズを取得する(圧縮されている場合
// このサイズは上記のサイズと異なります)
int compressedSize = dis.readInt();

// バイトを読み取る
byte[] compressed = new byte[compressedSize];
dis.readFully(compressed);

byte[] msgBytes = compressed;
if (compressedSize != msgSize) {
    // バイトを解凍する
    Inflater decompressor = new Inflater();
    decompressor.setInput(compressed);
    msgBytes = new byte[DEFAULT_SIZE];
    decompressor.inflate(msgBytes);
}

String msg = new String(msgBytes);
System.out.println(msg);

ご覧のとおり、変更は最小限ですが、結果として非常に最適化基準を満たしたときに、決定論的にデータを圧縮してI/Oやネットワークのオーバーヘッドを削減する、非常に柔軟で効率的なコードになっています。

 

まとめ

Javaを使用すると、I/Oバウンド・アプリケーションなどの悪条件を賢く克服できます。このコードは、Javaを使用したよりインテリジェントなネットワーキングの基盤を提供します。ここでは、圧縮を調整してサーバーのスループットを最適化できます。試行錯誤の結果であり、実験が大きな改善につながる可能性があることを示しています。

 

より詳細な情報