Tuesday Dec 08, 2009

Simple long polling in Scala with Atmosphere

There are two styles of pushing events from servers to clients: long polling; and HTTP streaming. The former tends to be easier for developers to understand "push", especially when a bit of HTML and JavaScript is utilized, and it does not break the REST request/response constraint. So there are some advantages over HTTP streaming depending on what your requirements are.

I recently took a slight detour into Node.js, impressive stuff. A blog entry written by Simon Willison presented a simple long-polling example.

Hmm... i wonder if i can do something functionality equivalent in Scala and using Atmosphere. I might iron out some bugs in Atmosphere, learn some stuff along the way, and help developers better understand long polling and Atmosphere.

A common pattern for long polling is:

  1. when a client makes a request and the server has data, that data is returned immediately and the connection is not suspended.
  2. when a client makes a request and the server has no data, the connection is suspended, and the client waits for a response. When data is available the server returns that data and the connection is resumed.

In cases 1 or 2 the client make will another request after it has received the data, and the process repeats. Further more, there may be 2 or more clients waiting for data whose connection will be resumed when the same data is available, for example, consider the case of a chat application where a client may send a message to all other clients.

Notice that there is an interval of time when a client processes a response and makes a new request. Within that interval new data may be available. To ensure that the client does not miss out on that data the server must store that data and the client must signal to the server what data it has previously received.

To play with these concepts i wrote a very simple long polling chat server written in Scala using Atmopshere that functions as previously described, you can download it here.

The main class that acts as the chat application is as follows:

01  @Path("/chat")
02  @Singleton
03  @Produces(Array("application/json"))
04  class MessagesResource {
06    var messages = List[Message]();
08    @Suspend { val resumeOnBroadcast = true }
09    @GET
10    def getMessages(@QueryParam("date") lastSeenTime : long) : unit = {
11      val lastSeenDate = new Date(lastSeenTime);
13      messages.filter(_.date.compareTo(lastSeenDate) > 0) match {
14        case Nil => // Suspend and wait for data
15        case x => // Return with data
16          throw new WebApplicationException(
17            Response.ok(x.reverse).build())
18      }
19    }
21    @Broadcast
22    @POST
23    @Consumes(Array("application/x-www-form-urlencoded"))
24    def publishMessage(@FormParam("message") message : String) = {
25      val m = new Message(new Date(), message);
26      messages = m :: messages;
27      List(m);
28    }
29  } 

The root resource class

A JAX-RS root resource class, MessageResource, is served from the "chat" path segment as declared by the @Path annotation at line 1, and there will be one instance of this class created per the application (using the Jersey @Singleton annotation at line 2) because the messages are going to be stored in memory. All resource methods (see later) will produce JSON, as declared by the @Produces annotation at line 3.

The list of chat messages are stored in an immutable list, List[Message], a reference of which can be updated (which is why var is utilized) at line 6.

Sending a chat message

A client will send a chat message by performing a POST request to the chat resource with a representation conforming to the "application/x-www-form-urlencoded" media type, which basically is the default format used by HTML forms of name=value pairs separated by '&' characters, with a name of "message" and the value that is the chat message. Such a contract is defined at lines 22 to 24 with the JAX-RS resource method publishMessage. The JAX-RS @FormParam annotation is utilized to obtain the form parameter value whose name is "message".

Highlighted, in yellow at line 21, is the Atmopshere annotation @Broadcast. This declares that any response entity returned from the publishMessage resource method will be broadcast on all suspended connections. This is how one chat client may send a message to all other clients participating in chatting.

A new message is created, at line 25, that contains the date it was created at and the actual chat message. The date is used later to determine what messages to send back to the client. A new list of messages is then created, at line 26, by concatenating the newly created message and all the previous messages, and that new list is assigned to the messages field. Then the resource method, at list 27, returns a list with one element that contains the newly create message. A list is returned so that a JSON array will be produced with one object that contains the date and message values (more on how the JSON is created later).

Receiving a chat message

A client will receive one or more chat messages by performing a GET request to the chat resource with a query parameter declaring the date of the message it last received. Such a contract is defined at lines 9 and 10 with the JAX-RS resource method getMessages. The JAX-RS @QueryParam is utilized to obtain the query parameter whose name is "date" that corresponds to the date of the last received message.

Highlighted, in yellow at line 8, is the Atmopshere annotation @Suspend. This declares that, if the resource method returns normally, the connection will be suspended and resumed when a broadcast occurs on that connection.

For expediency i found it is easier to pass dates between the client and server as non-negative numbers thus line 11 creates a java.util.Date object (i know i should be using Joda time!) to easily compare dates of messages.

The list of messages is then filtered to obtain a list of those messages that have been broadcast after the date the client last received a message, at line 13. If the list is Nil (or empty), at line 14, the resource method returns and and the connection will be suspended until a chat message is broadcast. If the list is not empty, at line 15, then the list is reversed, so messages are ordered in increasing time from left to right, a Response is built with the list and is thrown with a WebApplicationException. The throwing of the exception tells Atmosphere the connection should not be suspended. So this is the way client may receive any pending data immediately.

The client

It is very easy to test the chat server using curl. For example, if you run the chat server, then in one terminal window type:

curl -v http://localhost:9999/app/chat 

then the connection is suspended and curl will wait for a response. Then in another terminal window type:

curl -v -d message=HELLO http://localhost:9999/app/chat

You will observe that a message similar to the following will be returned by both curl statements:


Then if you type the following in the first terminal window (substituting the date value as appropriate):

