X

A blog about Oracle Technology Network Japan

Elasticsearchで簡単検索

Guest Author

※本記事は、Henry Naftulinによる"Easy Searching with Elasticsearch"を翻訳したものです。


Elasticsearchの高水準/低水準APIを使って同期/非同期検索を行う

 

著者:Henry Naftulin

2020年1月10日

 

Elasticsearchは、Apache Luceneという全文検索ライブラリをベースに作られている、オープンソースの検索エンジンです。Apache Luceneは、索引作成および検索のテクノロジーと、スペルチェック、高度な分析、およびトークン化の機能を提供するJavaライブラリです。Luceneは1999年にSourceForgeプロジェクトとして始まり、2001年にApache Software Foundationに参加しました。そして少なくとも、人気の検索エンジンであるSolrとElasticsearchのバックボーンになっています。いずれの検索エンジンも強力で、それぞれに長所と短所があります。Solrの方が歴史があり、伝統的にドキュメントも充実しています。しかし、テキスト検索だけでなく時系列検索や集計処理も必要になるアプリケーションには、Elasticsearchがお勧めです。本記事では、Elasticsearchのみを取り上げます。SolrエンジンとElasticsearchエンジンの比較に関する詳細については、こちらの記事をご覧ください。

Elasticsearchの学習は広範なトピックです。検索に最適化されたドキュメントの設計、問合せおよび分析、マッピング、クラスタ管理、データ取り込み、セキュリティが含まれます。本記事では、Elasticsearchの中核となる概念を紹介したうえで、Elasticsearch Java APIを使って索引内のドキュメントを作成、更新、削除、検索する方法について詳しく見ていきます。これらの操作を、低水準APIおよび高水準APIの両方で行う方法について説明します。また、タスクを同期的および非同期的に実行する方法についても説明します。さらに、Elasticsearchクラスタにデータをストリーミングする方法も紹介します。この方法は、ストリームやキューなど、大きすぎてメモリにロードすることができないソースからデータを読み取る場合に必要となります。

 

使ってみる

最初に、Elasticsearchをダウンロードします。続いて、インストール先のbinディレクトリに移動し、elasticsearch.batを実行して起動します。Elasticsearchエンジンが起動したら、ログに「started」と出力されます。http://localhost:9200/を開くと、単一ノード・クラスタが起動していることを示すJSONレスポンスが返されます(図1参照)。

JSON response showing an Elasticsearch cluster is running

図1:Elasticsearchクラスタが起動していることを示すJSONレスポンス

 

中核となる概念

これでElasticsearchが起動しました。まずは、用語に親しむため、いくつかの中核的な概念について説明します。理解を助けるため、それぞれの概念を、データベース技術における同等の概念と対比させる形で表1にまとめました。この対比は必ずしも厳密ではありませんが、新しい用語を少しばかり学びやすくしています。

Elasticsearchの概念 SQL databaseの同等の概念 Elasticsearch conceptの説明
Data type Data type テキスト、キーワード、整数、データ、範囲、地理的位置などの型。文書フィールドに適用される
Document Database record JSON文書。第一級オブジェクトとしてElasticsearchに格納される。フィールドを含み、各フィールドは特定のデータ型を持つ
Type   非推奨。以前、似たような文書をまとめておくのに便利な方法
Index Database table (previously was similar to a database, but now that types are gone, it is similar to a table) 同型の文書を保存・管理するインデックス
Node Node Elasticsearch の単一実行インスタンス
Cluster Cluster 同じインデックスを検索するElasticsearchインスタンス。単一ノードであってもElasticsearchではクラスタになる
Shard/primary shard   クラスタの1つのノードに存在するインデックスの一部
Replica/replica shard   高可用性のために別のノードに保存されているシャードの余分なコピー

表1:Elasticsearchとデータベースにおける概念の比較

本記事では、カタログの項目を大量に処理します。ここでは、カタログの項目として、ID、説明、価格、売上ランキング(その項目の人気順位を表した数)があるものとします。カタログの項目は、1つのカテゴリに属し、その製造元であるメーカーが存在します。カテゴリには名前と親カテゴリがあり、メーカーには名前と住所があります。

 

低水準同期CRUD API

