X

A blog about Oracle Technology Network Japan

Jakarta EEで始める並列プログラミング

Guest Author

※本記事は、Josh Juneauによる"Get started with concurrency in Jakarta EE"を翻訳したものです。


優れたアプリケーションの要の1つである良好なパフォーマンスというのは、複数のタスクをパラレルに同時実行できることを指す場合が多い

著者:Josh Juneau
2020年6月22日

Jakarta EEプラットフォームでアプリケーションやサービスを開発するとき、重要なツールとなるのがJakarta Concurrency APIです。このAPIによって、ユーザーにシームレスな操作を提供するために使用できるさまざまな戦略が実現します。さらに、Java SEのConcurrency API(java.util.concurrent)に大変よく似ているため、これら2つのAPIでの相互移行は容易です。本記事では、Jakarta EEプラットフォームで適格に動作する堅牢なアプリケーションを開発するために使用できる多くの戦略について説明します。

最初に、並列実行の中核的な概念であるスレッド処理について、簡単に確認しておきます。スレッドは、アプリケーションの処理の実行をカプセル化したものです。JVMでは、複数のスレッドを同時に処理できます。スレッドには優先順位があり、優先順位の高いスレッドは優先順位の低いスレッドよりも先に実行可能となっています。

このようなスレッド処理は、Java SEアプリケーションの開発では非常に良好に機能します。しかし残念ながら、サーバー環境で複数のスレッドを無制限に生成するというのは、受け入れられないソリューションです。環境に提供されているすべてのメモリを1つのスレッドが消費してしまう可能性もあるからです。さらに、コンテナでは同時に複数のアプリケーションを実行できます。複数のスレッドが実行された場合、コンテナ内の他プロセスのリソースが枯渇する可能性も高まるでしょう。Jakarta EEサーバー環境では、タスクの実行を管理する多くのサービスを提供しています。そのため、Java SE環境で複数のスレッドを生成するのと同じようにして、タスクをそれらのサービスに渡すことができます。

なお、本記事のすべてのコードは、GitHubで公開されています。

 

並列プログラミングを使ってみる

完全なJakarta EEプラットフォームには、Jakarta Concurrency APIのサポートが含まれているため、このAPIを使ってみるのは簡単です。Jakarta EEに完全対応しているアプリケーション・サーバーやコンテナには、必ずこのAPIが含まれています。Jakarta EE Web Profileや、Jakarta EEプラットフォームの一部のみを使っている場合、MavenプロジェクトのPOMファイルに以下を追加するだけで必要な依存性を取得することができます。

<dependency>
    <groupId>jakarta.enterprise.concurrent</groupId>
    <artifactId>jakarta.enterprise.concurrent-api</artifactId>
    <version>2.0.0-RC1</version>
</dependency>

コンテナは、Jakarta Concurrency APIを使ううえで重要な役割を果たします。コンテナには、バックグラウンドでタスクを実行するために使うサービスが含まれているからです。これは、Java SEのデスクトップ・ソリューションで、長時間実行されるタスクをバックグラウンド・スレッドで実行することに似ています。つまり、対応しているJakarta EEコンテナには、このAPIを使えるサービスがデフォルトで存在するということです。ただし、このサービスをカスタマイズすることや、さまざまなアプリケーションで使用するために複数のサービスを作成することも可能です。

注:Jakarta Concurrency 1.1を使っている場合、パッケージのネーミング規則はjavax.enterprise.concurrent.*です。一方、リリース2.0.0以降を使っている場合、パッケージの名前はjakarta.concurrent.*に変更されています。本記事では、Jakarta Concurrency 1.1を含むJakarta EE 8を使用します。

 

コンテナを構成する

対応するコンテナには、Jakartaの並列プログラミングに使用するための、以下に示すサービスが含まれています。

  • ManagedExecutorService:Java SE Concurrency APIの一部であるExecutorServiceを拡張したサービスです。タスクをサーバーに送信し、非同期式に処理するために使います。このサービスがタスクの送信を終えると、コール元にFutureオブジェクトが返ります。
  • ManagedScheduledExecutorService:Java SE Concurrency APIの一部であるScheduledExecutorServiceを拡張したサービスです。遅延タスクや定期タスクを送信して処理するために使います。このサービスがタスクの送信を終えると、コール元にFutureオブジェクトが返ります。
  • ManagedThreadFactory:Java SE Concurrency APIの一部であるThreadFactoryを拡張したサービスです。このサービスの主な目的は、タスクをサーバーに渡して処理できるように、アプリケーション用のマネージド・スレッドを生成することです。Java SEのスレッド管理と非常によく似た形で機能します。
  • ContextService:このサービスでは、コンテキストを意識したダイナミック・プロキシ・オブジェクトを生成する方法を提供します。本記事では扱いません。

