We often need to ensure that messages are processed in strict order. In this blog post I explain a pattern that enforces this using Oracle Integration. The pattern also deals with the case where we need to limit the concurrency of calls to an endpoint system.
This article was co-written with Glenn Mi & David Craft
The basic problem is that I may issue a stream of requests that must be executed in order, for example create account, update account address, update account contacts. The latter two activities cannot occur until the first has completed.
Before we can sequence messages we need to know the order in which they should be processed, so there must be some sort of sequencing ID which we can use. This could be a timestamp or it could be an actual sequence ID. If we are using timestamps then the closer to the message origin the timestamp is applied, the better. For example, if we take the timestamp from when the message arrives in Oracle Integration then a network delay may already have caused our messages to be out of order.
Typically we don't want all messages to be in the same ordered sequence. In our account example only messages for a given account need to be ordered. Messages for different accounts can execute in parallel. So we also need some sort of group id to identify different sequence streams within our message stream.
Once we have the messages and know their order we can process them. Inherent in a resequencing solution is some sort of delay to allow messages that arrive out of order to be sorted into order. The size of the delay specifies how long we are willing to wait for a late message before we go ahead without it.
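The three ideas above (a group id, a sequence id and a delay) can be sketched in a few lines of Python. This is only an illustration of the concept, not the Oracle Integration implementation; the `Message` class, the `arrived_at` field and the `releasable` function are hypothetical names introduced for the example.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Message:
    gid: str           # group id, e.g. the account the message belongs to
    sequence_id: int   # position of the message within its group
    arrived_at: float  # arrival time in seconds (hypothetical field)
    payload: str


def releasable(messages, now, parking_seconds):
    """Return, per group, the messages old enough to release, sorted by sequence id."""
    by_group = defaultdict(list)
    for m in messages:
        # Only messages that have waited at least the parking time are eligible.
        if now - m.arrived_at >= parking_seconds:
            by_group[m.gid].append(m)
    return {gid: sorted(ms, key=lambda m: m.sequence_id)
            for gid, ms in by_group.items()}


msgs = [
    Message("acct-1", 2, 0.0, "update address"),   # arrived before its predecessor
    Message("acct-1", 1, 1.0, "create account"),
    Message("acct-2", 1, 0.5, "create account"),   # different group, independent order
    Message("acct-1", 3, 9.5, "update contacts"),  # too recent, still parked
]
ready = releasable(msgs, now=10.0, parking_seconds=5.0)
for gid, ms in ready.items():
    print(gid, [m.payload for m in ms])
```

Groups are independent of each other, so the per-group lists could be handed to parallel workers while each list is still processed strictly in order.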
Based on the above we have the following key parameters to consider in our resequencing solution.
The solution uses a number of feature flags. These will become available in the product over the next few months. In the meantime they can be specifically requested as explained in the feature flag blog.
An integration in this tier front-ends the re-sequencer and the real business integration so that the message can go through the re-sequencer to be re-sequenced. The front end integrations are specific to the use case and act as a means of converting typed requests into a common format, including group and sequencing fields.
They receive the typed business payload and extract the id and group id from the message before calling the standardized producer integration.
Note all these integrations are generic and can be reused as is.
Entry point of the re-sequencer. This receives the resequencing message, creates a new row in the group table if one is not already there, and sets the status of the group to 'N'. It then creates a message in the message table. A sample message payload is shown below.
{ "gid": "gid", "gtype": "order", "id": "mid", "sequenceId": 123, "payload": "string representation of the payload" }
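The producer steps can be sketched as follows, using an in-memory SQLite database in place of the real one. The table names (`rsq_group`, `rsq_message`), the column names and the initial message status of 'N' are assumptions made for the example; the actual schema comes from the SQL script supplied with the solution.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical schema, for illustration only.
conn.executescript("""
    CREATE TABLE rsq_group (id TEXT, type TEXT, status TEXT,
                            PRIMARY KEY (id, type));
    CREATE TABLE rsq_message (id TEXT PRIMARY KEY, gid TEXT, gtype TEXT,
                              sequence_id INTEGER, status TEXT, payload TEXT);
""")


def produce(conn, raw_message):
    """Register the group if needed, mark it 'N', then store the message."""
    msg = json.loads(raw_message)
    with conn:  # commit all three statements together, or roll back on error
        conn.execute(
            "INSERT OR IGNORE INTO rsq_group (id, type, status) VALUES (?, ?, 'N')",
            (msg["gid"], msg["gtype"]))
        conn.execute(
            "UPDATE rsq_group SET status = 'N' WHERE id = ? AND type = ?",
            (msg["gid"], msg["gtype"]))
        conn.execute(
            "INSERT INTO rsq_message (id, gid, gtype, sequence_id, status, payload) "
            "VALUES (?, ?, ?, ?, 'N', ?)",  # initial message status is an assumption
            (msg["id"], msg["gid"], msg["gtype"], msg["sequenceId"], msg["payload"]))


sample = ('{ "gid": "gid", "gtype": "order", "id": "mid", '
          '"sequenceId": 123, "payload": "string representation of the payload" }')
produce(conn, sample)
print(conn.execute("SELECT id, type, status FROM rsq_group").fetchall())
```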
Detects the active groups and kicks off the message consumers. Scheduled to run every 1 minute. When scheduling, use this expression: FREQ=MINUTELY;INTERVAL=1;
This finds active groups, limiting parallelism to throttle outgoing calls to prevent overloading the target system. For each active group this will invoke a message consumer.
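One way to picture the throttling is as a slot count: groups already being processed occupy slots, and only the remaining slots are handed to new groups. The sketch below assumes that 'P' marks a group in progress and 'N' a group with new work, and that the limit is a single number per type; the function name `pick_groups` and the dictionary layout are hypothetical.

```python
def pick_groups(groups, max_concurrent):
    """Choose which waiting groups to start without exceeding max_concurrent."""
    in_flight = sum(1 for g in groups if g["status"] == "P")
    free_slots = max(0, max_concurrent - in_flight)
    waiting = [g for g in groups if g["status"] == "N"]
    return waiting[:free_slots]


groups = [
    {"id": "eng", "type": "employee", "status": "P"},    # already being consumed
    {"id": "sales", "type": "employee", "status": "N"},
    {"id": "hr", "type": "employee", "status": "N"},
    {"id": "ops", "type": "employee", "status": "N"},
]
to_start = pick_groups(groups, max_concurrent=3)
print([g["id"] for g in to_start])
```

With one group in flight and a limit of three, only two of the three waiting groups are started on this run; the rest wait for the next scheduled run.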
Processes active messages of the given group. It receives the group id and type from the group consumer. It loads the active messages of the group ordered by sequence id. Only messages that are at least as old as the parking time are loaded. This gives out-of-order messages a window in which to arrive and still be processed in order.
The consumer loops through the active messages. For each one it marks the message status as 'P' and invokes the Dispatcher. Note that exceptions can occur here.
Once the dispatcher returns for a given message, the message is deleted. The group status may then be updated to 'C' if there are no active messages left, or to 'N' if there are new active messages.
A sample message payload is shown below:
{ "id": "Engineering", "type": "employee" }
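The consumer loop described above can be sketched as follows. This is an in-memory illustration under stated assumptions, not the integration itself: the `created` field, the message status 'N' for messages not yet processed, and the completed group status written as 'C' are assumptions, and `dispatch` stands in for the call to the Dispatcher.

```python
def consume_group(messages, dispatch, now, parking_seconds):
    """Dispatch parked-long-enough messages in sequence order; return the new group status."""
    ready = sorted(
        (m for m in messages
         if m["status"] == "N" and now - m["created"] >= parking_seconds),
        key=lambda m: m["sequenceId"],
    )
    for m in ready:
        m["status"] = "P"   # mark as in progress before calling out
        dispatch(m)         # an exception here propagates and leaves the message in 'P'
        messages.remove(m)  # delete only after the dispatcher has returned
    # 'N' if messages arrived (or were still parked) meanwhile, otherwise complete.
    return "N" if any(m["status"] == "N" for m in messages) else "C"


messages = [
    {"id": "m2", "sequenceId": 2, "created": 0.0, "status": "N"},
    {"id": "m1", "sequenceId": 1, "created": 1.0, "status": "N"},
    {"id": "m3", "sequenceId": 3, "created": 58.0, "status": "N"},  # still parked
]
dispatched = []
status = consume_group(messages, lambda m: dispatched.append(m["id"]),
                       now=60.0, parking_seconds=10.0)
print(dispatched, status)
```

Because the status is set to 'P' before the call and the message is removed only afterwards, a failed dispatch leaves a visible stuck message rather than silently losing it.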
This is a request/response integration which reconstructs the original payload and sends it to the real backend integration. Unlike the Resequencer Integrations, the dispatcher is not generic because it needs to invoke specific business integrations.
It receives the message and converts the payload to the original typed business payload. It uses the group id to find the business end point and invoke it synchronously. Exceptions can happen here.
The dispatcher's interface is shown below:
{ "id": "Engineering",
"gid" : "Zebra", "gtype" : "order", "sequenceId" : 123,
"payload" : "original payload"}
This integration is used to manage the resequencer. Currently this manager supports three operations:
Path: /configs Method: GET
This operation returns the config of all the types.
Example of invocation:
$ curl https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/configs -v -u username:password
Path: /configs/{type} Method: PUT
This operation updates the config for the given type.
Example of invocation:
$ curl -X PUT https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/configs/employee -v -u username:password -H "Content-Type: application/json" -d@config.json
config.json example:
{ "maxConcurrent": 5, "timeWindow": 11 }
Path: /types/{type}/groups/{group}/recover Method: PUT
This operation deletes stuck messages in the message table and reactivates the group by setting its status to 'N'.
Example of invocation:
$ curl -X PUT https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/types/employee/groups/eng/recover -v -u username:password -H "Content-length: 0"
A business integration is the real integration that processes the business messages. It has its own typed interface. For each Business Frontend integration there should be a corresponding Business Integration.
An exception can occur when the dispatcher invokes the business integration. The exception bubbles up to the message consumer and causes the message consumer instance to fail. When this happens, the group stays in status 'P' in the database.
If the problem is caused by some system error like networking issues then after fixing the problem, you can recover by resubmitting the failed message consumer instance.
If the error is caused by a bad request, resubmitting the request will not help. In this case we would need to skip the bad request and move on. To do this you can invoke the resequencer manager integration to remove the stuck message and reactivate the group:
$ curl -X PUT https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/types/employee/groups/eng/recover -v -u username:password -H "Content-length: 0"
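If recovery needs to be scripted rather than run by hand, the same call can be prepared from Python's standard library. The sketch below only builds the request and does not send it; the helper name `build_recover_request` is hypothetical, the host is the placeholder from the curl example, and authentication is deliberately left out.

```python
from urllib.parse import quote
from urllib.request import Request

# Placeholder base URL taken from the curl example above.
BASE = "https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0"


def build_recover_request(gtype, group, base=BASE):
    """Build (but do not send) the PUT request that recovers a stuck group."""
    url = f"{base}/types/{quote(gtype, safe='')}/groups/{quote(group, safe='')}/recover"
    # The operation takes no body, so an explicit zero length is sent.
    return Request(url, method="PUT", headers={"Content-Length": "0"})


req = build_recover_request("employee", "eng")
print(req.get_method(), req.full_url)
```

Sending it would mean passing `req` to `urllib.request.urlopen` with credentials added, for example through a basic authentication header.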
We used the ATP adapter to minimize operations overhead (autonomous database manages itself) and reduce latency (no need for an additional agent hop). This requires us to enable feature oic.cloudadapter.adapters.atpdatabase.
To be able to convert an opaque string to type XML in RSQDispatcher, it is easiest to enable feature oic.ics.mapper.jetmap-enablement, which allows you to easily modify the mapping xsl as explained below:
In the mapper, select the opaque string and map it, converting it with the parseEscapedXML() function. Then view the mapping xsl, change value-of to copy-of, and import the modified xsl.
Mapping Before | Mapping After |
---|---|
<ns2:Employee xml:id="id_16"> <xsl:value-of xml:id="id_21" select="oraext:parseEscapedXML (/nssrcmpr:execute/ns0:request-wrapper/ns0:payload )"/> </ns2:Employee> | <ns2:Employee xml:id="id_16"> <xsl:copy-of xml:id="id_21" select="oraext:parseEscapedXML (/nssrcmpr:execute/ns0:request-wrapper/ns0:payload )"/> </ns2:Employee> |
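The reason for switching from value-of to copy-of is that value-of keeps only the text content of the parsed nodes, while copy-of copies the nodes themselves. The Python sketch below is an analogue of that difference using the standard library, not XSLT itself; the sample payload and element names are made up for the illustration.

```python
import html
import xml.etree.ElementTree as ET

# A hypothetical escaped payload, as it might be stored in the opaque string.
escaped = "&lt;name&gt;Ann&lt;/name&gt;&lt;dept&gt;Engineering&lt;/dept&gt;"

# Unescape and parse; a wrapper root is needed because there are two top-level elements.
parsed = ET.fromstring("<root>" + html.unescape(escaped) + "</root>")

# value-of analogue: only the concatenated text survives, the elements are lost.
value_of = ET.Element("Employee")
value_of.text = "".join(parsed.itertext())

# copy-of analogue: the child elements themselves are placed under the target.
copy_of = ET.Element("Employee")
copy_of.extend(list(parsed))

print(ET.tostring(value_of, encoding="unicode"))
print(ET.tostring(copy_of, encoding="unicode"))
```

The first result is a flat string inside Employee, which the typed business integration cannot interpret; the second preserves the structure it expects.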
You can deploy this par file. It has the following connections that need configuring and activating.
The solution uses a database. You can execute this SQL script to create the required tables in the database.