それでは、低水準クライアントの作成、読取り、更新、削除(CRUD)APIから見ていきます。低水準クライアントは、最低限のElasticsearch依存性しか必要としません。また、Elasticsearchが提供するRESTエンドポイントAPIに対応しています。したがって、Elasticsearchの新しいリリースでは、低水準クライアントの依存性と下位互換性が保たれることになります。このクライアントが「低水準」と呼ばれる理由は、JSONオブジェクト・リクエストを作成し、そのレスポンスを手動で解析する必要があるためです。メモリが制限されている環境では、このソリューションしか利用できない可能性があります。高水準クライアントAPIは低水準APIを使って作られているため、まず低水準APIから始めるのは合理的です。

低水準Elasticsearchライブラリは、RESTクライアントをインポートするだけで取得できます。Mavenの場合は次のようにします。今回のコードでは、JSONとモデル・クラスとの間のシリアライズおよびデシリアライズを行う際に役立つライブラリもインポートしていますが、ここには記載していません。

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-client</artifactId>
  <version>7.0.0</version>
</dependency>

 

RESTクライアントを構築するために、RESTクライアント・ビルダーを使用し、クライアントをその通信相手となるホスト(複数可能)に向けています。このクライアントはスレッドセーフであるため、アプリケーションのライフサイクル全体で使用できます。内部的なHTTPクライアントのリソースを解放するためには、アプリケーションがクライアントを使い終わった際にクライアントをクローズする必要があります。一例を挙げれば、try-with-resourcesを使ってクライアントを初期化しています。初期化が完了したら、クライアントをCrudMethodsSynchronousコンストラクタに渡しています。

CrudMethodsSynchronousクラスは、Elasticsearchのcreate/update/delete APIを呼び出すためのラッパーとして使用しています。

public static void main(String[] args) {
  try (RestClient client = RestClient.builder(
          new HttpHost("localhost", 9200, "http")).build()){
              CrudMethodsSynchronous scm = 
             new CrudMethodsSynchronous(
                 "catalog_item_low_level", client);
  }
}

 

Elasticsearchの索引にドキュメントを挿入するため、PUTリクエストを作成してクライアントにリクエストを実行してもらいます。ドキュメントを作成する際に重要な点は、その際に使うHTTPメソッドです。ここで使うことにしたのはPUTです。POSTメソッドを使うこともできます。実際のところ、レコードを作成する際に望ましいメソッドはPOSTであり、レコードを更新する際に望ましいメソッドはPUTです。しかし、今回の場合、このサンプル・プログラムを何度か実行します。その際に、索引をクリアしないこともあるため、PUTの方が望ましいというわけです。ここでは、PUTメソッドをアップサート(upsert。つまり、insertまたはupdate)として使っています。

URIも重要です。例を見てみます。

http://localhost:9200/catalog_item_low_level/_doc/

URIの最初の部分は索引、つまりドキュメントの格納場所で、データベースに相当する部分です。この場合はcatalog_item_low_levelです。2番目の部分は_docとなっていて、ドキュメントを扱うことを示しています。以前は、catalogitemなどのタイプをここで指定していましたが、Elasticsearch 7では、タイプは不要になっています。最後の部分はドキュメントのIDです。なお、索引名は小文字とする必要があることに注意してください。この例では、ドキュメントを1つずつ作成します。後ほど、複数の項目をまとめて作成する方法を紹介します。

public void createCatalogItem(List<CatalogItem> items) {
  items.stream().forEach(e-> {
      
    Request request = new Request("PUT", 
            String.format("/%s/_doc/%d", 
                 getIndex(), e.getId()));
    try {
        request.setJsonEntity(
            getObjectMapper().writeValueAsString(e));
      
        getRestClient().performRequest(request);
      } catch (IOException ex) {
        LOG.warn("Could not post {} to ES", e, ex);
      }
  });
}

項目を作成したら、全文検索で1つの項目を探してみます。今回の例では、「flashlight」を探します。この検索は、ドキュメント内のすべてのフィールドを対象にして行い、いずれかのフィールドにトークンとして「flashlight」が含まれているレコードを返します。ここで注意すべき点は、Elasticsearchにはフィルタと検索という両方の概念があることです。フィルタは高速な検索で、関連性を評価せずに結果を返すものです。一方で検索は、結果を返すとともに、それぞれの結果について関連度スコアを評価します(本記事では、検索のみを扱います)。