これらのサービスに送信されたタスクは、コール元と同じアプリケーション・コンテキスト内で実行されます。そのため、タスクはコール元のアプリケーションと同じリソースにアクセスすることができ、簡単に状態管理やデータ共有などを行うことができます。

対応するすべてのJakarta EEコンテナでは、事前構成されたリソースを利用できますが、カスタム・リソースを作成することもできます。たとえば、必要とするそれぞれのアプリケーションやサービス用にカスタムのManagedExecutorServiceを生成するという方法も妥当かもしれません。マイクロサービス環境では、1つのコンテナに1つのサービスのみがデプロイされるのが一般的です。その場合は、デフォルトのリソースで十分でしょう。

対応するすべてのアプリケーション・サーバー・コンテナでは、カスタムのマネージド・リソースを作成する手段が提供されているはずだからです。また、複数の手段を提供しているものも多くあります。たとえば、Payaraサーバーを使う場合、管理コンソールやCLIからいずれかのリソースを作成できます。CLIからManagedExecutorServiceを作成する場合は、asadminユーティリティを使って次のコマンドを発行します。

bin/asadmin create-managed-executor-service concurrent/name-of-service

 

Jakartaの並列実行のユースケース

並列実行にはさまざまなユースケースがあります。たとえば、複数のアクションを非同期式に実行できるサーバー側タスクの作成です。また、リモート・サーバー上にある大規模なデータセットなどの高価なリソースを取得する場合に、シームレスなフロントエンド・エクスペリエンスを提供するというものもあります。

以降のセクションでは、特に一般的なユースケースのいくつかについて、コンテナ・リソースを使用する方法を示す例を交えながら説明します。本記事の例では、スポーツ・チームの選手リストを管理するアプリケーションを構築します。一元管理されているRDBMSをデータ・ストレージとして使い、選手リストのデータを表示するレポートを提供します。ユーザー・インタフェースはJakarta Server Facesとステートレス・ビューを使って開発します。一方、データベースを呼び出すバックエンド・ロジックは一連のサービスであり、それぞれのサービスが別々のJakarta EEアプリケーションです。なお、注意すべき重要な点は、並列実行するリソースをXMLまたはアノテーションで構成できることです。本記事では、アノテーションを使用します。

 

ManagedExecutorService

Jakarta Concurrency APIで特によく使われるサービスが、ManagedExecutorServiceです。クラスでこのサービスを使う場合、RunnableおよびCallableという2種類のインタフェースを実装できます。Runnableインタフェースは、処理するタスクをManagedExecutorServiceに送信する際に使用できます。その結果として返されるFutureを使用して、ジョブが完了したタイミングを判定することができます。Callableの実装も同じように使いますが、こちらでは結果を返すことができます。

次の例では、アプリケーション・ユーザー・インタフェース内のボタンを使ってレポートを呼び出しています。その際に、レポートを実行するバックグラウンド・プロセスが生成されます。そのプロセスが完了すると、結果がユーザーに返されます。この例では、Runnableを実装するクラスを生成しています。このクラスでは、データベースに問合せを行って結果を取得するWebサービスを呼び出すために使うロジックをカプセル化しています。リスト1のコードはReportRunnableクラスです。このクラスではRunnableを実装しているため、run()メソッドの実装を含んでいます。このメソッドで、長い時間がかかるタスクを実行します。

リスト1:

public class ReportRunnable implements Runnable {

    private static Logger log = LogManager.getLogger();
    private WebTarget resource;
    private String reportName;
    private List<Roster> rosterList;

    public ReportRunnable(String reportName) {
        this.reportName = reportName;
    }

    /**
     * このメソッドは、名前付きレポートを実行するためにオーバーライドされている。
     */
    @Override
    public void run() {
        if ("RosterReport".equals(reportName)) {
            invokeRosterReport();

        } else if ("DifferentReport".equals(reportName)) {
            System.out.println("running different report...");

        }
    }