curl -v http://localhost:9999/app/chat?date=1260295792434

the connection will be suspended because no messages have been broadcast after the date declared in the query parameter.

A browser-based client is more interesting and some simple HTML + JavaScript can be easily created as follows (note that i am not so experienced in this area, for example i cannot work out why the heck it works with Firefox but not Safari):

01  <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
02      <head>
03          <script type="text/javascript" src="jquery-1.3.2.js"></script>
04          <script type="text/javascript" src="jquery.form.js"></script>
05          <script type="text/javascript">
06              var last_date = 0;
07              function fetchLatest() {
08                  var ul = $('ul');
09                  $.getJSON('chat?date=' + last_date, function(data){
11                      $.each(data, function() {
12                          ul.prepend($('<li></li>').text(new Date(this.date) + ": " + this.message));
13                          if (last_date < this.date)
14                              last_date = this.date;
15                      });
17                      setTimeout('fetchLatest()', 1);
18                  });
19              }
20          </script>
21          <script type="text/javascript">
22              $(document).ready(function() {
23                  fetchLatest();
24                  $('#publishForm').ajaxForm();
25              });
26          </script>
27      </head>
28      <body>
29          <p>
30              <form id="publishForm" action="chat" method="post">
31                  Message: <input type="text" name="message" />
32                  <input type="submit" value="Submit Comment" />
33              </form>
34          </p>
35          <p>Waiting for messages...</p>
36          <ul></ul>
37      </body>
38  </html> 

jQuery and the jQuery form plugin make it easy to perform GET and POST requests asynchronously.

Lines 30 to 33 define how the client will send chat messages to itself and other clients utilizing the POST requests. Line 24 uses the jQuery form plugin to perform the POST request asynchronously so the browser does not display the response.

Line 23 initiates long polling calling the function fetchLatest. This function, at lines 7 to 19 will use jQuery to perform a GET request asynchronously that expects a JSON array in response. The message objects in that array are iterated over and the HTML is updated for each message. Finally, at line 17 the function sets a timer to call itself (i am not sure if tail calls are optimized).

If you run the chat server and open two Firefox windows and in each go to the URL http://localhost:9999/app/ it should be possible send and receive chat messages from each window.

How messages are serialized to JSON

Instances of List[Message] are serialized where Message is as follows:

class Message(@BeanProperty val date : Date,
              @BeanProperty val message : String) {

The Jackson JAX-RS library is utilized to serialize a list of Message. The @BeanProperty annotation tells the Scala compiler to generate bean getter and setter methods and ths enables Jackson to workout what the bean properties are.

This is where it gets a little hacky (note that i have not investigated any Scala libraries for producing JSON). First, it is necessary to work around a bug in Atmosphere which does not take into account the media type declared in the @Produces when broadcasting (we are gong to fix that!). Second, Jackson does not know how to serialize the instances of the Scala List. So it is necessary to create a message body writer as follows:

01  @Provider
02  @Produces(Array("\*/\*"))
03  class ListProvider extends JacksonJsonProvider {
05    private val arrayListClass = classOf[java.util.ArrayList[_]];
07    override def isWriteable(c: Class[_],
08                             gt: Type,
09                             annotations: Array[Annotation],
10                             mediaType: MediaType) : boolean = {
11      classOf[List[_]].isAssignableFrom(c) &&
12          super.isWriteable(arrayListClass, arrayListClass,
13                            annotations, MediaType.APPLICATION_JSON_TYPE);
14    }
16    override def writeTo(t: Object,
17                         c: Class[_],
18                         gt: Type,
19                         annotations: Array[Annotation],
20                         mediaType: MediaType,
21                         httpHeaders: MultivaluedMap[String, Object],
22                         entityStream: OutputStream) : unit = {
24      val l = t.asInstanceOf[List[_]];
25      val al = new java.util.ArrayList[Any]();
26      for (m <- l) {
27        al.add(m);
28      }
30      super.writeTo(al, arrayListClass, arrayListClass,
31                    annotations, mediaType, httpHeaders, entityStream);
32    }
33  } 

The JacksonJsonProvider is extended and the methods associated with writing entities are overridden. This writer declares that the Scala type List is supported, at line 11, and the List is converted to an instance of java.util.ArrayList that Jackson understands when writing, at lines 24 to 28.

Using the Atmosphere spade server

The application is run using the Atmosphere spade server, which in turn uses embedded Grizzly Comet:

01  object ChatServer {
02    def main(args: Array[String]) {
03      try {
04        AtmosphereSpadeServer.build(args(0), "org.atmosphere.samples.lpchat").start();
05      } catch {
06        case ex : Exception => ex.printStackTrace;
07      }
08    }
09  }

Also since i was having much fun with JAX-RS, Jersey and Scala the HTML and JavaScript is returned from the following root resource class:

01  @Path("/")
02  class FilesResource {
04    @Path("jquery{id}.js")
05    @GET
06    def getJQuery(@PathParam("id") ps : PathSegment) = new File(ps.getPath());
08    @GET
09    def getIndex() = new File("index.html");
10  }

Passing thoughts

While it is obvious that this example is just a toy it does IMHO get across the concept on long polling rather well and example is very concise. I think this could be a good basis for a presentation to a JUG.

Examples such as this could also serve as the basis to develop common patterns and mechanisms in Atmosphere to manage lists of messages. The main problem in this respect is how store and retrieve messages. There are a myriad of ways and Atmosphere should preferably not dictate certain solutions in this respect e.g. perhaps i could convert the chat example to store and query messages using Couch DB?




« July 2016