List<CatalogItem> items = 
   scm.findCatalogItem("flashlight");
LOG.info("Found {} items: {}", items.size(), items);

低水準クライアントで検索を実行するためには、索引に対してGETリクエストを発行する必要があります。URIは、/<indexname>/_searchとなります。低水準APIはElasticsearch RESTインタフェースを使うことから、REST問合せオブジェクトを手動で作成する必要があります。今回の場合は、{ "query" : {"query_string" : { "query": "flashlight" } } }となります。

Elasticsearchにリクエストを送信後、結果をResponseオブジェクトとして受け取ることになります。この結果には、戻りステータスと、JSONレスポンスを表すエンティティが含まれています。結果のCatalogItemを得るためには、レスポンスの構造を調べる必要があります。

public List<CatalogItem> findCatalogItem(String text) {
    Request request = new Request("GET", 
             String.format("/%s/_search", getIndex()));
       
    request.setJsonEntity(String.format(SEARCH, text));
    try {
        Response response = client.performRequest(request);
   if (response.getStatusLine().getStatusCode()==OK) {
           List<CatalogItem> catalogItems = 
                parseResultsFromFullSearch(response);
          
           return catalogItems;
       } 
    } catch (IOException ex) {
        LOG.warn("Could not post {} to ES", text, ex);
    }
    return Collections.emptyList();
}

まず、検索で返されたドキュメントを探し、次に、返されたJSONドキュメントをモデルに変換しています。おわかりのように、リクエストの作成でもレスポンスの解析でも、Elasticsearch RESTドキュメントを多用する必要があります。どのようにリクエストを作成し、Elasticsearchが返す内容をテストするかを確認するもっとも簡単な方法は、ChromeのAdvanced REST Client(ARC)プラグインを使うか、Postmanアプリを使うか、Kibanaをインストールすることです。

特定のフィールド(たとえば、カタログ項目のカテゴリ名)のみを検索するように変更する場合は、先ほどの問合せを{ "query" : { "match" : { "category.category_name" :"Home" } } }に変更するだけで済みます。その後、同じ処理を使ってリクエストを送信し、結果を解析します。

この2つの検索は、IDから項目を取得する処理とは異なります。IDから取得する場合は、索引にID(たとえば、/<indexname>/_doc/5)を渡すGETリクエストを発行するだけで済みます。また、返されたオブジェクトが存在する場合に取得されるのは、見つかった項目の配列ではなく、1つの項目だけであるため、その解析も異なります。次のコードは、低水準APIを使ってIDで検索する方法を示しています。すべての低水準API呼出しと同じように、JSONレスポンスを解析し、メタデータをスキップしてCatalogItem情報を抽出する必要があります。次に例を示します。

public Optional<CatalogItem> getItemById(Integer id) {
  Request request = new Request("GET", 
         String.format("/%s/_doc/%d", getIndex(), id));
  try {
    Response response = client.performRequest(request);
    if (response.getStatusLine().getStatusCode() == 200) {
      String rBody = 
          EntityUtils.toString(response.getEntity());
      LOG.debug("find by item id response: {}", rBody);
      int start = rBody.indexOf(_SOURCE);
      int end = rBody.indexOf("}}");
      String json = rBody.substring(
                     start + _SOURCE.length(), end+2); 
      LOG.debug(json);
      CatalogItem item = 
          jsonMapper.readValue(json, CatalogItem.class);
      return Optional.of(item);
    } 
  } catch (IOException ex) {
      LOG.warn("Could not post {} to ES", id, ex);
  }
  return Optional.empty();
}

ドキュメントの更新:ドキュメントを更新する方法は複数あります。ドキュメント全体を更新する方法と、特定のフィールドのみを更新する方法を使うことができます。CatalogItemはかなり小さなドキュメントであるため、ここでは全体を更新することにします。