    /**
     * 選手リストを返すWebサービスを呼び出す。
     */
    protected void invokeRosterReport() {
        // Webサービスの呼出し
        resource = Utilities.obtainClient(Constants.ROSTER_URI, "roster").path("findAll");
       
        setRosterList(resource.request(javax.ws.rs.core.MediaType.APPLICATION_XML)
                .get(new GenericType<List<Roster>>() {
                }));
        rosterList.stream().forEach(r -> System.out.println(r.getFirstName() + " " + r.getLastName() + " - " + r.getPosition()));
    }

    /**
     * @return rosterList
     */
    public List<Roster> getRosterList() {
        return rosterList;
    }

    /**
     * @param rosterList 設定するrosterList
     */
    public void setRosterList(List<Roster> rosterList) {
        this.rosterList = rosterList;
    }

}

 

ユーザー・インタフェースから話を続けますと、Roster Reportビューには、レポートを起動するために使用するボタンが含まれています。そのボタンのコードを、rosterReport.xhtmlから抜粋して次に示します。

<p:commandButton id="rosterReport"
       actionListener="#{rosterController.invokeRosterReport}"
        value="Roster Report"/>

このボタンが押されると、actionListenerによって、コントローラ・クラスのinvokeRosterReport()というアクション・メソッドが初期化されます。コントローラ・クラスはRosterControllerという名前(リスト2参照)で、ViewScopedです。これは、ビューにアクセスがあるたびにコンテキストが再生成され、クラスのスコープが再起動することを意味します。ビューが閉じられると、スコープも消滅します。

リスト2:

@Named
@ViewScoped
public class RosterController implements java.io.Serializable {

    @Resource
    private ManagedExecutorService mes;
    
    @Resource
    private ManagedThreadFactory mtf;
    
    Thread rosterThread = null;

    private static Logger log = LogManager.getLogger();
    private WebTarget resource;

    private List<Roster> rosterList;

    private Roster current;

    private boolean managePlayer = false;

    public RosterController() {

    }

    @PostConstruct
    public void init() {
        populateRosterList();
    }

    /**
     * ManagedExecutorServiceの例。このアクション・メソッドは、Roster Listビューで
     * Refresh Listボタンが押されたときに呼び出される。ここでは、
     * 選手リストの人数の呼出しをManagedExecutorServiceに
     * 送信して処理を行う。
     *
     */
    public void refreshRosterList() {

    }

    /**
     * List<Roster>を設定する。
     */
    public void populateRosterList() {
        resource = Utilities.obtainClient(Constants.ROSTER_URI, "roster").path("findAll");
        System.out.println(resource.getUri());
        setRosterList(resource.request(javax.ws.rs.core.MediaType.APPLICATION_XML)
                .get(new GenericType<List<Roster>>() {
                }));
    }

    /**
     * IDを受け取り、対応する<code>Roster</code>を返す。
     *
     * @param id
     */
    public void findById(int id) {
        resource = Utilities.obtainClient(Constants.ROSTER_URI, "roster");
        resource = resource.path(java.text.MessageFormat.format("findById/{0}", new Object[]{id}));
        current = resource.request(javax.ws.rs.core.MediaType.APPLICATION_XML)
                // .cookie(HttpHeaders.AUTHORIZATION, authenticationController.getSessionToken())
                .get(
                        new GenericType<Roster>() {
                });
    }

    public String addPlayer() {
        String returnPage = null;
        resource = Utilities.obtainClient(Constants.ROSTER_URI, "roster").path("add");
        Form form = new Form();
        form.param("firstName", current.getFirstName().toUpperCase());
        form.param("lastName", current.getLastName().toUpperCase());
        form.param("position", current.getPosition().toUpperCase());
        Invocation.Builder invocationBuilder = resource.request(MediaType.APPLICATION_XML);
        //  .cookie(HttpHeaders.AUTHORIZATION, authenticationController.getSessionToken());
        Response response = invocationBuilder.post(Entity.entity(form, MediaType.APPLICATION_FORM_URLENCODED_TYPE), Response.class);
        if (response.getStatus() == Status.CREATED.getStatusCode() || response.getStatus() == Status.OK.getStatusCode()) {
            log.info("Successful roster Entry");
            Utilities.addSuccessMessage("Player Successfully Added");
            rosterList = null;
            populateRosterList();
            returnPage = "index";
        } else {
            log.error("Player entry error");
            Utilities.addErrorMessage("Error entering player, please try again.If issue persists, please contact service desk.");
        }
        return returnPage;
    }

