Reactive programming sounds like the name of an emerging programming paradigm at first, but it refers to a programming technique that offers an event-driven approach for handling asynchronous streams of data. Based on data that flows continuously, reactive systems react to the data by executing a series of events.
Reactive programming follows the Observer design pattern, which can be defined as follows: when the state of one object changes, the other objects are notified and updated accordingly. Therefore, instead of polling for changes, events are pushed asynchronously so the observers can process them. In this context, observers are functions that are executed when an event is emitted, and the data stream mentioned above is the observable that is being observed.
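The push model described above can be sketched in a few lines of plain Java. This is only an illustration: the class TemperatureStream and its subscribe() and emit() methods are hypothetical names invented for the sketch, and the events are pushed synchronously to keep it short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ObserverSketch {

    // Hypothetical minimal observable: it keeps a list of observers
    // and pushes every emitted value to each of them.
    static class TemperatureStream {
        private final List<Consumer<Double>> observers = new ArrayList<>();

        void subscribe(Consumer<Double> observer) {
            observers.add(observer);
        }

        void emit(double value) {
            // Push the event; observers never poll for changes.
            observers.forEach(o -> o.accept(value));
        }
    }

    // Subscribes one observer, emits two values, and returns what it saw.
    static List<Double> demo() {
        List<Double> received = new ArrayList<>();
        TemperatureStream stream = new TemperatureStream();
        stream.subscribe(received::add);
        stream.emit(21.5);
        stream.emit(23.0);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [21.5, 23.0]
    }
}
```

Here the observer is just a function (received::add), and the stream is the observable it reacts to.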
Nearly all languages and frameworks have adopted this programming approach in their ecosystems, and Java has kept pace in its latest releases. In this article, I explain how reactive programming can be applied by using the latest version of JAX-RS from Java EE 8 and by using Java 8 features under the hood.
The Reactive Manifesto lists four fundamental aspects an application must have in order to be more flexible, loosely coupled, and easily scalable—and, therefore, capable of being reactive. It says an application should be responsive, elastic (that is, scalable), resilient, and message-driven.
Having an application that is truly responsive is the foundational goal. Suppose you have an application that heavily depends on one big thread to handle user requests, and this thread typically sends responses back to its originating requesters after doing its work. When the application gets more requests than it can handle, this thread will start to be a bottleneck and the application itself will not be able to be as responsive as it was before. To have the application be responsive, you need to make it scalable and resilient, because responsiveness is possible only with both scalability and resilience. Resilience occurs when an application exhibits features such as auto-recovery and self-healing. In most developers’ experience, only a message-driven architecture can enable a scalable, resilient, and responsive application.
Reactive programming has started to be baked into the bits of the Java 8 and Java EE 8 releases. The Java language introduced concepts such as CompletionStage and its implementation, CompletableFuture, and Java EE started to employ these features in specifications such as the Reactive Client API of JAX-RS.
Let’s look at how reactive programming can be used in Java EE 8 applications. To follow along, you’ll need familiarity with the basic Java EE APIs.
JAX-RS 2.1 introduced a new way of creating a REST client with support for reactive programming. The default invoker implementation provided by JAX-RS is synchronous, which means the client that is created will make a blocking call to the server endpoint. An example for this implementation is shown in Listing 1.
Listing 1.
Response response =
    ClientBuilder.newClient()
        .target("http://localhost:8080/service-url")
        .request()
        .get();
As of version 2.0, JAX-RS provides support for creating an asynchronous invoker on the client API by just invoking the async() method, as shown in Listing 2.
Listing 2.
Future<Response> response =
    ClientBuilder.newClient()
        .target("http://localhost:8080/service-url")
        .request()
        .async()
        .get();
Using an asynchronous invoker on the client returns an instance of Future with type javax.ws.rs.core.Response. You would then either wait for the response with a call to future.get(), which blocks until the result is available, or register a callback that is invoked when the HTTP response is available. Both of these implementation approaches are suitable for asynchronous programming, but things usually get complicated when you want to nest callbacks or add conditional cases in those asynchronous execution flows.
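To make the difficulty concrete, here is a minimal sketch that chains two dependent steps with plain Future objects. It does not use JAX-RS; the two tasks submitted to the executor are stand-ins for remote calls, and the class and method names are assumptions made for the illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureChainingSketch {

    // Two dependent steps with plain Futures: the second step cannot
    // start until the caller blocks on the first result with get().
    static String fetchForecast() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Stand-in for a remote call that returns a city name
            Future<String> city = pool.submit(() -> "London");
            String cityName = city.get();   // blocks the calling thread

            // Stand-in for a second call that depends on the first
            Future<String> forecast =
                pool.submit(() -> cityName + ": 33 Celsius");
            return forecast.get();          // blocks again
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchForecast()); // prints London: 33 Celsius
    }
}
```

Every dependent step costs another blocking get(), which is the kind of flow that becomes hard to manage as it grows.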
JAX-RS 2.1 offers a reactive way to overcome these problems with the new JAX-RS Reactive Client API for building the client. It’s as simple as invoking the rx() method while building the client. In Listing 3, the rx() method returns the reactive invoker that exists on the client’s runtime, and the client returns a response of type CompletionStage. This single invocation of rx() is all it takes to switch from the synchronous invoker to the asynchronous one.
Listing 3.
CompletionStage<Response> response =
    ClientBuilder.newClient()
        .target("http://localhost:8080/service-url")
        .request()
        .rx()
        .get();
CompletionStage<T> is an interface introduced in Java 8, and, as its name implies, it represents a computation that can be a stage within a larger computation. It’s the only reactive portion of Java 8 that made it into JAX-RS.
After getting a response instance, I can just invoke thenAcceptAsync(), where I can provide the code snippet that would be executed asynchronously when the response becomes available, as shown in Listing 4.
Listing 4.
response.thenAcceptAsync(res -> {
    Temperature t = res.readEntity(Temperature.class);
    // do stuff with t
});
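The same registration pattern can be tried without a server. The following sketch assumes a CompletableFuture built with supplyAsync() as a stand-in for the stage that rx().get() would return; the string payload and the class name are invented for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;

public class ThenAcceptSketch {

    static String demo() {
        AtomicReference<String> seen = new AtomicReference<>();

        // Stand-in for the CompletionStage returned by rx().get()
        CompletionStage<String> response =
            CompletableFuture.supplyAsync(() -> "33 Celsius");

        // Register the consumer; it runs asynchronously once
        // the value is available
        CompletionStage<Void> done =
            response.thenAcceptAsync(body -> seen.set("received " + body));

        // Block only so that this demo can return the observed value
        done.toCompletableFuture().join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints received 33 Celsius
    }
}
```

The final join() exists only for the demonstration; in a reactive flow the consumer would simply run when the response arrives.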
The reactive approach is not limited to the client side in JAX-RS; it’s also possible to leverage it on the server side. To demonstrate this, I will first create a simple scenario where I can query a list of locations from one endpoint. For each location, I will make another call to another endpoint with that location data to get a temperature value. The interaction of the endpoints would be as shown in Figure 1.
Figure 1. Interaction between endpoints
First, I simply define the domain model and then I define the services for each domain model. Listing 5 defines the Forecast class, which wraps the Temperature and Location classes.
Listing 5.
public class Temperature {
    private Double temperature;
    private String scale;
    // getters & setters
}
public class Location {
    String name;
    public Location() {}
    public Location(String name) {
        this.name = name;
    }
    // getters & setters
}
public class Forecast {
    private Location location;
    private Temperature temperature;
    public Forecast(Location location) {
        this.location = location;
    }
    public Forecast setTemperature(
            final Temperature temperature) {
        this.temperature = temperature;
        return this;
    }
    // getters
}
For wrapping a list of forecasts, the ServiceResponse class is implemented in Listing 6.
Listing 6.
public class ServiceResponse {
    private long processingTime;
    private List<Forecast> forecasts = new ArrayList<>();
    public void setProcessingTime(long processingTime) {
        this.processingTime = processingTime;
    }
    public ServiceResponse forecasts(
            List<Forecast> forecasts) {
        this.forecasts = forecasts;
        return this;
    }
    // getters
}
LocationResource, which is shown in Listing 7, defines three sample locations returned with the path /location.
Listing 7.
@Path("/location")
public class LocationResource {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Response getLocations() {
        List<Location> locations = new ArrayList<>();
        locations.add(new Location("London"));
        locations.add(new Location("Istanbul"));
        locations.add(new Location("Prague"));
        return Response.ok(
                new GenericEntity<List<Location>>(locations){})
            .build();
    }
}
TemperatureResource, shown in Listing 8, returns a randomly generated temperature value from 30 to 49 for a given location (nextInt(20) yields a value from 0 through 19, to which 30 is added). A delay of 500 ms is added within the implementation to simulate the sensor reading.
Listing 8.
@Path("/temperature")
public class TemperatureResource {
    @GET
    @Path("/{city}")
    @Produces(MediaType.APPLICATION_JSON)
    public Response getAverageTemperature(
            @PathParam("city") String cityName) {
        Temperature temperature = new Temperature();
        temperature.setTemperature(
            (double) (new Random().nextInt(20) + 30));
        temperature.setScale("Celsius");
        try {
            Thread.sleep(500);
        } catch (InterruptedException ignored) {}
        return Response.ok(temperature).build();
    }
}
I will first show the implementation for the synchronous ForecastResource (shown in Listing 9), which first fetches all locations. Then, for each location, it invokes the temperature service to retrieve the Celsius value.
Listing 9.
@Path("/forecast")
public class ForecastResource {
    @Uri("location")
    private WebTarget locationTarget;
    @Uri("temperature/{city}")
    private WebTarget temperatureTarget;
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Response getLocationsWithTemperature() {
        long startTime = System.currentTimeMillis();
        ServiceResponse response = new ServiceResponse();
        List<Location> locations = locationTarget.request()
            .get(new GenericType<List<Location>>() {});
        locations.forEach(location -> {
            Temperature temperature = temperatureTarget
                .resolveTemplate("city", location.getName())
                .request()
                .get(Temperature.class);
            response.getForecasts().add(
                new Forecast(location)
                    .setTemperature(temperature));
        });
        long endTime = System.currentTimeMillis();
        response.setProcessingTime(endTime - startTime);
        return Response.ok(response).build();
    }
}
When the forecast endpoint is requested as /forecast, you should see output similar to Listing 10. Notice that processing the request took 1,533 ms, which makes sense because requesting temperature values for three different locations synchronously adds up to at least 3 × 500 ms = 1,500 ms.
Listing 10.
{
  "forecasts": [
    {
      "location": {
        "name": "London"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 33
      }
    },
    {
      "location": {
        "name": "Istanbul"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 38
      }
    },
    {
      "location": {
        "name": "Prague"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 46
      }
    }
  ],
  "processingTime": 1533
}
So far, so good. Now it’s time to introduce reactive programming on the server side, where a call for each location could be done in parallel after getting all the locations. This can definitely enhance the synchronous flow shown earlier. This is done in Listing 11, which defines a reactive version of this forecast service.
Listing 11.
@Path("/reactiveForecast")
public class ForecastReactiveResource {
    @Uri("location")
    private WebTarget locationTarget;
    @Uri("temperature/{city}")
    private WebTarget temperatureTarget;
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public void getLocationsWithTemperature(
            @Suspended final AsyncResponse async) {
        long startTime = System.currentTimeMillis();
        // Create a stage on retrieving locations
        CompletionStage<List<Location>> locationCS =
            locationTarget.request()
                .rx()
                .get(new GenericType<List<Location>>() {});
        // By composing another stage on the location stage
        // created above, collect the list of forecasts
        // as one big completion stage
        final CompletionStage<List<Forecast>> forecastCS =
            locationCS.thenCompose(locations -> {
                // Create a stage for retrieving forecasts
                // as a list of completion stages
                List<CompletionStage<Forecast>> forecastList =
                    // Stream locations and process each
                    // location individually
                    locations.stream().map(location -> {
                        // Create a stage for fetching the
                        // temperature value just for one city
                        // given by its name
                        final CompletionStage<Temperature> tempCS =
                            temperatureTarget
                                .resolveTemplate("city",
                                    location.getName())
                                .request()
                                .rx()
                                .get(Temperature.class);
                        // Then create a completable future that
                        // contains an instance of forecast
                        // with location and temperature values
                        return CompletableFuture.completedFuture(
                                new Forecast(location))
                            .thenCombine(tempCS,
                                Forecast::setTemperature);
                    }).collect(Collectors.toList());
                // Return a final completable future instance
                // when all provided completable futures are
                // completed
                return CompletableFuture.allOf(
                        forecastList.toArray(
                            new CompletableFuture[forecastList.size()]))
                    .thenApply(v -> forecastList.stream()
                        .map(CompletionStage::toCompletableFuture)
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
            });
        // Create an instance of ServiceResponse,
        // which contains the whole list of forecasts
        // along with the processing time.
        // Create a completed future of it and combine to
        // forecastCS in order to retrieve the forecasts
        // and set into service response
        CompletableFuture.completedFuture(
                new ServiceResponse())
            .thenCombine(forecastCS,
                ServiceResponse::forecasts)
            .whenCompleteAsync((response, throwable) -> {
                // If any stage failed, response is null, so
                // resume with the error instead
                if (throwable != null) {
                    async.resume(throwable);
                    return;
                }
                response.setProcessingTime(
                    System.currentTimeMillis() - startTime);
                async.resume(response);
            });
    }
}
The reactive implementation might look more complicated at first glance, but after closer examination you will see that it’s fairly straightforward. Within the ForecastReactiveResource implementation, I first create a client invocation on the location services with the help of the JAX-RS Reactive Client API. As I mentioned previously, this is an addition to Java EE 8, and it helps to create a reactive invoker simply by use of the rx() method.
Now I compose another stage based on location to collect the list of forecasts. They will be stored in one big completion stage, named forecastCS, as a list of forecasts. I will ultimately create the response of the service call by using only forecastCS.
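Composing one stage on top of another is what thenCompose() does. The sketch below is a simplified stand-in, not the code of Listing 11: fetchLocations() and countLocations() are hypothetical helpers that simulate two asynchronous steps, the second of which depends on the result of the first.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ThenComposeSketch {

    // Stand-in for the asynchronous location lookup
    static CompletionStage<List<String>> fetchLocations() {
        return CompletableFuture.supplyAsync(
            () -> Arrays.asList("London", "Istanbul", "Prague"));
    }

    // Stand-in for a second asynchronous step that needs the locations
    static CompletionStage<Integer> countLocations(List<String> locations) {
        return CompletableFuture.supplyAsync(locations::size);
    }

    static int demo() {
        // thenCompose flattens the nested stage, so the result is a
        // CompletionStage<Integer> rather than a stage of a stage
        CompletionStage<Integer> count =
            fetchLocations().thenCompose(ThenComposeSketch::countLocations);
        return count.toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3
    }
}
```

Using thenApply() here instead would produce a stage wrapped inside another stage, which is why dependent asynchronous steps are chained with thenCompose().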
Let’s continue by collecting the forecasts as a list of completion stages as defined in the forecastList variable. To create the completion stage for each forecast, I stream on the locations and then create the tempCS variable by again using the JAX-RS Reactive Client API, which will invoke the temperature service with the city name. I use the resolveTemplate() method here to substitute the name of the city for the {city} template in the target’s URI.
As a last step of streaming on locations, I call CompletableFuture.completedFuture(), providing a newly created instance of Forecast as the parameter. I combine this future with the tempCS stage so that I have the temperature value for each of the iterated locations.
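The combination of an already-completed future with a pending stage can be tried in isolation. In this sketch, which is an illustration under assumed values and not part of the forecast service, a string stands in for the Forecast instance and a supplyAsync() call stands in for the temperature lookup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ThenCombineSketch {

    static String demo() {
        // Already-completed stage holding the value known up front
        CompletableFuture<String> city =
            CompletableFuture.completedFuture("London");

        // Stand-in for the asynchronous temperature lookup
        CompletionStage<Integer> temperature =
            CompletableFuture.supplyAsync(() -> 33);

        // thenCombine waits for both stages and merges their results
        return city
            .thenCombine(temperature, (c, t) -> c + ": " + t + " Celsius")
            .join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints London: 33 Celsius
    }
}
```

Wrapping the known value in a completed future is simply a way to give both inputs the same shape so that thenCombine() can merge them.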
The CompletableFuture.allOf() method in Listing 11 is what turns the list of completion stages into forecastCS: it returns a future that completes only when all the provided completable futures have completed, and the thenApply() step that follows gathers their results into a single list.
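Because allOf() itself carries no values (it returns a CompletableFuture<Void>), the results have to be read back from the individual futures. A minimal sketch of that idiom follows, with hard-coded integers assumed in place of forecasts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfSketch {

    static List<Integer> demo() {
        // Three independent asynchronous computations
        List<CompletableFuture<Integer>> stages = new ArrayList<>();
        for (int value : new int[] {33, 38, 46}) {
            stages.add(CompletableFuture.supplyAsync(() -> value));
        }

        // allOf yields CompletableFuture<Void>; it only signals
        // that every input future has completed
        CompletableFuture<Void> all = CompletableFuture.allOf(
            stages.toArray(new CompletableFuture[0]));

        // By the time thenApply runs, every join() returns at once
        CompletableFuture<List<Integer>> values = all.thenApply(v ->
            stages.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));

        return values.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [33, 38, 46]
    }
}
```

The resulting list keeps the order of the input list, regardless of the order in which the futures completed.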
The response from the service is an instance of the ServiceResponse class, so I create a completed future for that as well, and then I combine the forecastCS completion stage with the list of forecasts and calculate the response time of the service.
Of course, this reactive programming makes only the server side execute asynchronously; the client side will be blocked until the server sends the response back to the requester. To overcome this problem, Server-Sent Events (SSE) can also be used to send the response in parts as they become available, so that the temperature value for each location is pushed to the client one by one. The output of ForecastReactiveResource will be something similar to Listing 12. As shown in the output, the processing time is 515 ms, which is close to the ideal execution time for retrieving a temperature value for one location.
Listing 12.
{
  "forecasts": [
    {
      "location": {
        "name": "London"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 49
      }
    },
    {
      "location": {
        "name": "Istanbul"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 32
      }
    },
    {
      "location": {
        "name": "Prague"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 45
      }
    }
  ],
  "processingTime": 515
}
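The difference between the two processing times can be reproduced without any endpoints. The sketch below is a simulation under stated assumptions: readTemperature() is a hypothetical stand-in that just sleeps, the delay is shortened to 100 ms, and a fixed pool of three threads is used so that the three calls can really overlap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TimingSketch {

    // Shortened stand-in for the 500 ms delay of the temperature service
    static final long DELAY_MS = 100;

    // Simulated temperature lookup that blocks for DELAY_MS
    static int readTemperature() {
        try {
            Thread.sleep(DELAY_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return 33;
    }

    // Three lookups one after another: roughly 3 x DELAY_MS
    static long sequentialMillis() {
        long start = System.nanoTime();
        for (int i = 0; i < 3; i++) {
            readTemperature();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Three lookups started together on three threads:
    // roughly 1 x DELAY_MS
    static long parallelMillis() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<Integer>> calls = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                calls.add(CompletableFuture.supplyAsync(
                    TimingSketch::readTemperature, pool));
            }
            // Wait for all three lookups to finish
            calls.forEach(CompletableFuture::join);
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sequential ms: " + sequentialMillis());
        System.out.println("parallel ms:   " + parallelMillis());
    }
}
```

The exact numbers vary from run to run, but the sequential total grows with the number of calls while the parallel total stays near the duration of a single call, which matches the pattern seen in Listings 10 and 12.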
Conclusion
Throughout the examples in this article, I first showed the synchronous way to retrieve the forecast information by choreographing location and temperature services. Then I moved on to the reactive approach in order to have the asynchronous processing occur between service calls. When you leverage the use of the JAX-RS Reactive Client API of Java EE 8 and classes such as CompletionStage and CompletableFuture shipping with Java 8, the power of asynchronous processing is unleashed with the help of reactive-style programming.
Reactive programming is more than enhancing the implementation from a synchronous to an asynchronous model; it also eases development with concepts such as nesting stages. The more it is adopted, the easier it will be to handle complex scenarios in parallel programming.
This article was originally published in the January/February 2018 issue of Java Magazine.
Mert Çalışkan
Mert Çalışkan (@mertcal) is a Java Champion and a coauthor of PrimeFaces Cookbook (Packt Publishing, 2013) and Beginning Spring (Wiley Publications, 2015). He just released his latest book, Java EE 8 Microservices, and he works as a Principal Engineer on Bitbucket, Atlassian.