Ordering Delivery with Oracle Integration

Antony Reynolds
Senior Director Integration Strategy

We often need to ensure that messages are processed in strict order. In this blog post I explain a pattern that enforces this using Oracle Integration. The pattern also handles the case where we need to limit the concurrency of calls to an endpoint system.

This article was co-written with Glenn Mi & David Craft

The Sequencing Problem

The basic problem is that I may issue a stream of requests that must be executed in order, for example create account, update account address, update account contacts.  The latter two activities cannot occur until the first has completed.

Before we can sequence messages we need to know the order in which they should be processed, so there must be some sort of sequencing ID that we can use. This could be a timestamp or an actual sequence ID. If we use timestamps, the closer to the message origin the timestamp is applied the better. For example, if we take the timestamp from when the message arrives in Oracle Integration, a network delay may already have put our messages out of order.

Typically we don't want all messages to be in the same ordered sequence.  In our account example only messages for a given account need to be ordered.  Messages for different accounts can execute in parallel.  So we also need some sort of group id to identify different sequence streams within our message stream.

Once we have the messages and know their order we can process them. Inherent in a resequencing solution is some sort of delay that allows messages arriving out of order to be sorted into order. The size of the delay determines how long we are willing to wait for a late message before we go ahead without it.

Key Parameters

Based on the above we have the following key parameters to consider in our resequencing solution.

  • gtype - Group Type Field - this is used to identify the type of stream.  Different message types may be sequenced in parallel, for instance account updates and personnel updates would be different group types.
  • gid - Message Group Field - a field in the request that identifies a specific stream of messages to be sequenced
  • id - Message Identifier Field - a unique identifier for this message.
  • sequenceId - Message Sequence Field - a field in the request or a timestamp that is used to determine sequencing of messages in a stream
  • Parking Time - amount of time that messages may be delayed in order to ensure messages are processed in order
  • Message Concurrency - Max Number of Message Groups to be processed in parallel
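
To make the parameters concrete, here is a minimal Python sketch of how they might be modelled. The class names and the stream_key helper are hypothetical. The config field names maxConcurrent and timeWindow are borrowed from the manager's config.json example later in this post, and treating timeWindow as the parking time is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ResequencerMessage:
    gtype: str       # group type, e.g. "order" or "employee"
    gid: str         # identifies one stream of messages within the type
    id: str          # unique message identifier
    sequenceId: int  # ordering key within the stream
    payload: str     # business payload carried as an opaque string


@dataclass(frozen=True)
class TypeConfig:
    maxConcurrent: int  # max number of groups processed in parallel
    timeWindow: int     # assumed to be the parking time; unit not stated


def stream_key(message: ResequencerMessage) -> tuple:
    """Messages sharing this key must be processed in sequenceId order."""
    return (message.gtype, message.gid)


messages = [
    ResequencerMessage("account", "acct-1", "m2", 2, "update address"),
    ResequencerMessage("account", "acct-2", "m3", 1, "create account"),
    ResequencerMessage("account", "acct-1", "m1", 1, "create account"),
]

# Order within each stream; different streams are independent of each other.
ordered = sorted(messages, key=lambda m: (stream_key(m), m.sequenceId))
print([m.id for m in ordered])  # ['m1', 'm2', 'm3']
```

Only messages with the same (gtype, gid) pair are ordered relative to each other, which is what lets different accounts proceed in parallel.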

Solution Outline

  1. Process the input messages based on the desired sequence ID rather than the order in which they arrive.
  2. Each message is parked in storage for a certain period of time (the parking time) so that out-of-sequence messages have a chance to be processed in the desired order.
  3. The maximum number of message groups processed in parallel can be configured to throttle the outgoing calls.
  4. The solution is delivered as a set of integrations, connections and database scripts that use the current out-of-the-box OIC features.
  5. The solution is generic, meaning it can be used to re-sequence differently typed business integrations without modification to the provided integrations.
  6. Errors are handled for both system/network errors and bad requests.

Feature Flags

The solution uses a number of feature flags.  These will become available in the product over the next few months.  In the meantime they can be specifically requested as explained in the feature flag blog.

  • next generation mapper (oic.ics.mapper.jetmap-enablement) – needed for modifying Dispatcher to convert opaque payload to typed payload
  • Adapter for ATP CS (oic.cloudadapter.adapters.atpdatabase) – needed for accessing ATP db instance.
  • REST multiple resource support in inbound (oic.cloudadapter.adapter.rest.mvrp) – needed for designing RSQManager's multiple operations.

Solution Architecture

Business Front-end Integrations

An integration in this tier front-ends the re-sequencer and the real business integration so that messages pass through the re-sequencer before reaching the business integration.  The front-end integrations are specific to the use case and convert typed requests into a common format that includes the group and sequencing fields.

They receive the typed business payload and extract the id and group id from the message before calling the producer with the standardized message.
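
As an illustration of what a front-end integration does, the following Python sketch wraps a typed employee request in the common format. The Employee element names (id, department, updated) and the function name are hypothetical. A real front end would use the fields of its own business payload.

```python
import xml.etree.ElementTree as ET


def to_resequencer_message(employee_xml: str) -> dict:
    """Wrap a typed employee request in the common resequencer format."""
    employee = ET.fromstring(employee_xml)
    return {
        "gid": employee.findtext("department"),           # hypothetical group field
        "gtype": "employee",
        "id": employee.findtext("id"),                    # hypothetical unique id
        "sequenceId": int(employee.findtext("updated")),  # hypothetical sequence field
        "payload": employee_xml,                          # original request, as a string
    }


request = (
    "<Employee><id>e42</id><department>Engineering</department>"
    "<updated>123</updated></Employee>"
)
message = to_resequencer_message(request)
print(message["gtype"], message["gid"], message["sequenceId"])
```

The original request travels unchanged in the payload field, so it can be rebuilt into its typed form later.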

Resequencer Integrations

Note that all these integrations except the Dispatcher are generic and can be reused as is.

Producer

Entry point of the re-sequencer.  This receives the resequencing message, creates a new row in group table if it's not already there, and sets the status of the group to 'N'.  It then creates a message in the message table.  A sample message payload is shown below.

{ "gid": "gid", "gtype": "order", "id": "mid", "sequenceId": 123, "payload": "string representation of the payload" }
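
The producer's behaviour can be sketched in Python with an in-memory SQLite database standing in for ATP. The table names, column names, and the initial message status 'N' are assumptions made for illustration. The real schema comes from the SQL script mentioned in the Set Up section.

```python
import sqlite3

# Hypothetical schema; the real tables are created by the provided SQL script.
SCHEMA = """
CREATE TABLE rsq_group (gtype TEXT, gid TEXT, status TEXT,
                        PRIMARY KEY (gtype, gid));
CREATE TABLE rsq_message (id TEXT PRIMARY KEY, gtype TEXT, gid TEXT,
                          sequence_id INTEGER, payload TEXT, status TEXT,
                          created REAL);
"""


def produce(db, message, now):
    """Register the group if needed, mark it 'N', and store the message."""
    key = (message["gtype"], message["gid"])
    db.execute(
        "INSERT OR IGNORE INTO rsq_group (gtype, gid, status) VALUES (?, ?, 'N')",
        key,
    )
    db.execute("UPDATE rsq_group SET status = 'N' WHERE gtype = ? AND gid = ?", key)
    db.execute(
        "INSERT INTO rsq_message VALUES (?, ?, ?, ?, ?, 'N', ?)",
        (message["id"], message["gtype"], message["gid"],
         message["sequenceId"], message["payload"], now),
    )
    db.commit()


db = sqlite3.connect(":memory:")
db.executescript(SCHEMA)
produce(db, {"gid": "gid", "gtype": "order", "id": "mid", "sequenceId": 123,
             "payload": "string representation of the payload"}, now=0.0)
print(db.execute("SELECT gtype, gid, status FROM rsq_group").fetchall())
```

Recording the arrival time with each message is what later allows the consumer to apply the parking time.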

Group Consumer

Detects the active groups and kicks off the message consumers.  Scheduled to run every 1 minute. When scheduling, use this expression: FREQ=MINUTELY;INTERVAL=1;

This finds active groups, limiting parallelism to throttle outgoing calls to prevent overloading the target system.  For each active group this will invoke a message consumer.
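
The throttling step can be sketched as follows. This is a simplified Python model, not the integration's actual query. It assumes a group with status 'N' is waiting to be consumed and a group with status 'P' is currently being processed, and the function name is hypothetical.

```python
def pick_groups(groups, max_concurrent):
    """Choose which waiting groups to start without exceeding the limit."""
    in_flight = sum(1 for g in groups if g["status"] == "P")
    free_slots = max(0, max_concurrent - in_flight)
    waiting = [g for g in groups if g["status"] == "N"]
    return waiting[:free_slots]


groups = [
    {"gid": "a", "status": "P"},  # already being processed
    {"gid": "b", "status": "N"},
    {"gid": "c", "status": "N"},
    {"gid": "d", "status": "N"},
]

# With a limit of 2 and one group in flight, only one more group is started.
started = pick_groups(groups, max_concurrent=2)
print([g["gid"] for g in started])  # ['b']
```

Groups that are not picked simply wait for the next scheduled run.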

Message Consumer

Processes active messages of the given group.  It receives the group id and type from the group consumer.  It loads active messages of the group ordered by sequence-id. The messages have to be at least parking time old.  This ensures that we have a window for messages to arrive out of order and still be processed in order.

The consumer loops through the active messages. For each one it marks the message status as 'P' and invokes the Dispatcher. Note exceptions can occur here.

Once the dispatcher returns for a given message, the message is deleted. The group status is then updated to 'c' if there are no active messages, or to 'N' if there are new active messages.

A sample message payload is shown below:

{ "id": "Engineering", "type": "employee" }
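
The consumer loop described above can be sketched in Python, with a list of dicts standing in for the message table. The function name and the created field are hypothetical, and the status codes are used as they are written in this post.

```python
NEW, IN_PROGRESS, COMPLETE = "N", "P", "c"  # status codes as written in the post


def consume_group(messages, now, parking_time, dispatch):
    """Process one group's parked messages in sequence order.

    `messages` is mutated in place to stand in for the message table.
    Returns the group's status after the run.
    """
    ready = sorted(
        (m for m in messages if now - m["created"] >= parking_time),
        key=lambda m: m["sequenceId"],
    )
    for message in ready:
        message["status"] = IN_PROGRESS  # mark before calling out
        dispatch(message)                # may raise; the message then stays 'P'
        messages.remove(message)         # delete once the dispatcher returns
    return NEW if messages else COMPLETE


calls = []
table = [
    {"id": "m2", "sequenceId": 2, "created": 0, "status": NEW},
    {"id": "m1", "sequenceId": 1, "created": 1, "status": NEW},
    {"id": "m3", "sequenceId": 3, "created": 58, "status": NEW},  # still parked
]
status = consume_group(table, now=60, parking_time=10,
                       dispatch=lambda m: calls.append(m["id"]))
print(calls, status)  # ['m1', 'm2'] N
```

If the dispatch call raises, the loop stops and the failing message is left in status 'P', which is the stuck state discussed under Error handling.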

Dispatcher

This is a request/response integration which reconstructs the original payload and sends it to the real backend integration.  Unlike the other resequencer integrations, the dispatcher is not generic because it needs to invoke specific business integrations.

It receives the message and converts the payload to the original typed business payload.  It uses the group id to find the business endpoint and invokes it synchronously. Exceptions can happen here.

The dispatcher's interface is shown below:

{ "id": "Engineering", "gid" : "Zebra", "gtype" : "order", "sequenceId" : 123, "payload" : "original payload"}
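
A rough Python sketch of the dispatcher's job follows. The endpoints registry, the create_order handler and the Order payload are all hypothetical. The lookup is keyed on gtype purely for illustration, and a real dispatcher would use whatever routing its business integrations require.

```python
import xml.etree.ElementTree as ET


def dispatch(message, endpoints):
    """Rebuild the typed payload and call the business handler synchronously."""
    handler = endpoints[message["gtype"]]              # KeyError for an unknown type
    typed_payload = ET.fromstring(message["payload"])  # ParseError for a bad payload
    return handler(typed_payload)


def create_order(order):
    # Stand-in for a real business integration.
    return "created order " + order.findtext("number")


endpoints = {"order": create_order}
message = {"id": "Engineering", "gid": "Zebra", "gtype": "order",
           "sequenceId": 123, "payload": "<Order><number>7</number></Order>"}
print(dispatch(message, endpoints))  # created order 7
```

Errors are deliberately not caught here, so a failure in the business call propagates to the caller, which in the pattern is the message consumer.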

Manager

This integration is used to manage the resequencer. Currently this manager supports three operations:

Get configs

Path: /configs Method: GET

This operation returns the config of all the types.

Example of invocation:

$ curl https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/configs -v -u username:password

Update config

Path: /configs/{type} Method: PUT

This operation updates the config for the given type.

Example of invocation:

$ curl -X PUT https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/configs/employee -v -u username:password  -H "Content-Type: application/json" -d@config.json

config.json example:

{ "maxConcurrent": 5, "timeWindow": 11 }

Recover Group

Path: /types/{type}/groups/{group}/recover Method: PUT

This operation deletes stuck messages in the message table and reactivates the group by setting its status to 'N'.

Example of invocation:

$ curl -X PUT https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/types/employee/groups/eng/recover -v -u username:password -H "Content-length: 0"

Business Integrations

A business integration is the real integration that processes the business messages. It has its own typed interface.  For each Business Frontend integration there should be a corresponding Business Integration.

Error handling

An exception can happen when the dispatcher invokes the business integration.  This exception will bubble up to the message consumer and cause the message consumer instance to fail.  When this happens, the group status will stay at 'P' in the database.

In the Monitoring Integration page you can see the failed dispatcher instance and the message consumer instance.

Recovery

Recover System/network error

If the problem is caused by some system error like networking issues then after fixing the problem, you can recover by resubmitting the failed message consumer instance.

Recover bad request

If the error is caused by a bad request, resubmitting the request will not help. In this case we would need to skip the bad request and move on. To do this you can invoke the resequencer manager integration to remove the stuck message and reactivate the group:

$ curl -X PUT https://my.integration.cloud/ic/api/integration/v1/flows/rest/RSQMANAGER/1.0/types/employee/groups/eng/recover -v -u username:password -H "Content-length: 0"
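
What the recover operation does to the data can be sketched in Python. The in-memory structures and the function name are hypothetical, and treating a message with status 'P' as the stuck one is an assumption based on the consumer behaviour described earlier.

```python
def recover_group(messages, groups, gtype, gid):
    """Drop the group's stuck messages and make the group eligible again."""
    stuck = [m for m in messages
             if m["gtype"] == gtype and m["gid"] == gid and m["status"] == "P"]
    for message in stuck:
        messages.remove(message)  # skip the bad request
    groups[(gtype, gid)] = "N"    # reactivate the group
    return [m["id"] for m in stuck]


messages = [
    {"id": "bad", "gtype": "employee", "gid": "eng", "status": "P"},
    {"id": "next", "gtype": "employee", "gid": "eng", "status": "N"},
]
groups = {("employee", "eng"): "P"}

removed = recover_group(messages, groups, "employee", "eng")
print(removed, groups[("employee", "eng")])  # ['bad'] N
```

After recovery the remaining messages of the group are picked up again on the next run of the group consumer.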

Notes

We used the ATP adapter to minimize operations overhead (autonomous database manages itself) and reduce latency (no need for additional agent hop). This requires us to enable the feature oic.cloudadapter.adapters.atpdatabase.

To be able to convert an opaque string to typed XML in RSQDispatcher, it is easiest to enable the feature oic.ics.mapper.jetmap-enablement, which allows you to easily modify the mapping XSL as explained below:

In the mapper, map the opaque string and convert it using the parseEscapedXML() function. Then view the mapping XSL, change value-of to copy-of, and import the modified XSL.

Mapping Before:

<ns2:Employee xml:id="id_16">
    <xsl:value-of xml:id="id_21" select="oraext:parseEscapedXML (/nssrcmpr:execute/ns0:request-wrapper/ns0:payload )"/>
</ns2:Employee>

Mapping After:

<ns2:Employee xml:id="id_16">
    <xsl:copy-of xml:id="id_21" select="oraext:parseEscapedXML (/nssrcmpr:execute/ns0:request-wrapper/ns0:payload )"/>
</ns2:Employee>

Set Up

You can deploy this par file.  It has the following connections that need configuring and activating.

  • RSQ DB
    • Invoke
    • ATP database used by resequencer
  • RSQMessageConsumer
    • Trigger & Invoke
    • Used to cause load distribution of calls to message consumer
  • RSQManager
    • Trigger
    • Used to invoke manager interface
  • RSQProducer
    • Trigger
    • Used to invoke producer interface
  • RSQDispatcher
    • Trigger
    • Used to invoke dispatcher interface
  • TestService
    • Invoke
    • Used to invoke a sample test service

The solution uses a database.  You can execute this SQL script to create the required tables in the database.

Acknowledgements

This article was written with assistance from Glenn Mi & David Craft

 

 