    public void remove(Roster player) {
        resource = Utilities.obtainClient(Constants.ROSTER_URI, "roster");
        resource = resource.path(java.text.MessageFormat.format("/{0}", new Object[]{player.getId()}));
        try {
            resource.request().delete();

            Utilities.addSuccessMessage("Removed Player");
            rosterList = null;
            populateRosterList();
        } catch (Exception e) {
            Utilities.addErrorMessage("Error Removing Player");
        }
    }

    public void updatePlayer() {
        resource = Utilities.obtainClient(Constants.ROSTER_URI, "roster");
        resource = resource.path(java.text.MessageFormat.format("/{0}", new Object[]{current.getId()}));

        Response response
                = resource.request().put(Entity.entity(current, MediaType.APPLICATION_XML));
        if (response.getStatus() == Status.OK.getStatusCode()) {
            Utilities.addSuccessMessage("Updated Player");
            rosterList = null;
            managePlayer = false;
            populateRosterList();
        } else {
            Utilities.addErrorMessage("Error Removing Player");
        }
    }

    public void manage(Roster player) {
        current = player;
        managePlayer = true;
    }

    public void cancelAction() {
        managePlayer = false;
        current = null;
        rosterList = null;
        populateRosterList();
    }

    public void clear(AjaxBehaviorEvent event) {
        cancelAction();
    }

    public void invokeRosterReport() {
        ReportRunnable rosterReport = new ReportRunnable("RosterReport");
        /*
             * 通常、このFutureオブジェクトは、どこかにキャッシュしてから、
             * 定期的にポーリングしてタスクのステータスを取得する必要がある。
         */
        Future reportFuture = mes.submit(rosterReport);
        while (!reportFuture.isDone()) {
            System.out.println("Running...");
        }
        if (reportFuture.isDone()) {
            System.out.println("Report Complete");

        }
    }
    
    public void invokeThreaddedRosterReport(){
        RosterRunnable rosterReport = new RosterRunnable();
        /*
             * 通常、このFutureオブジェクトは、どこかにキャッシュしてから、
             * 定期的にポーリングしてタスクのステータスを取得する必要がある。
         */
        rosterThread =mtf.newThread(rosterReport);
        rosterThread.start();
    }

    /**
     * @return rosterList
     */
    public List<Roster> getRosterList() {
        return rosterList;
    }

    /**
     * @param rosterList 設定するrosterList
     */
    public void setRosterList(List<Roster> rosterList) {
        this.rosterList = rosterList;
    }

    /**
     * @return current
     */
    public Roster getCurrent() {
        if (current == null) {
            current = new Roster();
        }
        return current;
    }

    /**
     * @param current 設定するcurrent
     */
    public void setCurrent(Roster current) {
        this.current = current;
    }

    /**
     * @return managePlayer
     */
    public boolean isManagePlayer() {
        return managePlayer;
    }

    /**
     * @param managePlayer 設定するmanagePlayer
     */
    public void setManagePlayer(boolean managePlayer) {
        this.managePlayer = managePlayer;
    }

}

 

ManagedExecutorServiceリソースは、@ResourceアノテーションによってRosterControllerに注入されます。注入が行われたら、Jakarta Concurrency APIを使って、必要なインタフェースを実装しているクラスを呼び出すことができます。invokeRosterReport()メソッドで、その呼出しが行われています。すなわちこのメソッドでは、ReportRunnableクラスのインスタンスが作成され、注入されたManagedExecutorServiceリソースにそのインスタンスが渡されています。

次の抜粋で示すのは、サービスを呼び出して、Futureオブジェクトが返されている部分です。そして、生成されたタスクの実行が完了したタイミングを判定するため、このFutureに対してポーリングが行われています。

ReportRunnable rosterReport = new ReportRunnable("RosterReport");
Future reportFuture = mes.submit(rosterReport);
while (!reportFuture.isDone()) {
    System.out.println("Running...");
}
if (reportFuture.isDone()) {
    System.out.println("Report Complete");
}

