Spring Integration based Dispatcher-Worker with Worker Queues

In the back-office world the central concept in most of the systems is one of a Trade. A Trade has many events (e.g. Inception, Amend, Novation, Termination). Generally events from different trades can be processed in parallel because they have no interdependencies, however, events from the same trade cannot be processed in parallel due to the fact that they modify the same internal instance of a Trade.

A useful pattern for this kind of scenario is dispatcher-worker with worker queues. Each worker has a job queue which it processes in a sequential fashion. Each job queue only contains events for a single trade. This allows parallel processing across trades while maintaining sequential processing on events for a single trade.

Image

I’ve developed simple version of this concept using Spring Integration. The first step is to create a Router that routes inbound trade events into channels that are specific to a trade. If the channel doesn’t exist then the Router will create a new one and register it with the Spring framework.

@Router
public String dispatch(CustomMessage inputMessage) {
  String channelName = inputMessage.getId() + CHANNEL_SUFFIX;

  synchronized (channelName.intern()) {
    if (activeChannels.get(channelName) == null) {
      QueueChannel activeChannel = createNewChannel(channelName);
      PollingConsumer activeConsumer = createConsumerWithWorker(inputMessage, activeChannel);
      activeConsumer.start();
    }
  }

  return channelName;
}

Creation of a channel is the only place where synchronisation is required. We only synchronise on the channel name which corresponds to the trade id. Hence contention is minimal. I also attach a Polling Consumer to the channel at the point that the channel is created. Creation of a channel and registering it to Spring framework is quite straight forward as shown in the snippet below:

private QueueChannel createNewChannel(String channelName) {
  QueueChannel activeChannel = new QueueChannel();
  activeChannel.setBeanName(channelName);
  activeChannels.put(channelName, activeChannel);
  applicationContext.getBeanFactory().registerSingleton(channelName, activeChannel);
  return activeChannel;
}

Although I attach a Polling Consumer to each channel. We don’t have to have a thread per channel. We can use a Task Executor to run the polling consumers which will allow much better control over the number of concurrent threads in the system using a thread pool:

private void startConsumingFromChannel(final String consumerName, final PollingConsumer activeConsumer) {
  activeConsumer.setBeanName(consumerName);
  activeConsumer.setAutoStartup(true);
  activeConsumer.setBeanFactory(applicationContext.getBeanFactory());
  activeConsumer.setTaskExecutor(consumerExecutorPool);
  applicationContext.getBeanFactory().registerSingleton(consumerName, activeConsumer);
}

Finally (not yet implemented) you can run a Reaper Thread that can remove channels and consumers that have not seen activity for a specified threshold. You can also back the inbound channel with a Message Store to ensure that the system can come backup in a consistent state on failure.

The source code is at Github.

Advertisements

Embeded Glassfish for Integration Tests

Sun Glassfish server can be run in embeded mode, very useful for automated integration tests. Here’s how:

Maven Dependencies:

<dependency>
  <groupId>org.glassfish.distributions</groupId>
  <artifactId>web-all</artifactId>
  <version>10.0-build-20080430</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.glassfish.embedded</groupId>
  <artifactId>gf-embedded-api</artifactId>
  <version>1.0-alpha-4</version>
  <scope>test</scope>
</dependency>

If you are not using maven then add the above libs to you classpath. Getting the server started in simple

final URI APP_URI= UriBuilder.fromUri("http://localhost/").port(8888).path("myapp").build();
GlassFish server= new GlassFish(BASE_URI.getPort());
ScatteredWar war = new ScatteredWar(APP_URI.getRawPath(),
   new File("src/main/webapp"),
   new File("src/main/webapp/WEB-INF/web.xml"),
   Collections.singleton(new File("target/classes").toURI().toURL()));
server.deploy(war);

You can then use any HTTP Client to send a HTTP Request to the server on a defined port. For automated integration tests use TestNG “dependsOnMethods” feature to ensure that the server was started before executing you integration test group(s). You can then have anther test which depends on your integration test group(s) to ensure that the server is stopped when tests have finished.

You can stop the server with server.stop(). Note: If you are using Maven then using cargo to bring up the server may be more advisable.

JVM Profiling: Thread Dumps

Thread dumps are possilby the most useful tool in diagnosing concurrancy related issues. You can get a thread dump at any time by sending a break signal to the JVM. In Windows it’s CTRL-Break and in Unix/Linux it’s kill -3 <pid>

The possible issues to look for are:

  • Deadlocks: You ‘ll find something similar to “Found one Java-level deadlock:" in the tread dump. Have a look here for an explanation of deadlocks.
  • Blocked: If there a number of threads blocked then look in the thread dump if they are blocked on the same monitor. This will indicate a heavily contented resource. You can see which thread has locked on the monitor by searching for “locked <monitor_id>“. You may want to review the design of this code (if you can change it) to ensure that you’re not over-zealous with locking or that the locking thread is not blocking on another monitor. Have a look at Software Transactional Memory for a possible alternative to locking.

You may also use “Thread Telemetry” view avialable in most JVM profiling tools (e.g. JProbe, JProfiler, YourKit etc.). This gives you an historical view of thread states in the JVM. Very useful for monitoring JVM performance over a period of time and spotting live-lock situations.

Controlling JBoss Deployments Using MBeans

JBoss allows you to programmatically control deployments using the MBean interface. Note: the same can be done using the JMX Console and Twiddle.

Approach 1: Copy the app to deploy folder and temporarily turn on hot-deployment scanner. This is not recommended for production since the deploy folder may contain other changes you may not have wanted deployed.

The JNDI name for the Deployment Scanner MBean is jboss.deployment:type=DeploymentScanner,flavor=URL. You can then use the following code to get a reference to the scanner. Note: this code will only work inside the container (you can initialise the JDNI context a performa  lookup if you want the code to run outside the container):

URLDeploymentScannerMBean deploymentScanner = 
   (URLDeploymentScannerMBean) MBeanProxyExt.create(
      URLDeploymentScannerMBean.class, "jboss.deployment:type=DeploymentScanner,flavor=URL",
      MBeanServerLocator.locateJBoss());

You can then deploymentScanner.stop() and deploymentScanner.start(). To determine if the scanner is already started you can check for it’s scannerMBean.getState() to be ServiceMBean.STARTED or ServiceMBean.STARTING

Approach 2: You can use the main deployer to deploy the app specifing the URL to it’s location. To obtain a reference to the deployer in the container:

MainDeployerMBean mainDeployer = (MainDeployerMBean) MBeanProxyExt.create(
   MainDeployerMBean.class, "jboss.system:service=MainDeployer",
   MBeanServerLocator.locateJBoss());

You can then deploy the app:

String webappUrl = "file:///home/username/apps/myapp.war";
if (deployerMBean.isDeployed(webappUrl)) {
   deployerMBean.redeploy(webappUrl);
} else {
   deployerMBean.deploy(webappUrl);
}

Find the jar file containing a class

I often need to know which jar file contains a particular class and I’m sure most people have that problem at some point. You can use jarFinder but it’s not always up-to-date or the class might be in a jar private to your organisation. You can use the following command to do a search on your file systems for all jars, and search in them for the class you’re trying to find:

find -name *.jar -print -exec jar tvf {} \; | egrep "\.jar|<Class Name>"

Replace <Class Name> with the name of your class (Case sensitive). Use cygwin if you are on windows.