Thursday May 02, 2013

Integrating WebSockets and JMS with CDI Events in Java EE 7

UPDATE July 15th, 2013
I've updated this blog entry to clarify the issue of integration between WebSockets API 1.0 and other Java EE technologies, and also to link you to the bugs submitted against the WebSockets, JMS, and especially the CDI specifications. I want to thank all the spec leaders involved in this, especially JJ Snyder, Danny Coward, and Nigel Deakin.

This is a great reminder of how important your participation is during the specification definition process. Please read, test, and report any issue you find in a specification before it becomes final. Join the Adopt a JSR program and work with us!

------------------ 

WebSocket is the new kid on the block when you think about web development these days, and you will naturally want to integrate it with whatever you already have at hand. Java EE 7 is coming with other cool things as well, for example JMS 2.0. And then you wonder: how can I send asynchronous messages to all WebSocket sessions connected to my website? Server push, no polling, for real!

The answer is quite simple: CDI, also known as the Java EE magic glue. CDI enables a developer to build communication between apparently distinct parts of an application. Let's go through all the steps to enable your WebSocket application to send and receive messages through JMS.

1 - Creating the WebSocket Server Endpoint

First we need to build the WebSocket server endpoint that will receive messages from clients and also notify them asynchronously, through a server push, with the payloads of incoming JMS messages:

@Named
@ServerEndpoint("/websocket")
public class WebSocketEndpoint implements Serializable {

    // this object will hold all WebSocket sessions connected to this WebSocket
    // server endpoint (per JVM)
    private static final Set<Session> sessions =
            Collections.synchronizedSet(new HashSet<Session>());

Now you must also add three key methods to this WebSocket: 

@OnOpen
public void onOpen(final Session session) {
    sessions.add(session);
}

@OnMessage
public void onMessage(final String message, final Session client) {
    ...
}

@OnClose
public void onClose(final Session session) {
    sessions.remove(session);
}

Notice that in onOpen and onClose we keep track of all user sessions connected to this endpoint. We will see later how sessions is used to push JMS messages to clients. For now, let's create a SessionBean to send messages to a JMS Queue.
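To make the session bookkeeping concrete without a container, here is a minimal plain-Java sketch. ClientSession and SessionRegistry are hypothetical stand-ins (they are not part of the WebSocket API) for javax.websocket.Session and for the endpoint's static set. It also shows one detail worth remembering: a set returned by Collections.synchronizedSet has to be locked manually while it is being iterated.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for javax.websocket.Session; it only records what it is sent.
class ClientSession {
    final List<String> received = new ArrayList<>();

    void sendText(String text) {
        received.add(text);
    }
}

// Hypothetical registry mirroring the endpoint's static set of sessions.
class SessionRegistry {
    private final Set<ClientSession> sessions =
            Collections.synchronizedSet(new HashSet<ClientSession>());

    void onOpen(ClientSession session) {
        sessions.add(session);
    }

    void onClose(ClientSession session) {
        sessions.remove(session);
    }

    // Sends the text to every open session and returns how many were reached.
    int broadcast(String text) {
        // Iterating a synchronizedSet is only safe while holding its lock.
        synchronized (sessions) {
            for (ClientSession s : sessions) {
                s.sendText(text);
            }
            return sessions.size();
        }
    }
}
```

Opening two sessions, closing one, and then broadcasting reaches only the session that is still open.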

2 - Creating the SessionBean to send JMS messages

Due to missing parts in the specifications, we cannot use @Inject JMSContext inside a @ServerEndpoint WebSocket, nor can we set a server endpoint as a MessageListener to receive JMS messages. Simply put, integration between JMS and WebSockets (and probably other Java EE APIs) is not straightforward. You can follow the discussion in the following issues:

 

But fortunately, there are two ways to do this: 

  1. Create a stateless SessionBean that sends messages to a JMS queue
  2. Inject JMS resources inside WebSocket Server Endpoint, and create a JMSContext from the ConnectionFactory

 

Solution #2 can be achieved using the following snippet, thanks to Nigel Deakin, spec leader of JMS, who helped me discover this issue.

@Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
ConnectionFactory cf;

@Resource(lookup = "jms/myQueue")
Queue myQueue;

@OnMessage
public void onMessage(final String message, final Session client) {
    try (JMSContext context = cf.createContext()) {
        context.createProducer().send(myQueue, message);
    }
}
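The try-with-resources form matters here because JMSContext is AutoCloseable, and a context the application creates itself must also be closed by the application. The following plain-Java sketch shows only that closing behavior. FakeContext is a hypothetical stand-in for a JMSContext created from a ConnectionFactory; it is not a real JMS class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an application-managed JMSContext.
class FakeContext implements AutoCloseable {
    final List<String> sent = new ArrayList<>();
    boolean closed = false;

    void send(String message) {
        if (closed) {
            throw new IllegalStateException("context already closed");
        }
        sent.add(message);
    }

    @Override
    public void close() {
        closed = true;
    }
}

class TryWithResourcesSketch {
    // Sends one message and returns the context so the caller can inspect its state.
    static FakeContext sendOnce(String message) {
        FakeContext context = new FakeContext();
        try (FakeContext c = context) {
            c.send(message);
        }
        // close() has already run here, whether or not send() threw.
        return context;
    }
}
```

After sendOnce returns, the context holds the message and is already marked as closed, with no explicit close() call in the method body.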

We are going to use solution #1 and create a SessionBean to forward incoming WebSocket messages to a JMS queue. Create a class named QueueSenderSessionBean as follows:

@Stateless
public class QueueSenderSessionBean { ... }

This is a simple @Stateless SessionBean. Now, let's add a business method to it, called sendMessage:

public void sendMessage(String message) { ... } 

Quite straightforward, isn't it? One of the great things about JMS 2.0 is how simple it makes sending messages to a destination. To do that, we need to inject two objects:

@Resource(mappedName = "jms/myQueue")
private Queue myQueue;

@Inject
private JMSContext jmsContext;

JMSContext is one of the new interfaces added to the JMS API, and is documented here. It combines a Connection and a Session in a single object and, when injected like this, makes use of the default ConnectionFactory, now a required resource to be provided by all Java EE 7 certified application servers. Next, all you need to do is add the logic to the method we created earlier:

jmsContext.createProducer().send(myQueue, message);

And you are done with the SessionBean. Next we will add some glue between the SessionBean and the WebSocket to send messages to the JMS destination.

3 - Forwarding an incoming WebSocket message to a JMS destination

All you need to do here is inject the SessionBean into your WebSocket endpoint and call its sendMessage method inside the endpoint's onMessage. Let's start with the injection; due to a bug, we must use constructor injection. Open your WebSocket server endpoint class, WebSocketEndpoint, and add the following field:

private QueueSenderSessionBean senderBean;

Now add the following constructor to it:
  @Inject
  public WebSocketEndpoint(QueueSenderSessionBean sb) {
     this.senderBean = sb;
  }

The next step is simply to call the method inside onMessage:
senderBean.sendMessage(message);
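
Stripped of the container, the wiring in this step is plain constructor injection followed by delegation. The sketch below uses hypothetical stand-ins (MessageSender, InMemoryQueueSender, EndpointSketch) in place of the real session bean, JMS queue, and endpoint, so it runs on a bare JDK. In the real application, CDI supplies the constructor argument.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical abstraction over QueueSenderSessionBean.sendMessage(String).
interface MessageSender {
    void sendMessage(String message);
}

// Hypothetical stand-in for the JMS queue: messages are simply kept in memory.
class InMemoryQueueSender implements MessageSender {
    final Queue<String> queue = new ArrayDeque<>();

    @Override
    public void sendMessage(String message) {
        queue.add(message);
    }
}

// Mirrors the shape of WebSocketEndpoint: the sender arrives through the constructor.
class EndpointSketch {
    private final MessageSender senderBean;

    EndpointSketch(MessageSender senderBean) {
        this.senderBean = senderBean;
    }

    // Same role as the @OnMessage method: forward the client's text unchanged.
    void onMessage(String message) {
        senderBean.sendMessage(message);
    }
}
```

Passing an InMemoryQueueSender to the constructor and calling onMessage twice leaves both messages in the queue, in arrival order.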

We have finished the first part of this application. With this code, you are now able to send a message from a WebSocket client to a JMS destination. Next, we will do the opposite. Let's push some data from a JMS queue to all WebSocket clients!

4 - Listening to a JMS Destination with a MessageDriven Bean

Fun fact: some developers have not realized it yet, but the MessageDriven annotation is not specified by the JMS API. Instead, it is part of the EJB specification, and it can be used not only for JMS but for many other things. David Blevins, from the awesome Apache TomEE, realized that and proposed a small change to the EJB spec in the area where connectors (resource adapters) are required to provide a messagelistener-type. His proposal suggests that you should be able to use an MDB to listen to different things, and that the listener interface should be optional. One example is listening to Telnet commands. Pretty awesome! But let's focus on our use case here, which is specific to JMS.

Now that we can publish messages into a Queue destination from a WebSocket client, we must consume them so that they can be forwarded somewhere else. Let's start coding our JMS MDB (remember, not all MDBs are implicitly JMS-related!), implementing the MessageListener interface required by JMS ResourceAdapter connectors:

  @Named
  @MessageDriven(mappedName = "jms/myQueue")
  public class WebSocketMDB implements MessageListener {
    @Override
    public void onMessage(Message msg) { ... }
  }

This is the basic code of any JMS MDB. Now let's do some magic...

5 - Firing CDI events with the JMS Message payload

Remember when I said that we cannot listen to JMS destinations directly from the WebSocket server endpoint due to specification restrictions? Well, we actually can, using a different technique. If you haven't heard about CDI Events, you should read about them before continuing this tutorial. Done? OK, let's go. The first thing we need is an event qualifier. Create the WSJMSMessage annotation inside your project:

  @Qualifier
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER, ElementType.TYPE})
  public @interface WSJMSMessage {}

With the qualifier defined, CDI will be able to connect the fired event with the observer object. Go back to the WebSocketMDB and add an Event dispatcher to it, with the qualifier we created above:

    @Inject
    @WSJMSMessage
    Event<Message> jmsEvent;

Now let's add the logic to the onMessage method:

jmsEvent.fire(msg);

6 - Listening to CDI events within the WebSocket server endpoint

This is the last server-side part of this article; next you will see the JavaScript code for the client side. Let's listen to the CDI events fired by the MDB, which carry the Message payload. Open your WebSocketEndpoint class again and add the following method to it:

public void onJMSMessage(@Observes @WSJMSMessage Message msg) {
    // a synchronizedSet must be locked manually while it is being iterated
    synchronized (sessions) {
        try {
            for (Session s : sessions) {
                s.getBasicRemote().sendText("message from JMS: " + msg.getBody(String.class));
            }
        } catch (IOException | JMSException ex) {
            Logger.getLogger(WebSocketEndpoint.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

Observe the @Observes annotation and the @WSJMSMessage qualifier we defined previously. This is what tells CDI to deliver the events fired by the MDB to this method.
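
If CDI events are new to you, the following plain-Java sketch imitates the matching rule on a very small scale: an event fired with a qualifier reaches only the observer methods whose parameter carries that same qualifier. MiniEventBus, ObserverSketch, FromJms, and FromElsewhere are hypothetical names, the reflection-based lookup is only an illustration and not how a real CDI container is implemented, and a String payload stands in for the JMS Message.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical qualifiers; FromJms plays the role of @WSJMSMessage.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface FromJms {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface FromElsewhere {}

// Toy dispatcher: delivers a payload to single-argument methods whose parameter
// type accepts the payload and whose parameter carries the given qualifier.
class MiniEventBus {
    private final List<Object> observers = new ArrayList<>();

    void register(Object observer) {
        observers.add(observer);
    }

    // Returns the number of observer methods that were invoked.
    int fire(Object payload, Class<? extends Annotation> qualifier) {
        int delivered = 0;
        for (Object observer : observers) {
            for (Method m : observer.getClass().getDeclaredMethods()) {
                if (m.getParameterCount() != 1
                        || !m.getParameterTypes()[0].isInstance(payload)
                        || !hasQualifier(m, qualifier)) {
                    continue;
                }
                try {
                    m.setAccessible(true);
                    m.invoke(observer, payload);
                    delivered++;
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("observer call failed", e);
                }
            }
        }
        return delivered;
    }

    private static boolean hasQualifier(Method m, Class<? extends Annotation> qualifier) {
        for (Annotation a : m.getParameterAnnotations()[0]) {
            if (a.annotationType() == qualifier) {
                return true;
            }
        }
        return false;
    }
}

// Plays the role of the endpoint: only the @FromJms method should be called.
class ObserverSketch {
    final List<String> seen = new ArrayList<>();

    void onJmsMessage(@FromJms String body) {
        seen.add("jms:" + body);
    }

    void onOtherMessage(@FromElsewhere String body) {
        seen.add("other:" + body);
    }
}
```

Firing a payload with FromJms.class invokes onJmsMessage only; the method qualified with @FromElsewhere is left alone, which is the same selection the @WSJMSMessage qualifier performs between the MDB and the endpoint.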

7 - Client-side JavaScript to connect with the WebSocket server endpoint

This has been floating around the Internet for a while, as it is specific to neither Java nor Java EE, but basically it is this:

// note the final path is the same defined inside WebSocketEndpoint class at @ServerEndpoint
websocketSession = new WebSocket('ws://' + document.location.host + '/your-app-context-root/websocket');
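
For readers who would rather test the endpoint from Java than from a browser, here is a rough equivalent using the java.net.http.WebSocket client that ships with the JDK. This is an assumption-laden sketch: it requires Java 11 or later (newer than the Java EE 7 timeframe of this article), JavaWsClientSketch and its methods are hypothetical names, and the host and context root are placeholders exactly as in the JavaScript line above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;

class JavaWsClientSketch {

    // Builds the same URL the JavaScript snippet assembles from document.location.host.
    static URI endpointUri(String host, String contextRoot) {
        return URI.create("ws://" + host + "/" + contextRoot + "/websocket");
    }

    // Connects, prints the text data pushed by the server, and sends one message.
    // Blocks until the handshake completes and fails if no server is listening.
    static WebSocket connectAndSend(URI uri, String text) {
        WebSocket.Listener listener = new WebSocket.Listener() {
            @Override
            public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
                System.out.println("pushed by server: " + data);
                ws.request(1); // ask for the next incoming message
                return null;
            }
        };
        WebSocket ws = HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(uri, listener)
                .join();
        ws.sendText(text, true);
        return ws;
    }
}
```

With the application deployed, calling connectAndSend(endpointUri("localhost:8080", "your-app-context-root"), "hi") would play the same role as the browser client.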

Here is the final JavaScript used by this example, as well as the HTML interface.

Conclusion

I hope you have found this article useful to begin your development with Java EE 7, and to see the possibilities of integrating CDI, WebSockets, JMS, and EJB. These are the main takeaways of this article:

  • ability to asynchronously communicate with WebSocket clients (although you can also use session.getAsyncRemote() to send messages asynchronously)
  • ability to do a server push to WebSocket clients at any point in your application
  • ability to scale server-pushed communication to WebSocket client sessions across a cluster using JMS Topics
    This is perhaps one of the most interesting things about this setup. If you use a Topic instead of a Queue, you will be able to push data to all WebSocket sessions connected to your application across a cluster. There's a commonly cited limit of roughly 64k client sessions per web server, and in this example we use a static synchronized Set to hold references to the sessions on each JVM. Now imagine a cluster: we change the Queue to a Topic with clustered subscribers, and we are able to scale up server-pushed data :-)
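
The difference between the two destination types can be sketched in plain Java. ToyQueue and ToyTopic are hypothetical stand-ins with no JMS involved, and the round-robin choice in ToyQueue is an assumption of the sketch (a real provider decides which consumer gets a message). A queue hands each message to exactly one consumer, while a topic hands a copy to every subscriber, which is what lets every cluster node push to its own local WebSocket sessions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Point-to-point: each message goes to exactly one registered consumer.
class ToyQueue {
    private final List<Consumer<String>> consumers = new ArrayList<>();
    private int next = 0;

    void addConsumer(Consumer<String> consumer) {
        consumers.add(consumer);
    }

    void send(String message) {
        if (consumers.isEmpty()) {
            return; // a real queue would hold the message; the sketch drops it
        }
        consumers.get(next).accept(message);
        next = (next + 1) % consumers.size(); // round-robin, an assumption of this sketch
    }
}

// Publish-subscribe: every subscriber receives its own copy of each message.
class ToyTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        for (Consumer<String> s : subscribers) {
            s.accept(message);
        }
    }
}
```

With two nodes registered, one send on the ToyQueue reaches a single node, whereas one publish on the ToyTopic reaches both.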
The source code of this project is available at my GitHub repository javaee7-jms-websocket-example. I hope you liked the article!

About


Bruno has been having fun working with Java since 2000 and now helps Oracle share the technology across all of Latin America. He also plays video games, goes trekking, and loves beer.

Follow me on Twitter! @brunoborges