トランザクション:バックグラウンド・タスクを生成する際に懸念される点の1つに、トランザクションの使用があります。トランザクションは、タスクの一部が失敗した場合に、すでに完了しているかもしれない他の部分をロールバックできるようにして、データ整合性を保証する仕組みです。トランザクションを作成して管理するためには、jakarta.transaction.UserTransactionを使用します。このインタフェースを使う場合、@Resourceを使ってクラスにUserTransactionを注入する必要があります。その後、UserTransaction.begin()メソッドを呼び出して新しいトランザクションを作成することができます。また、commit()メソッドを使用してトランザクションを終えることができます。必要に応じて、rollback()メソッドを使ってロールバックすることもできます。リスト3に、RESTfulサービスを呼び出して応答を待つ際にトランザクションを使ったクラスを示します。

リスト3:

public class ReportRunnableTransaction implements Runnable {

    private static Logger log = LogManager.getLogger();
    private WebTarget resource;
    private String reportName;
    private List<Roster> rosterList;
    
    @Resource
    UserTransaction ut;

    public ReportRunnableTransaction(String reportName) {
        this.reportName = reportName;
    }

    /**
     * このメソッドは、名前付きレポートを実行するためにオーバーライドされている。
     */
    @Override
    public void run() {
        if ("RosterReport".equals(reportName)) {
            try {
                ut.begin();
                invokeRosterReport();
                // ここで何らかのデータ・トランザクションを実行
                ut.commit();
            } catch (NotSupportedException|RollbackException|
SystemException|HeuristicMixedException|
HeuristicRollbackException|SecurityException|
IllegalStateException ex) {
                java.util.logging.Logger.getLogger(
     ReportRunnableTransaction.class.getName())
    .log(Level.SEVERE, null, ex);
            }

        } else if ("DifferentReport".equals(reportName)) {
            System.out.println("running different report...");

        }
    }

    /**
     * 選手リストを返すWebサービスを呼び出す。
     */
    protected void invokeRosterReport() {
        // Webサービスの呼出し
        resource = Utilities.obtainClient(
      Constants.ROSTER_URI, "roster").path("findAll");
       
        setRosterList(resource.request(
     javax.ws.rs.core.MediaType.APPLICATION_XML)
                .get(new GenericType<List<Roster>>() {
                }));
        rosterList.stream().forEach(r -> System.out.println(r.getFirstName() + " " + r.getLastName() + " - " + r.getPosition()));
    }

// getterとsetter

}

 

複数の非同期タスクを扱う:複数のタスクを非同期式に実行しなければならないこともあるでしょう。そのような場合には、ManageExecutorService.invokeAll()メソッドを使用します。複数のタスクを送信して非同期実行するためには、実行可能なタスクをArrayListに設定し、その配列をinvokeAll()に渡します。これにより、タスクの完了チェックや結果の取得を目的とした、Futureオブジェクトのリストが返されます。Futureを同じ方式で扱えるように、それぞれのタスク・クラスは共通の形式に準拠する必要があります。この例では、非同期呼出しの対象となるそれぞれのタスク・クラスで実装する型クラスを作成しています。この型で定義されたフィールドやメソッドは、後ほど結果を処理する際に使用できます。

まずは、共通のフィールドおよびメソッドを含む型クラスを作成します。次のコードは、RosterInfoという名前の型クラスを示しています

public class RosterInfo {
    public String team;
    public List<Roster> players = null;

    public RosterInfo(String team,
                 List<Roster> players){
        this.team = team;
        this.players = players;
    }
}

次に、型クラスを実装するタスク・クラスを作成します。今回の場合、タスク・クラスではCallableを実装します。こうすることで、Futureの結果を返すことができ、さらにチェック例外も使用できます。この例のタスク・クラスは、RosterTaskという名前です。リスト4にコード全体を示します。このクラスのコンストラクタでは、問合せを行う選手リストが属するチームを特定するIntegerを受け取ります。

リスト4:

public class RosterTask implements Callable<RosterInfo>, ManagedTask {
    // オンデマンドでレポートを行うためのリクエストのID
    Integer teamId;
    RosterInfo rosterInfo;
    private WebTarget resource;
    Map<String, String> execProps;

    public RosterTask(Integer id) {
        this.teamId = id;
        execProps = new HashMap<>();
       
        execProps.put(ManagedTask.IDENTITY_NAME, getIdentityName());
    }