public void updateCatalogItem(CatalogItem item) {
  Request request = 
      new Request("POST", 
                  String.format("/%s/_update/%d", 
                  getIndex(), item.getId()));
  try {
    request.setJsonEntity("{ \"doc\" :" + 
               jsonMapper.writeValueAsString(item)+"}");
      
    Response response = client.performRequest(request);
    LOG.debug("update response: {}", response);
  } catch (IOException ex) {
    LOG.warn("Could not post {} to ES", item, ex);
  }    
}

特定のフィールドのみを更新する場合も、同じURIにPOSTリクエストを発行します。ただし、リクエストでオブジェクトを送信するのではなく、更新が必要なフィールドのみを送信します。

public void updateDescription(Integer id, String desc) {
  Request request = new Request("POST", 
           String.format("/%s/_update/%d", index,  id));
  try {

    request.setJsonEntity(
         String.format(
            "{ \"doc\" : { \"description\" : \"%s\" }}", 
      desc));
      
    Response response = client.performRequest(request);
    LOG.debug("update response: {}", response);
  } catch (IOException ex) {
    LOG.warn("Could not post {} to ES", id, ex);
  }
}

次に例を示します。 項目の削除:項目の削除も簡単です。索引およびドキュメントID(たとえば、/<indexname>/_doc/5)を使用してDELETEリクエストを送るだけで済みます。

public void deleteCatalogItem(Integer id) {
  Request request = new Request("DELETE", 
        String.format("/%s/_doc/%d", getIndex(),  id));
  try {
    Response response = client.performRequest(request);
    LOG.debug("delete response: {}", response);
  } catch (IOException ex) {
    LOG.warn("Could not post {} to ES", id, ex);
  }
}

これで、低水準同期クライアントにおけるCRUDメソッドの概要説明が終わりました。

非同期呼出し:低水準クライアントを使って非同期呼出しを行う場合は、performRequestメソッドの代わりにperformRequestAsyncメソッドを呼び出す必要があるだけです。非同期呼出しには、レスポンス・リスナーを提供する必要があります。レスポンス・リスナーには、onSuccessおよびonFailureという2つのメソッドを実装する必要があります。次のコードの5行目と9行目をご覧ください。この例では、Elasticsearchの索引に対して、複数の項目の非同期アップサートを行っています。

カウントダウン・ラッチはJavaのconcurrentパッケージの一部です。ここでは、スレッド同期メカニズムとして使っています。カウントダウン・ラッチは、整数値で初期化します。待機スレッドからawait()メソッドを呼び出し、その他のスレッドからcountDownを呼び出します。その結果、countDownがラッチの初期値の回数だけ呼び出されるまで、待機スレッドはブロックされます。

これを行うために、カウントダウン・ラッチを作成しています。すべての項目が送信され、Elasticsearchで処理を終えるまでcreateCatalogMethodが終了しないようにするために、作成したカウントダウン・ラッチを使用しています。今回のレスポンス・リスナーの実装では、成功した場合と失敗した場合の両方でラッチのcountDownメソッドを呼び出し、Elasticsearchで項目の処理が完了したことを示しています。

public void createCatalogItem(List<CatalogItem> items) {
  CountDownLatch latch = new CountDownLatch(items.size());
  ResponseListener listener = new ResponseListener() {
    @Override
    public void onSuccess(Response response) {
      latch.countDown();
    }
    @Override
    public void onFailure(Exception exception) {
      latch.countDown();
      LOG.error(
        "Could not process ES request. ", exception);
    }
  };
      
  itemsToCreate.stream().forEach(e-> {
    Request request = new Request(
                 "PUT", 
                 String.format("/%s/_doc/%d", 
                               index, e.getId()));
      try {
        request.setJsonEntity(
            jsonMapper().writeValueAsString(e));
        client.performRequestAsync(request, listener);
      } catch (IOException ex) {
        LOG.warn("Could not post {} to ES", e, ex);
      }
    });
  try {
    latch.await(); // すべてのスレッドが終了するまで待機
    LOG.info("Done inserting all the records to the index");
  } catch (InterruptedException e1) {
    LOG.warn("Got interrupted.",e1);
  }
}

非同期クライアントは、高水準RESTクライアントと組み合わせることで、大幅に使いやすくなります。

 

高水準RESTクライアント

