We are pleased to announce the release of a new Oracle Coherence Community open source project. The CoherenceRx project provides a reactive API for the Oracle Coherence in-memory data grid, based on the popular RxJava library.
It is implemented as a thin wrapper around the Oracle Coherence Asynchronous API, which means it requires Coherence 12.2.1 or later.
Reactive Programming is somewhat of an all-or-nothing proposition, or as Andre Staltz pointed out in his excellent tutorial:
When you are writing a reactive application and need to access a data source that doesn't provide a reactive API, life can get complicated. In order to simplify our users' lives we decided to implement CoherenceRx and release it as an open source add-on for Coherence.
The easiest way to include CoherenceRx in your own project is to add it as a Maven dependency (along with Coherence itself and RxJava):
and configure versions within Maven
Once you have the necessary dependencies properly configured, you can use the static RxNamedCache.rx method to create an instance of RxNamedCache:
NamedCache<Long, Product> cache = CacheFactory.getTypedCache("trades", withTypes(Long.class, Product.class));
RxNamedCache<Long, Product> rxCache = RxNamedCache.rx(cache);
Of course, you can also use a static import for the RxNamedCache.rx method, which would make the code even simpler.
The RxNamedCache interface will be familiar to anyone who has used the Coherence NamedCache API before, with one major difference: all the methods return an Observable. For example, RxNamedCache.get will return an Observable<V> which will eventually emit the value of the cache entry for the given key and complete:
rxCache.get(5L).subscribe(product -> System.out.println("Got: " + product));
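To illustrate the idea of a single-value source that emits once and then completes, here is a minimal plain-JDK sketch of how an asynchronous call returning a CompletableFuture could be exposed through emit/error/complete callbacks. It is not the CoherenceRx implementation and uses neither Coherence nor RxJava; the class ColdSingleSketch and its subscribe signature are hypothetical names chosen for this example. The sketch assumes that the asynchronous call is started at subscription time, so each subscriber triggers its own call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration only: adapts an async call to observer-style callbacks.
final class ColdSingleSketch<T> {
    // Factory for the async call; invoked once per subscription, not at construction.
    private final Supplier<CompletableFuture<T>> call;

    ColdSingleSketch(Supplier<CompletableFuture<T>> call) {
        this.call = call;
    }

    // Starts the async call and routes its outcome to the given callbacks:
    // on success the value is emitted and then completion is signalled,
    // on failure only the error callback is invoked.
    void subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onCompleted) {
        call.get().whenComplete((value, error) -> {
            if (error != null) {
                onError.accept(error);
            } else {
                onNext.accept(value);
                onCompleted.run();
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for an asynchronous cache read; already completed for simplicity.
        ColdSingleSketch<String> source = new ColdSingleSketch<>(() -> {
            calls.incrementAndGet();
            return CompletableFuture.completedFuture("product-5");
        });

        List<String> seen = new ArrayList<>();
        source.subscribe(
                value -> seen.add("Got: " + value),
                error -> seen.add("Failed: " + error),
                () -> seen.add("Completed"));

        System.out.println(seen);
        System.out.println("Async calls started: " + calls.get());
    }
}
```

Because the supplier is only invoked inside subscribe, constructing the source does no work, which is the behavior usually described as 'cold'.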
Another important difference is that bulk read operations, such as values, do not return a single container value like their NamedCache counterparts, but an Observable stream of individual values:
rxCache.values().subscribe(product -> System.out.println("Got: " + product));
This is both more efficient, as it doesn't materialize the full result set on the client, and simpler, as it allows you to process each individual value as it is emitted by the underlying Observable. For example, if you wanted to process batches of 10 products at a time, you could trivially accomplish that using the RxJava buffer operator:
rxCache.values().buffer(10)
        .subscribe(productList -> System.out.println("Got: " + productList));
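To make the batching semantics concrete, the following plain-JDK sketch shows what grouping a sequence of values into batches of 10 amounts to. It does not use CoherenceRx or RxJava, and it works on an in-memory list rather than a stream; BufferSketch and batches are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: count-based batching over an in-memory list.
final class BufferSketch {
    // Groups values into consecutive lists of at most `size` elements.
    // A final, shorter list is produced if the values run out mid-batch.
    static <T> List<List<T>> batches(List<T> values, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>(size);
        for (T value : values) {
            current.add(value);
            if (current.size() == size) {
                result.add(current);
                current = new ArrayList<>(size);
            }
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // 25 stand-in product ids yield batches of 10, 10 and 5.
        List<Integer> productIds = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            productIds.add(i);
        }
        for (List<Integer> batch : batches(productIds, 10)) {
            System.out.println("Got: " + batch);
        }
    }
}
```

The subscriber in the batched case therefore receives lists of products rather than individual products, with the last list possibly shorter than the rest.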
Observing Event Streams
Oracle Coherence provides rich event notification functionality, so it only made sense to provide an adapter that allows you to use RxJava to process streams of event notifications.
This adapter is ObservableMapListener, which extends the RxJava Observable and implements the Coherence MapListener interface. The ObservableMapListener simply propagates each received event to all of its subscribers:
ObservableMapListener<Long, Product> listener = ObservableMapListener.create();
The above is not very interesting, and could be easily achieved using the standard SimpleMapListener. But it becomes a lot more interesting when you start applying various RxJava operators to transform, filter and even combine event streams:
ObservableMapListener<Long, Trade> listener = ObservableMapListener.create();
listener.filter(evt -> evt.getId() == MapEvent.ENTRY_INSERTED)
        // assumed step: collect the events of each 10-second window into a list
        .buffer(10, TimeUnit.SECONDS)
        .subscribe(trades -> System.out.println("Trades placed in the last 10 seconds: " + trades));
It is important to note that unlike the Observables returned by the RxNamedCache methods, which are 'cold', the ObservableMapListener is a 'hot' Observable and will start receiving and processing events as soon as it is registered with the cache using NamedCache.addMapListener. Because of that, it is important that you add Subscribers to it before calling NamedCache.addMapListener, or you could miss some events.
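The ordering concern can be demonstrated with a small plain-JDK sketch of a 'hot' source, that is, one that delivers events only to whoever is subscribed at the moment an event arrives. It does not use CoherenceRx or Coherence; HotSourceSketch, HotSource and run are hypothetical names, and the publish calls stand in for events that a cache would deliver once the listener has been registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only: why subscribers should be attached before events start flowing.
final class HotSourceSketch {
    // A minimal hot source: nothing is stored or replayed for late subscribers.
    static final class HotSource<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(T event) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(event);
            }
        }
    }

    // Publishes two events and returns what a single subscriber observed.
    // When subscribeFirst is false, the subscriber attaches after the first event.
    static List<String> run(boolean subscribeFirst) {
        HotSource<String> source = new HotSource<>();
        List<String> received = new ArrayList<>();

        if (subscribeFirst) {
            source.subscribe(received::add);
        }
        source.publish("inserted:1"); // events begin as soon as the source is live
        if (!subscribeFirst) {
            source.subscribe(received::add);
        }
        source.publish("inserted:2");
        return received;
    }

    public static void main(String[] args) {
        System.out.println("Subscribed first: " + run(true));
        System.out.println("Subscribed late:  " + run(false));
    }
}
```

In the late case the first event is gone for good, which mirrors what can happen if the listener is registered with the cache before any Subscribers have been added to it.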
CoherenceRx is the latest example of how Oracle Coherence continues to improve the developer experience when working with In-Memory Data Grid platforms. In the past several months, we have provided groundbreaking support for Distributed Java 8 Lambdas and Streams, a new Async API based on CompletableFutures, and, now, support for Reactive Programming. We welcome feedback on the CoherenceRx project in the comments section below, or, as you use it, you can file issues on the GitHub project page.