    public RosterInfo call() {
        // Webサービスの呼出し
        
        resource = Utilities.obtainClient(Constants.ROSTER_URI, "team");
        resource = resource.path(java.text.MessageFormat.format("{0}", new Object[]{teamId}));
        Team team = null;
        team = (resource.request(javax.ws.rs.core.MediaType.APPLICATION_XML)
                .get(new GenericType<Team>() {
                }));
        resource = Utilities.obtainClient(Constants.ROSTER_URI, "roster");
        resource = resource.path(java.text.MessageFormat.format("findByTeam/{0}", new Object[]{teamId}));
        List<Roster> playerList = null;
        playerList = (resource.request(javax.ws.rs.core.MediaType.APPLICATION_XML)
                .get(new GenericType<List<Roster>>() {
                }));
       
        return new RosterInfo(team.getName(), playerList);
    }

    public String getIdentityName() {
        return "RosterTask:TeamID=" + teamId;
    }

    public Map<String, String> getExecutionProperties() {
        return execProps;
    }

    public String getIdentityDescription(Locale locale) { 
        // リソース・バンドルの使用...
        return "RosterTask asynchronous REST service invoker";
    }

    @Override
    public ManagedTaskListener getManagedTaskListener() {
        return new CustomManagedTaskListener();
    }


} 

 

このクラスでは、Callable<RosterInfo>とManagedTaskを実装している点に注意してください。クラスでCallable<RosterInfo>を実装することにより、RosterInfo型に含まれる定義への準拠が強制されます。ManagedTaskによって追加機能が実現しますが、この機能は必須ではありません。このインタフェースについては、次のセクションで説明します。

public class RosterTask
         implements Callable<RosterInfo>, ManagedTask

Callableを作成するときは、インタフェースに付随する型を返すcall()メソッドを実装しなければなりません。その型には、結果を扱うために必要な定義を含めます。call()メソッドは、クラスの実装を含み、結果を返します。この例では、call()経由でRosterServiceに問合せを行い、タスク・クラスのコンストラクタに渡された整数で特定されるチームの選手リストを取得しています。その後、RosterInfoの新しいインスタンスを作成し、選手リストに関連する情報を設定したうえで返却しています。

リスト5:

@WebServlet(name = "BuilderServlet", urlPatterns = {"/builderServlet"})
public class BuilderServlet extends HttpServlet implements Servlet {
    // エグゼキュータのインスタンスを取得

    @Resource(name = "concurrent/BuilderExecutor")
    ManagedExecutorService mes;
    RosterInfo rosterInfoHome;