高水準RESTクライアント 高水準RESTクライアントは、低水準クライアントを使って作られています。Elasticsearchの依存性がいくつかプロジェクトに追加されますが、これから説明するように、コーディングははるかに簡単になり、楽しくなります。この点は、同期APIと非同期APIの両方に共通します。高水準APIの使用を選択する際には、1点注意すべきことがあります。Elasticsearchクラスタのメジャー・アップデートが行われるたびに、クライアントの依存性をアップグレードすることが推奨されるということです。低水準APIを使っている場合、この依存性アップグレードは必要ありません。しかし、ベースとなるElasticsearch APIが変更された場合は、その変更に対応するために実装の調整が必要となる可能性はあります。高水準クライアントを使うことでコーディングは容易になりますが、低水準クライアントの方が制御できる範囲は広く、バイナリのフットプリントは小さくなっています。

高水準Elasticsearchライブラリは、RESTクライアントをインポートするだけで取得できます。Mavenプロジェクトの場合は次のようにします。

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>
    elasticsearch-rest-high-level-client
  </artifactId>
  <version>7.4.2</version>
</dependency>    

高水準RESTクライアントの構築は、低水準RESTクライアントの構築ととてもよく似ています。唯一違うのは、低水準クライアントを高水準クライアントAPIでラップする必要がある点だけです。

