An Oracle blog about Oracle Coherence

CoherenceRx: Reactive Extensions for Oracle Coherence

Guest Author

We are pleased to announce the release of a new Oracle Coherence Community open source project.  The CoherenceRx project provides a reactive API for the Oracle Coherence in-memory data grid, based on the popular RxJava library.

It is implemented as a thin wrapper around the Oracle Coherence Asynchronous API, which means that it requires Coherence 12.2.1 or later.

Why CoherenceRx?

Reactive Programming is somewhat of an all-or-nothing proposition, or as Andre Staltz pointed out in his excellent tutorial:

Everything is a Stream

When you are writing a reactive application and need to access a data source that doesn't provide a reactive API, life can get complicated. In order to simplify our users' lives we decided to implement CoherenceRx and release it as an open source add-on for Coherence.

Using CoherenceRx

The easiest way to include CoherenceRx in your own project is to add it as a Maven dependency (along with Coherence itself and RxJava):

<dependency>
    <groupId>com.oracle.coherence</groupId>
    <artifactId>coherence</artifactId>
    <version>${coherence.version}</version>
</dependency>
<dependency>
    <groupId>com.oracle.coherence</groupId>
    <artifactId>coherence-rx</artifactId>
    <version>${coherence-rx.version}</version>
</dependency>
<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>${rxjava.version}</version>
</dependency>

and configure the versions in the Maven properties section:

<coherence.version>12.2.1-0-0</coherence.version>
<coherence-rx.version>1.0.0</coherence-rx.version>
<rxjava.version>1.1.0</rxjava.version>

Once you have the necessary dependencies properly configured, you can use the static RxNamedCache.rx method to create an instance of RxNamedCache:

NamedCache<Long, Product>   cache   = CacheFactory.getTypedCache("trades", withTypes(Long.class, Product.class));
RxNamedCache<Long, Product> rxCache = RxNamedCache.rx(cache);

Of course, you can also use a static import for the RxNamedCache.rx method, which would make the code even simpler.

The RxNamedCache interface will be familiar to anyone who has used the Coherence NamedCache API before, with one major difference: all the methods return an Observable.

For example, RxNamedCache.get will return an Observable<V> which will eventually emit the value of the cache entry for the given key and complete:

rxCache.get(5L).subscribe(product -> System.out.println("Got: " + product));

Another important difference is that the bulk read operations, such as getAll, keySet, entrySet and values, do not return a single container value like their NamedCache counterparts, but an Observable stream of individual values:

rxCache.values().subscribe(product -> System.out.println("Got: " + product));

This is both more efficient, as it doesn't materialize the full result set on the client, and simpler, as it allows you to process each individual value as it is emitted by the underlying Observable.

For example, if you wanted to process batches of 10 products at a time, you could trivially accomplish that using the buffer operator:

rxCache.values()
       .buffer(10)
       .subscribe(productList -> System.out.println("Got: " + productList));
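To make the batching behavior concrete, here is a minimal plain-Java sketch of what a count-based buffer produces. It deliberately uses no RxJava or CoherenceRx classes, and the names BufferSketch and buffer are hypothetical, chosen only for this illustration. Unlike the real operator, which emits each batch downstream as soon as it fills, this stand-in collects all batches and returns them at the end, so treat it as an approximation of the grouping logic only.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for the grouping done by a count-based buffer.
// This is NOT RxJava or CoherenceRx code; BufferSketch and buffer() are
// hypothetical names used only for illustration.
class BufferSketch {

    // Groups items into consecutive lists of at most `count` elements.
    // The final list may be shorter if the source runs out of items.
    static <T> List<List<T>> buffer(Iterable<T> source, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>(count);
        for (T item : source) {
            current.add(item);
            if (current.size() == count) {
                batches.add(current);
                current = new ArrayList<>(count);
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // flush the trailing partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        // 25 stand-in product ids yield batches of 10, 10 and 5.
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            ids.add(i);
        }
        for (List<Integer> batch : buffer(ids, 10)) {
            System.out.println("Got: " + batch);
        }
    }
}
```

The point to take away is the trailing partial batch: a source that completes with fewer than 10 pending items still delivers them, so a subscriber should not assume every list has exactly 10 elements.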

Observing Event Streams

Oracle Coherence provides rich event notification functionality, so it only made sense to provide an adapter that allows you to use RxJava to process streams of event notifications.

CoherenceRx introduces ObservableMapListener, which extends the RxJava Observable and implements the Coherence MapListener interface. The ObservableMapListener simply propagates each received event to all of its subscribers:

ObservableMapListener<Long, Product> listener = ObservableMapListener.create();
listener.subscribe(System.out::println);
cache.addMapListener(listener);

The above is not very interesting, and could be easily achieved using the standard SimpleMapListener. But it becomes a lot more interesting when you start applying various RxJava operators to transform, filter and even combine event streams:

ObservableMapListener<Long, Trade> listener = ObservableMapListener.create();
listener.filter(evt -> evt.getId() == MapEvent.ENTRY_INSERTED)
        .map(MapEvent::getNewValue)
        .buffer(10, TimeUnit.SECONDS)
        .subscribe(trades -> System.out.println("Trades placed in the last 10 seconds: " + trades));
cache.addMapListener(listener);

It is important to note that, unlike the Observables returned by the RxNamedCache methods, which are 'cold', the ObservableMapListener is a 'hot' Observable and will start receiving and processing events as soon as it is registered with the cache using the NamedCache.addMapListener method.

Because of that, it is important that you add Subscribers to it before calling NamedCache.addMapListener, or you could miss some events.
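The ordering requirement is easy to see in a toy model. The following plain-Java sketch is not RxJava or CoherenceRx code: HotSource and HotSourceDemo are hypothetical classes written only to illustrate how a 'hot' source behaves, under the assumption that it does no buffering or replay for late subscribers. It is single-threaded and not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy demonstration of 'hot' source semantics. All names here are
// hypothetical; nothing in this block is part of RxJava or CoherenceRx.
class HotSourceDemo {

    // A source that pushes events to whoever is subscribed right now.
    static class HotSource<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        // Registers a subscriber; it only sees events emitted from now on.
        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        // Delivers an event to the current subscribers. Nothing is
        // buffered, so with no subscribers the event is simply dropped.
        void emit(T event) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(event);
            }
        }
    }

    // Emits one event before subscribing and one after, and returns
    // what the subscriber actually received.
    static List<String> run() {
        HotSource<String> source = new HotSource<>();
        List<String> received = new ArrayList<>();
        source.emit("inserted #1");      // no subscriber yet: lost
        source.subscribe(received::add);
        source.emit("inserted #2");      // delivered
        return received;
    }

    public static void main(String[] args) {
        System.out.println("Received: " + run());
    }
}
```

In this model the subscriber only ever sees the second event. Registering the listener with the cache plays the role of the first emit becoming possible, which is why subscribers should be attached beforehand.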

Feedback

CoherenceRx is the latest example of how Oracle Coherence continues to improve the developer experience when working with In-Memory Data Grid platforms.  In the past several months, we have provided groundbreaking support for Distributed Java 8 Lambdas and Streams, a new Async API based on CompletableFutures, and, now, support for Reactive Programming.  We welcome feedback on the CoherenceRx project in the comments section below, or, as you use it, you can file issues on the GitHub project page.