    protected void processRequest(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        try {
            PrintWriter out = resp.getWriter();
            // タスクのインスタンスを作成
            ArrayList<Callable<RosterInfo>> builderTasks = new ArrayList<Callable<RosterInfo>>();
            builderTasks.add(new RosterTask(1));
            builderTasks.add(new RosterTask(2));

            // タスクを送信して待機
            List<Future<RosterInfo>> taskResults = mes.invokeAll(builderTasks);
            ArrayList<RosterInfo> results = new ArrayList<RosterInfo>();
            for (Future<RosterInfo> result : taskResults) {
                out.write("Processing Results...");
                while (!result.isDone()) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                results.add(result.get());

            }
            out.write("** Results Processed Successfully **");
            for (RosterInfo result : results) {
                if (result != null) {
                    System.out.println("===========================");
                    System.out.println("Team: " + result.team);
                    System.out.println("===========================");
                    for(Roster roster:result.players){
                        System.out.println(roster.getFirstName() + " " +
                                roster.getLastName() + " - " + roster.getPosition());
                    }
                }
            }
        } catch (InterruptedException | ExecutionException ex) {
            Logger.getLogger(BuilderServlet.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
// HttpServletのメソッド
}

 

この例では、サーブレットを使って複数のRosterTaskインスタンスを起動し、コンストラクタに毎回異なる整数を渡して、異なるチームの結果を返すようにしています。ArrayListに、タスク・クラスの各インスタンスを含めています。このリストをManagedExecutorService invokeAll()に渡し、タスクを実行しています。サーブレット全体については、リスト5を参照してください。

ArrayList<Callable<RosterInfo>> builderTasks =
    new ArrayList<Callable<RosterInfo>>();
builderTasks.add(new RosterTask(1));
builderTasks.add(new RosterTask(2));
// タスクを送信して待機
List<Future<RosterInfo>> taskResults = 
   mes.invokeAll(builderTasks);

 

結果を取得するためには、タスクの完了を一定時間待機してから、結果をリストに追加します。この例では、whileループとThread.sleep()を使って、Future.isDone()をチェックしています。

List<Future<RosterInfo>> taskResults 
     = mes.invokeAll(builderTasks);
ArrayList<RosterInfo> results 
     = new ArrayList<RosterInfo>();
for (Future<RosterInfo> result :
         taskResults) {
    while (!result.isDone()) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    results.add(result.get());
}

 

最後に、結果のリストを反復処理します。

for (RosterInfo result : results) {
    if (result != null) {
        . . .
        for(Roster roster:result.players){
            System.out.println(
                 roster.getFirstName()
                   . . . );
        }
    }
}

 

ManagedTaskの実装:ManagedTaskインタフェースによって、タスクを識別する情報を取得することや、ManagedTaskListenerを提供してライフサイクル情報を取得することや、追加の実行プロパティを提供することができます。たとえば、RosterTaskクラスではManagedTaskを実装しているため、getManagedTaskListener()をオーバーライドすることができます。このメソッドでは、カスタムのタスク・リスナーを返すことができます。次のコードをご覧ください。

@Override
    public ManagedTaskListener
      getManagedTaskListener() {
      return 
     new CustomManagedTaskListener();
    }

CustomManagedTaskListenerクラス(リスト6)では、ManagedTaskListenerを実装しています。これにより、taskSubmitted()、taskAborted()、taskDone()などのメソッドのオーバーライドが可能になります。そのため、こういったライフサイクル・アクションが実行されるタイミングで、カスタムの処理を起動できます。

リスト6:

public class CustomManagedTaskListener implements ManagedTaskListener {

    @Override
    public void taskSubmitted(Future<?> future, ManagedExecutorService mes, Object o) {
        System.out.println("Task Submitted");
    }

    @Override
    public void taskAborted(Future<?> future, ManagedExecutorService mes, Object o, Throwable thrwbl) {
        System.out.println("Task Aborted");
    }

    @Override
    public void taskDone(Future<?> future, ManagedExecutorService mes, Object o, Throwable thrwbl) {
        System.out.println("Task Complete");
    }

    @Override
    public void taskStarting(Future<?> future, ManagedExecutorService mes, Object o) {
        System.out.println("Task Starting");
    }
    
}

 

ManagedScheduledExecutorService

通常、レポート生成などの長時間かかるタスクは、営業時間外などの決められたスケジュールで実行するのが合理的です。ManagedScheduledExecutorServiceにより、スケジュールに基づいたこのようなタスクを、ManagedExecutorServiceと非常によく似た方法で実行できます。

この例では、Rosterアプリケーションから、さまざまなチームの選手リストをあらかじめ指定した間隔で1日中ポーリングしています。ここでは、先ほどの例と同じスタイルのレポート・クラスを使っています。このクラスではRunnableを実装しているため、run()メソッドをオーバーライドしてレポートのロジックを初期化しています。リスト7には、ReportRunnableという名前のクラスのコードが含まれています。このクラスのコンストラクタでは文字列ベースのレポート名を受け取っています。その後、サービスを呼び出して結果を取得することで、レポートの結果を生成しています。

リスト7:

public class RosterRunnable implements Runnable {

    private static Logger log = LogManager.getLogger();
    private WebTarget resource;
    private List<Roster> rosterList;

    public RosterRunnable() {
    }

    /**
     * このメソッドは、名前付きレポートを実行するためにオーバーライドされている。
     */
    @Override
    public void run() {
       obtainRoster();
    }

    /**
     * 選手リストを返すWebサービスを呼び出す。
     */
    protected void obtainRoster() {
        // Webサービスの呼出し
        resource = Utilities.obtainClient(Constants.ROSTER_URI, "roster").path("findAll");
       
        setRosterList(resource.request(javax.ws.rs.core.MediaType.APPLICATION_XML)
                .get(new GenericType<List<Roster>>() {
                }));
        rosterList.stream().forEach(r -> System.out.println(r.getFirstName() + " " + r.getLastName() + " - " + r.getPosition()));
    }

    /**
     * @return rosterList
     */
    public List<Roster> getRosterList() {
        return rosterList;
    }

    /**
     * @param rosterList 設定するrosterList
     */
    public void setRosterList(List<Roster> rosterList) {
        this.rosterList = rosterList;
    }

}

 

このソリューションで重要な部分は、ReportRunnableを呼ぶコードです。ReportRunnableは、ManagedScheduledExecutorServiceを経由して呼び出されているからです。この例では、ScheduledRosterReportRunnerクラスで定期的にレポートを起動しています。このクラスは、アプリケーションのデプロイ時と起動時に一度だけ呼び出され、次のコードによってレポートのスケジュールを設定しています。完全なコードは、リスト8を参照してください。

@PostConstruct
public void rosterScheduler() {
    System.out.println("Scheduling report...");
    ReportRunnable rosterReport
        = new ReportRunnable("RosterReport");
    rosterHandle = mes.scheduleAtFixedRate(
        rosterReport, 5L, 5L, TimeUnit.MINUTES);
    System.out.println("Report scheduled....");
}

例を見ればわかるように、並列タスクの実行をスケジューリングする鍵となるのはscheduleAtFixedRate()メソッドです。このメソッドでは、最初のパラメータとしてRunnableを、続いて最初の遅延(Long)、期間(Long)、TimeUnitを受け取ります。または、前回タスクの実行終了から次回タスクの開始までの待機時間をscheduleWithFixedDelay()に指定して、スケジュールを設定することもできます。

リスト8:

@Startup
@Singleton
@ApplicationScoped
public class ScheduledRosterReportRunner {

    Future rosterHandle = null;

    @Resource(name = "concurrent/_defaultManagedScheduledExecutorService")
    ManagedScheduledExecutorService mes;

    public ScheduledRosterReportRunner(){
        
    }
    
    @PostConstruct
    public void rosterScheduler() {
        System.out.println("Scheduling report...");
        ReportRunnable rosterReport = new ReportRunnable("RosterReport");

        rosterHandle = mes.scheduleAtFixedRate(rosterReport, 5L, 5L, TimeUnit.MINUTES);

        System.out.println("Report scheduled....");

    }

}

 

ManagedThreadFactory

ManagedThreadFactoryは、Jakarta EE環境でスレッドを作成するために導入されました。エンタープライズ・アプリケーションでベアボーンのスレッドを実装するためには、クラスにManagedThreadFactoryを注入してから、newThread()メソッドを呼び出して新しいスレッドを作成します。newThread()メソッドはjava.util.concurrent.ThreadFactoryから継承されているため、Java SE環境でスレッドを扱っている開発者には非常になじみのある実装です。

. . . 
@Resource
    private ManagedThreadFactory mtf;
. . .
public void invokeThreaddedRosterReport(){
        RosterRunnable rosterReport 
          = new RosterRunnable();
        rosterThread = mtf.newThread(rosterReport);
        rosterThread.start();
  }

 

まとめ

Jakarta Concurrency APIでは、エンタープライズ・アプリケーションで並列ソリューションを実現する多くのオプションや、スレッドの生成、タスクの管理、タスクのスケジューリングの各機能が提供されます。このAPIを使うためには、完全なJakarta EE 8プロファイルをロードするか、必要な依存性をプロジェクトのAPIに含めます。Jakarta EE 8にはJakarta Concurrency 1.1が含まれており、Jakarta EE 9にはJakarta Concurrency 2.0が含まれる予定になっています。
詳しくは、以下を参照してください。


Josh Juneau

Josh Juneau(@javajuneau):アプリケーション開発者、システム・アナリスト、データベース管理者。主に、Javaやその他のJVM言語を使った開発に従事。Oracle Technology NetworkやJava Magazineで多くの記事を執筆し、JavaやJava EEに関する複数の書籍をApressから出版している。JSR 372およびJSR 378のJCP専門家グループのメンバーを経験。NetBeans Dream Teamメンバー、Java Champion、CJUG OSS Initiativeのリーダーであり、JavaPubHouse Off Heapポッドキャストにレギュラー出演中。

Be the first to comment

Comments ( 0 )
Please enter your name.Please provide a valid email address.Please enter a comment.CAPTCHA challenge response provided was incorrect. Please try again.