try(RestHighLevelClient client = 
    new RestHighLevelClient(
        RestClient.builder(
            new HttpHost("localhost", 9200, "http")))) {

    CrudMethodsSynchronous scm = 
        new CrudMethodsSynchronous(
            "catalog_item_high_level",  client);

本誌のダウンロード・サイトに掲載したコードには、低水準クライアントで実装したものと同じメソッドがすべて含まれています。しかし、高水準モデルの方がはるかに使いやすいため、ここではドキュメントの作成方法と検索方法の2つに限って説明します。

Elasticsearch高水準APIでドキュメントを作成するためには、IndexRequestを使用し、希望の索引名で初期化する必要があります。その後、リクエストにIDを設定し、ソースとしてJSONを追加します。このリクエストを使って高水準クライアントの索引APIを同期的に呼び出すと、索引レスポンスが返されます。このレスポンスにより、ドキュメントが作成されたのか、または更新されたのかを確認することができます。

try(RestHighLevelClient client = 
    new RestHighLevelClient(
        RestClient.builder(
            new HttpHost("localhost", 9200, "http")))) {

    CrudMethodsSynchronous scm = 
        new CrudMethodsSynchronous(
            "catalog_item_high_level",  client);
public void createCatalogItem(List<CatalogItem> items) {
  items.stream().forEach(e-> {
    IndexRequest request = new IndexRequest(index);
      try {
        request.id(""+e.getId());
        request.source(jsonMapper.writeValueAsString(e), 
                   XContentType.JSON);
        request.timeout(TimeValue.timeValueSeconds(10));
        IndexResponse response = client.index(request,  
                            RequestOptions.DEFAULT);
        if (response.getResult() == 
                  DocWriteResponse.Result.CREATED) {
          LOG.info("Added catalog item with id {} "
                   + "to ES index {}", 
               e.getId(), response.getIndex());  
             
        } else if (response.getResult() == 
                  DocWriteResponse.Result.UPDATED) {
          LOG.info("Updated catalog item with id {} " +
           " to ES index {}, version of the " +
         "object is {} ", 
         e.getId(), response.getIndex(), 
         response.getVersion()); 
        } 
   
      } catch (IOException ex) {
        LOG.warn("Could not post {} to ES", e, ex);
      }
    }
   );
}    

同様に、全文検索もはるかに読みやすくなっています。ここでは、索引を渡して検索リクエストを作成し、続いて検索問合せビルダーを使って全文テキスト検索を作成しています。検索レスポンスには、JSONナビゲーション機能が含まれています。この機能により、配列を介して結果ドキュメントに簡単にアクセスすることができます。次のコードでは、SearchHitsがその配列です。

public List<CatalogItem> findCatalogItem(String text) {
    try {
        SearchRequest request = new SearchRequest(index); 
        SearchSourceBuilder scb = new SearchSourceBuilder();
        SimpleQueryStringBuilder mcb = 
       QueryBuilders.simpleQueryStringQuery(text);
        scb.query(mcb); 
        request.source(scb);
         
        SearchResponse response = 
            client.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        SearchHit[] searchHits = hits.getHits();
        List<CatalogItem> catalogItems = 
            Arrays.stream(searchHits)
                  .filter(Objects::nonNull)
                  .map(e -> toJson(e.getSourceAsString()))
                  .collect(Collectors.toList());
         
        return catalogItems;
    } catch (IOException ex) {
        LOG.warn("Could not post {} to ES", text, ex);
    }
    return Collections.emptyList();
}

このコードでは、Elasticsearchの特定の索引を検索しました。すべての索引を検索するためには、パラメータなしでSearchRequestを作成する必要があります。ここで紹介した2つの例から、その他のCRUDメソッドでの応用パターンがわかります。つまり、最初に特定のリクエストを作成し、そのリクエストに索引とドキュメントIDを渡します。リクエストは、ドキュメントを作成する場合はIndexRequest、IDからドキュメントを取得する場合はGetRequest、ドキュメントを更新する場合はUpdateRequestというようになります。その後、Elasticsearchに対して適切なリクエスト(取得、更新、削除など)を発行します。その結果、ステータス(および該当する場合はソース・オブジェクト)を含むレスポンスを受け取ります。

非同期呼出し:高水準クライアントを使うことで、非同期呼出しを書くのが少し楽になります。非同期呼出しを書くためには、相当する同期メソッドに接尾辞Asyncを付けたものを呼び出します。その際、最後の引数として、ElasticsearchのActionListener、または高水準オブジェクト(PlainActionFutureなど)のいずれかを渡します。PlainActionFutureは、高水準APIの依存性としてインポートされる、Elasticsearchのクラスです。このクラスは、ElasticsearchのActionListenerインタフェースとJavaのFutureインタフェースの両方を実装しているため、レスポンス処理に最適です。

次のサンプル・コードでは、すべてのメソッドを非同期式に実装しています。基本的に同期式の例と同じですが、PlainActionFutureを作成している点が異なります。PlainActionFutureには、やがて検索レスポンスが設定されます。高水準RESTクライアントのsearchAsync APIには、このPlainActionFutureを渡します。このメソッドのコール元はFutureを調べ、検索が完了したタイミングで、同期APIとまったく同じように検索レスポンスを解析します。

非同期APIの最大のメリットは、検索結果が必要になるまで、ワーキング・スレッドで他の操作を行うことができることです。

public PlainActionFuture<SearchResponse> 
                                 findItem(String text) {
                 
    SearchRequest request = new SearchRequest(getIndex()); 
    SearchSourceBuilder ssb = new SearchSourceBuilder();
    SimpleQueryStringBuilder mqb = 
             QueryBuilders.simpleQueryStringQuery(text);
    ssb.query(mqb); 
    request.source(ssb);
    
    PlainActionFuture<SearchResponse> future = 
                              new PlainActionFuture<>();
    client.searchAsync(request, 
                       RequestOptions.DEFAULT, future);
    return future;
}

 

Elasticsearchへのデータ・ストリーミング

本記事の最後に、Elasticsearchにデータをストリーミングする方法と、一括処理について紹介します。一括処理により、1回のリクエストで複数の索引、更新、削除操作を行うことができます。一括リクエストのメリットは、Elasticsearchサーバーとの間をリクエストのたびに往復するのではなく、1回の往復だけですべてを行う点です。また、一括処理は、Elasticsearchへのデータ・ストリーミングとの相性が非常によいものです。唯一の注意点は、余分な待機時間が発生しないように、適切なバッチ・サイズを決定する方法を見つける必要があるということです。バッチが大きすぎて、作業が完了する前にリクエスト全体がタイムアウトとなることがないようにします。

バッチ・リクエストには、異なる操作を含めることもできます。しかし、次の例では、Elasticsearchの索引にデータを挿入する索引リクエストのみを含めています。次のルーチンでは、1つの一括リクエストを作成しています。この一括リクエストでは、バッチに渡す1つの項目につき1つの索引リクエストを追加しています。コール元には、適切なバッチ・サイズを使用する責任があります。すべての項目を追加したら、同期クライアントから一括リクエストを送信します。

一括リクエストを呼び出すと、一括レスポンスが返されます。一括レスポンスには、複数の一括レスポンス項目が含まれます。それぞれの項目はリクエストの項目に対応しており、リクエストされた操作や、その操作が成功したかどうかなどを示します。簡略化するため、この例では一括レスポンスは使用していません。

private void sendBatchToElasticSearch(
                 List<LineFromShakespeare> linesInBatch, 
      RestHighLevelClient client, 
      String indexName) throws IOException {
         
    BulkRequest request = new BulkRequest();
    linesInBatch.stream().forEach(l -> {
        try {
            request.add(new IndexRequest(indexName)
                   .id(l.getId())
                   .source(jsonMapper.writeValueAsString(l), 
             XContentType.JSON));
        
       } catch (JsonProcessingException e) {
           LOG.error("Problem mapping object {}", l, e);
       }});
    LOG.info("Sending data to ES");
    client.bulk(request, RequestOptions.DEFAULT);
}

 

上記のメソッドは、別の処理から起動します。この処理では、ファイルのストリーミングを使ってElasticsearchに1,000個ずつドキュメントをロードし、バッチを作成しています。

public void loadData(String file, String index) 
               throws IOException, URISyntaxException {
  Path filePath = Paths.get(
           ClassLoader.getSystemResource(file).toURI());

  List<String> errors = new ArrayList<>();
  List<LineFromShakespeare> lines = new ArrayList<>();
  final int maxLinesInBatch = 1000;
  try(RestHighLevelClient client = 
    new RestHighLevelClient(
      RestClient.builder(
       new HttpHost("localhost", 9200, "http")))) {
          
      Files.lines(file).forEach( e-> {
        try {
            LineFromShakespeare line = 
                jsonMapper.readValue(e, LineFromShakespeare.class);
            // 修飾する...
            linesInBatch.add(line);
            if (linesInBatch.size() >= maxLinesInBatch) {
                sendBatchToElasticSearch(linesInBatch, 
                                         client, indexName);
               linesInBatch.clear();
            }
        } catch (IOException ex) {
          errors.add(e);
          linesInBatch.clear();
        }});

    if (linesInBatch.size() != 0) {
        sendBatchToElasticSearch(linesInBatch, 
                            client, indexName);
        linesInBatch.clear();
    }
  }
       
  LOG.info("Errors found in {} batches", errors.size());
}

 

ここでは、JavaのFiles.lines APIを使ってファイルを1行ずつストリーミングし、テキストをオブジェクトに変換し、修飾したうえで、Elasticsearchに送信予定のバッチに追加しています。バッチ・サイズが1,000になったら、sendBatchToElasticSearchメソッドを使ってバッチをElasticsearchに送信しています。

おわかりのように、高水準APIを使うことでコードは簡素化され、その可読性もはるかに向上します。そのため、バイナリ・フットプリントが問題にならず、Elasticsearchクラスタがメジャー・アップグレードされるたびに依存性をアップグレードできるのであれば、高水準APIを使い続けることを強くお勧めします。

 

まとめ

本記事では、Elasticsearchを紹介しました。低水準クライアントおよび高水準クライアントの両方で使用されるJava CRUD APIを中心に、CRUDアプリケーションに必要なほとんどの機能に触れました。APIと、Elasticsearchへのデータ・ストリーミングは、本格的にElasticsearchを使う前に必要な基礎知識です。Elasticsearchが扱う領域には、ドキュメント設計やデータ分析機能に加え、複数フィールド検索、近似マッチング、ページング、サジェスチョン、ハイライト、結果のスコアリングを含む高度な検索、さまざまなタイプのデータ集計、位置情報、セキュリティ、クラスタ管理などが含まれます。この土俵はとても広いのです。ぜひお楽しみください。


 

 

Henry Naftulin

Henry Naftulin:15年以上にわたってJava EE分散システムの設計に携わる。現在は、米国有数の金融会社で、受賞歴のあるプロプライエタリな債券取引プラットフォームのリード開発者を努めている。

 

 

 

Be the first to comment

Comments ( 0 )
Please enter your name.Please provide a valid email address.Please enter a comment.CAPTCHA challenge response provided was incorrect. Please try again.