Discussion:
[mule-scm] [mule][24786] branches/mule-3.2.x: MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy
Daniel Feist
2012-08-21 12:20:07 UTC
Permalink
You currently need to add everything you add to default-mule-config.xml to org.mule.config.builders.DefaultsConfigurationBuilder also.

Dan
Revision
24786
Author
pablo.lagreca
Date
2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012)
Log Message
MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy
Modified Paths
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
Added Paths
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
Diff
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -148,6 +148,7 @@
public static final String OBJECT_DEFAULT_RETRY_POLICY_TEMPLATE = "_defaultRetryPolicyTemplate";
public static final String OBJECT_MULE_CONFIGURATION = "_muleConfiguration";
public static final String OBJECT_MULE_NAMESPACE_MANAGER = "_muleNamespaceManager";
+ public static final String OBJECT_LOCK_FACTORY = "_muleLockFactory";
// Not currently used as these need to be instance variables of the MuleContext.
public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -49,7 +49,7 @@
@Override
public void initialise() throws InitialisationException
{
- if (maxRedeliveryCount < 1)
+ if (maxRedeliveryCount < 0)
{
throw new InitialisationException(
CoreMessages.initialisationFailure(
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -25,10 +25,13 @@
import org.mule.transformer.simple.ObjectToByteArray;
import java.io.InputStream;
+import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.mule.util.lock.Lock;
+import org.mule.util.lock.LockFactory;
import org.mule.util.store.ObjectStorePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@
private String messageDigestAlgorithm;
private String idExpression;
private ObjectStore<AtomicInteger> store;
+ private Lock<Serializable> lock;
@Override
public void initialise() throws InitialisationException
@@ -95,6 +99,11 @@
}
}
+ String appName = muleContext.getConfiguration().getId();
+ String flowName = flowConstruct.getName();
+ String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+ lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+
store = createStore();
}
@@ -165,45 +174,55 @@
exceptionSeen = true;
}
- if (!exceptionSeen)
+ lock.lock(messageId);
+ try
{
- counter = findCounter(messageId);
- tooMany = counter != null && counter.get() > maxRedeliveryCount;
- }
- if (tooMany || exceptionSeen)
- {
- try
+ if (!exceptionSeen)
{
- return deadLetterQueue.process(event);
+ counter = findCounter(messageId);
+ tooMany = counter != null && counter.get() > maxRedeliveryCount;
}
- catch (Exception ex)
+
+ if (tooMany || exceptionSeen)
{
- logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ try
+ {
+ return deadLetterQueue.process(event);
+ }
+ catch (Exception ex)
+ {
+ logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ }
+ return null;
}
- return null;
- }
- try
- {
- MuleEvent returnEvent = processNext(event);
- counter = findCounter(messageId);
- if (counter != null)
+ try
{
- resetCounter(messageId);
+ MuleEvent returnEvent = processNext(event);
+ counter = findCounter(messageId);
+ if (counter != null)
+ {
+ resetCounter(messageId);
+ }
+ return returnEvent;
}
- return returnEvent;
+ catch (MuleException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
+ catch (RuntimeException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
}
- catch (MuleException ex)
+ finally
{
- incrementCounter(messageId);
- throw ex;
+ lock.unlock(messageId);
}
- catch (RuntimeException ex)
- {
- incrementCounter(messageId);
- throw ex;
- }
+
}
private void resetCounter(String messageId) throws ObjectStoreException
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,30 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+ /*
+ * Gets a lock over the resource identified with lockId
+ */
+ void lock(T lockId);
+
+ /*
+ * Releases lock over the resource identified with lockId
+ */
+ void unlock(T lockId);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,25 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+ /**
+ * Creates a Lock for a given resource using the resource unique identifier.
+ */
+ Lock<T> createLock(String lockResourceName);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+ private Map<T, LockEntry> locks;
+ private Object acquireLock = new Object();
+
+ public ServerLock()
+ {
+ this.locks = new HashMap<T,LockEntry>();
+ }
+
+ public void lock(T key)
+ {
+ LockEntry lock;
+ synchronized (acquireLock)
+ {
+ if (this.locks.containsKey(key))
+ {
+ lock = this.locks.get(key);
+ }
+ else
+ {
+ lock = new LockEntry();
+ this.locks.put(key,lock);
+ }
+ lock.incrementLockCount();
+ acquireLock.notifyAll();
+ }
+ lock.lock();
+ }
+
+ public void unlock(T key)
+ {
+ synchronized (acquireLock)
+ {
+ LockEntry lock = this.locks.get(key);
+ if (lock != null)
+ {
+ lock.decrementLockCount();
+ if (!lock.hasPendingLocks())
+ {
+ this.locks.remove(key);
+ }
+ lock.unlock();
+ }
+ acquireLock.notifyAll();
+ }
+ }
+
+ public static class LockEntry
+ {
+ private AtomicInteger lockCount = new AtomicInteger(0);
+ private ReentrantLock lock = new ReentrantLock(true);
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void incrementLockCount()
+ {
+ lockCount.incrementAndGet();
+ }
+
+ public void decrementLockCount()
+ {
+ lockCount.decrementAndGet();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
+
+ public boolean hasPendingLocks()
+ {
+ return lockCount.get() > 0;
+ }
+ }
+
+ public void dispose()
+ {
+ locks.clear();
+ }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,19 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+ public Lock<T> createLock(String lockResourceName)
+ {
+ return new ServerLock<T>();
+ }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -10,71 +10,97 @@
package org.mule.processor;
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
import org.mockito.Answers;
-import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
import org.mule.api.config.MuleProperties;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.store.ObjectStore;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
{
public static final String STRING_MESSAGE = "message";
- public static final int MAX_REDELIVERY_COUNT = 1;
+ public static final int MAX_REDELIVERY_COUNT = 0;
private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
- private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+ private Latch waitLatch = new Latch();
+ private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+ private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
+ public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
@Before
public void setUpTest() throws MuleException
{
when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
- System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+ when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+ {
+ public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+ {
+ waitingMessageProcessorExecutionLatch.countDown();
+ waitLatch.await(2000, TimeUnit.MILLISECONDS);
+ return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+ }
+ });
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+ InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+ when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+ when(event.getMessage()).thenReturn(message);
+ irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+ irp.setUseSecureHash(true);
+ irp.setFlowConstruct(mock(FlowConstruct.class));
+ irp.setMuleContext(mockMuleContext);
+ irp.setListener(mockFailingMessageProcessor);
+ irp.setMessageProcessor(mockDlqMessageProcessor);
+
}
@Test
public void messageDigestFailure() throws Exception
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
+ when(message.getPayload()).thenReturn(new Object());
irp.initialise();
-
-
- when(message.getPayload()).thenReturn(new Object());
-
- when(event.getMessage()).thenReturn(message);
MuleEvent process = irp.process(event);
Assert.assertNull(process);
}
@@ -82,22 +108,42 @@
@Test
public void testMessageRedeliveryUsingMemory() throws Exception
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
+ irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
- irp.setUseSecureHash(true);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
+ public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ {
+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+ public void testThreadSafeObjectStoreUsage() throws Exception
+ {
when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
+ irp.setListener(mockWaitingMessageProcessor);
+ irp.initialise();
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+ ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+ firstIrpExecutionThread.start();
+ ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+ threadCausingRedeliveryException.start();
+ waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+ waitLatch.release();
+ firstIrpExecutionThread.join();
+ threadCausingRedeliveryException.join();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+
+ private void processUntilFailure()
+ {
+ for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
{
try
{
@@ -107,28 +153,14 @@
{
}
}
- verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
}
- public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ public class ExecuteIrpThread extends Thread
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
- irp.initialise();
-
- when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
-
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+ public Exception exception;
+
+ public void run()
{
try
{
@@ -136,15 +168,18 @@
}
catch (Exception e)
{
+ exception = e;
}
}
- verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
}
+
+
public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
{
private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+ private ServerLock lockableObjectStore = new ServerLock();
@Override
public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +212,13 @@
{
return false;
}
+
}
public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
{
private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+ private ServerLock lockableObjectStore = new ServerLock();
@Override
public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +249,7 @@
{
return false;
}
+
}
}
Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+ private final String name;
+ private String value;
+ private boolean initialized;
+ private String oldValue;
+
+ public SystemProperty(String name)
+ {
+ this(name, null);
+ }
+
+ public SystemProperty(String name, String value)
+ {
+ this.name = name;
+ this.value = value;
+ }
+
+ protected void before() throws Throwable
+ {
+ if (initialized)
+ {
+ throw new IllegalArgumentException("System property was already initialized");
+ }
+
+ oldValue = System.setProperty(name, getValue());
+ initialized = true;
+ }
+
+ protected void after()
+ {
+ if (!initialized)
+ {
+ throw new IllegalArgumentException("System property was not initialized");
+ }
+
+ doCleanUp();
+ restoreOldValue();
+
+ initialized = false;
+ }
+
+ protected void restoreOldValue()
+ {
+ if (oldValue == null)
+ {
+ System.clearProperty(name);
+ }
+ else
+ {
+ System.setProperty(name, oldValue);
+ }
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ protected void doCleanUp()
+ {
+ // Nothing to do
+ };
+
+ public String getValue()
+ {
+ return value;
+ };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,151 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+ public static final int THREAD_COUNT = 100;
+ public static final int ITERATIONS_PER_THREAD = 100;
+ private Latch threadStartLatch = new Latch();
+ private String sharedKeyA = "A";
+ private String sharedKeyB = "B";
+ private ServerLock<String> serverLock = new ServerLock<String>();
+ private InMemoryObjectStore objectStore = new InMemoryObjectStore();
+
+ public void testHighConcurrency() throws Exception
+ {
+ List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+ for (int i = 0; i < THREAD_COUNT; i++)
+ {
+ IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ }
+ threadStartLatch.release();
+ for (Thread thread : threads)
+ {
+ thread.join();
+ }
+ assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ }
+
+ public class IncrementKeyValueThread extends Thread
+ {
+ private String key;
+
+ public IncrementKeyValueThread(String key)
+ {
+ super("Thread-" + key);
+ this.key = key;
+ }
+
+ public void run()
+ {
+ try
+ {
+ threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+ {
+ if (Thread.interrupted())
+ {
+ break;
+ }
+ serverLock.lock(key);
+ try
+ {
+ Integer value;
+ if (objectStore.contains(key))
+ {
+ value = objectStore.retrieve(key);
+ objectStore.remove(key);
+ }
+ else
+ {
+ value = 0;
+ }
+ objectStore.store(key,value + 1);
+ }
+ finally
+ {
+ serverLock.unlock(key);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class InMemoryObjectStore implements ObjectStore<Integer>
+ {
+ private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+ public boolean contains(Serializable key) throws ObjectStoreException
+ {
+ return store.containsKey(key);
+ }
+
+ public void store(Serializable key, Integer value) throws ObjectStoreException
+ {
+ if (store.containsKey(key))
+ {
+ throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+ }
+ store.put(key,value);
+ }
+
+ public Integer retrieve(Serializable key) throws ObjectStoreException
+ {
+ return store.get(key);
+ }
+
+ public Integer remove(Serializable key) throws ObjectStoreException
+ {
+ return store.remove(key);
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+ }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml (24785 => 24786)
--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 05:18:14 UTC (rev 24786)
@@ -86,6 +86,8 @@
<bean name="_defaultRetryPolicyTemplate" class="org.mule.retry.policies.NoRetryPolicyTemplate"/>
+ <bean name="_muleLockFactory" class="org.mule.util.lock.ServerLockFactory"/>
+
<!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that
the transformers will be available even when using the TransientRegistry only -->
http://xircles.codehaus.org/manage_email
Daniel Feist
2012-08-21 12:22:42 UTC
Permalink
Does IdempotentMessageFilter need the same?

Dan
Revision
24786
Author
pablo.lagreca
Date
2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012)
Log Message
MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy
Modified Paths
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
Added Paths
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
Diff
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -148,6 +148,7 @@
public static final String OBJECT_DEFAULT_RETRY_POLICY_TEMPLATE = "_defaultRetryPolicyTemplate";
public static final String OBJECT_MULE_CONFIGURATION = "_muleConfiguration";
public static final String OBJECT_MULE_NAMESPACE_MANAGER = "_muleNamespaceManager";
+ public static final String OBJECT_LOCK_FACTORY = "_muleLockFactory";
// Not currently used as these need to be instance variables of the MuleContext.
public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -49,7 +49,7 @@
@Override
public void initialise() throws InitialisationException
{
- if (maxRedeliveryCount < 1)
+ if (maxRedeliveryCount < 0)
{
throw new InitialisationException(
CoreMessages.initialisationFailure(
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -25,10 +25,13 @@
import org.mule.transformer.simple.ObjectToByteArray;
import java.io.InputStream;
+import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.mule.util.lock.Lock;
+import org.mule.util.lock.LockFactory;
import org.mule.util.store.ObjectStorePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@
private String messageDigestAlgorithm;
private String idExpression;
private ObjectStore<AtomicInteger> store;
+ private Lock<Serializable> lock;
@Override
public void initialise() throws InitialisationException
@@ -95,6 +99,11 @@
}
}
+ String appName = muleContext.getConfiguration().getId();
+ String flowName = flowConstruct.getName();
+ String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+ lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+
store = createStore();
}
@@ -165,45 +174,55 @@
exceptionSeen = true;
}
- if (!exceptionSeen)
+ lock.lock(messageId);
+ try
{
- counter = findCounter(messageId);
- tooMany = counter != null && counter.get() > maxRedeliveryCount;
- }
- if (tooMany || exceptionSeen)
- {
- try
+ if (!exceptionSeen)
{
- return deadLetterQueue.process(event);
+ counter = findCounter(messageId);
+ tooMany = counter != null && counter.get() > maxRedeliveryCount;
}
- catch (Exception ex)
+
+ if (tooMany || exceptionSeen)
{
- logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ try
+ {
+ return deadLetterQueue.process(event);
+ }
+ catch (Exception ex)
+ {
+ logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ }
+ return null;
}
- return null;
- }
- try
- {
- MuleEvent returnEvent = processNext(event);
- counter = findCounter(messageId);
- if (counter != null)
+ try
{
- resetCounter(messageId);
+ MuleEvent returnEvent = processNext(event);
+ counter = findCounter(messageId);
+ if (counter != null)
+ {
+ resetCounter(messageId);
+ }
+ return returnEvent;
}
- return returnEvent;
+ catch (MuleException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
+ catch (RuntimeException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
}
- catch (MuleException ex)
+ finally
{
- incrementCounter(messageId);
- throw ex;
+ lock.unlock(messageId);
}
- catch (RuntimeException ex)
- {
- incrementCounter(messageId);
- throw ex;
- }
+
}
private void resetCounter(String messageId) throws ObjectStoreException
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,30 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+ /*
+ * Gets a lock over the resource identified with lockId
+ */
+ void lock(T lockId);
+
+ /*
+ * Releases lock over the resource identified with lockId
+ */
+ void unlock(T lockId);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,25 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+ /**
+ * Creates a Lock for a given resource using the resource unique identifier.
+ */
+ Lock<T> createLock(String lockResourceName);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+ private Map<T, LockEntry> locks;
+ private Object acquireLock = new Object();
+
+ public ServerLock()
+ {
+ this.locks = new HashMap<T,LockEntry>();
+ }
+
+ public void lock(T key)
+ {
+ LockEntry lock;
+ synchronized (acquireLock)
+ {
+ if (this.locks.containsKey(key))
+ {
+ lock = this.locks.get(key);
+ }
+ else
+ {
+ lock = new LockEntry();
+ this.locks.put(key,lock);
+ }
+ lock.incrementLockCount();
+ acquireLock.notifyAll();
+ }
+ lock.lock();
+ }
+
+ public void unlock(T key)
+ {
+ synchronized (acquireLock)
+ {
+ LockEntry lock = this.locks.get(key);
+ if (lock != null)
+ {
+ lock.decrementLockCount();
+ if (!lock.hasPendingLocks())
+ {
+ this.locks.remove(key);
+ }
+ lock.unlock();
+ }
+ acquireLock.notifyAll();
+ }
+ }
+
+ public static class LockEntry
+ {
+ private AtomicInteger lockCount = new AtomicInteger(0);
+ private ReentrantLock lock = new ReentrantLock(true);
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void incrementLockCount()
+ {
+ lockCount.incrementAndGet();
+ }
+
+ public void decrementLockCount()
+ {
+ lockCount.decrementAndGet();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
+
+ public boolean hasPendingLocks()
+ {
+ return lockCount.get() > 0;
+ }
+ }
+
+ public void dispose()
+ {
+ locks.clear();
+ }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,19 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+ public Lock<T> createLock(String lockResourceName)
+ {
+ return new ServerLock<T>();
+ }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -10,71 +10,97 @@
package org.mule.processor;
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
import org.mockito.Answers;
-import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
import org.mule.api.config.MuleProperties;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.store.ObjectStore;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
{
public static final String STRING_MESSAGE = "message";
- public static final int MAX_REDELIVERY_COUNT = 1;
+ public static final int MAX_REDELIVERY_COUNT = 0;
private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
- private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+ private Latch waitLatch = new Latch();
+ private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+ private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
+ public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
@Before
public void setUpTest() throws MuleException
{
when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
- System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+ when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+ {
+ public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+ {
+ waitingMessageProcessorExecutionLatch.countDown();
+ waitLatch.await(2000, TimeUnit.MILLISECONDS);
+ return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+ }
+ });
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+ InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+ when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+ when(event.getMessage()).thenReturn(message);
+ irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+ irp.setUseSecureHash(true);
+ irp.setFlowConstruct(mock(FlowConstruct.class));
+ irp.setMuleContext(mockMuleContext);
+ irp.setListener(mockFailingMessageProcessor);
+ irp.setMessageProcessor(mockDlqMessageProcessor);
+
}
@Test
public void messageDigestFailure() throws Exception
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
+ when(message.getPayload()).thenReturn(new Object());
irp.initialise();
-
-
- when(message.getPayload()).thenReturn(new Object());
-
- when(event.getMessage()).thenReturn(message);
MuleEvent process = irp.process(event);
Assert.assertNull(process);
}
@@ -82,22 +108,42 @@
@Test
public void testMessageRedeliveryUsingMemory() throws Exception
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
+ irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
- irp.setUseSecureHash(true);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
+ public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ {
+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+ public void testThreadSafeObjectStoreUsage() throws Exception
+ {
when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
+ irp.setListener(mockWaitingMessageProcessor);
+ irp.initialise();
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+ ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+ firstIrpExecutionThread.start();
+ ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+ threadCausingRedeliveryException.start();
+ waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+ waitLatch.release();
+ firstIrpExecutionThread.join();
+ threadCausingRedeliveryException.join();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+
+ private void processUntilFailure()
+ {
+ for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
{
try
{
@@ -107,28 +153,14 @@
{
}
}
- verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
}
- public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ public class ExecuteIrpThread extends Thread
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
- irp.initialise();
-
- when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
-
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+ public Exception exception;
+
+ public void run()
{
try
{
@@ -136,15 +168,18 @@
}
catch (Exception e)
{
+ exception = e;
}
}
- verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
}
+
+
public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
{
private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+ private ServerLock lockableObjectStore = new ServerLock();
@Override
public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +212,13 @@
{
return false;
}
+
}
public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
{
private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+ private ServerLock lockableObjectStore = new ServerLock();
@Override
public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +249,7 @@
{
return false;
}
+
}
}
Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+ private final String name;
+ private String value;
+ private boolean initialized;
+ private String oldValue;
+
+ public SystemProperty(String name)
+ {
+ this(name, null);
+ }
+
+ public SystemProperty(String name, String value)
+ {
+ this.name = name;
+ this.value = value;
+ }
+
+ protected void before() throws Throwable
+ {
+ if (initialized)
+ {
+ throw new IllegalArgumentException("System property was already initialized");
+ }
+
+ oldValue = System.setProperty(name, getValue());
+ initialized = true;
+ }
+
+ protected void after()
+ {
+ if (!initialized)
+ {
+ throw new IllegalArgumentException("System property was not initialized");
+ }
+
+ doCleanUp();
+ restoreOldValue();
+
+ initialized = false;
+ }
+
+ protected void restoreOldValue()
+ {
+ if (oldValue == null)
+ {
+ System.clearProperty(name);
+ }
+ else
+ {
+ System.setProperty(name, oldValue);
+ }
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ protected void doCleanUp()
+ {
+ // Nothing to do
+ };
+
+ public String getValue()
+ {
+ return value;
+ };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,151 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+ public static final int THREAD_COUNT = 100;
+ public static final int ITERATIONS_PER_THREAD = 100;
+ private Latch threadStartLatch = new Latch();
+ private String sharedKeyA = "A";
+ private String sharedKeyB = "B";
+ private ServerLock<String> serverLock = new ServerLock<String>();
+ private InMemoryObjectStore objectStore = new InMemoryObjectStore();
+
+ public void testHighConcurrency() throws Exception
+ {
+ List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+ for (int i = 0; i < THREAD_COUNT; i++)
+ {
+ IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ }
+ threadStartLatch.release();
+ for (Thread thread : threads)
+ {
+ thread.join();
+ }
+ assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ }
+
+ public class IncrementKeyValueThread extends Thread
+ {
+ private String key;
+
+ public IncrementKeyValueThread(String key)
+ {
+ super("Thread-" + key);
+ this.key = key;
+ }
+
+ public void run()
+ {
+ try
+ {
+ threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+ {
+ if (Thread.interrupted())
+ {
+ break;
+ }
+ serverLock.lock(key);
+ try
+ {
+ Integer value;
+ if (objectStore.contains(key))
+ {
+ value = objectStore.retrieve(key);
+ objectStore.remove(key);
+ }
+ else
+ {
+ value = 0;
+ }
+ objectStore.store(key,value + 1);
+ }
+ finally
+ {
+ serverLock.unlock(key);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class InMemoryObjectStore implements ObjectStore<Integer>
+ {
+ private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+ public boolean contains(Serializable key) throws ObjectStoreException
+ {
+ return store.containsKey(key);
+ }
+
+ public void store(Serializable key, Integer value) throws ObjectStoreException
+ {
+ if (store.containsKey(key))
+ {
+ throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+ }
+ store.put(key,value);
+ }
+
+ public Integer retrieve(Serializable key) throws ObjectStoreException
+ {
+ return store.get(key);
+ }
+
+ public Integer remove(Serializable key) throws ObjectStoreException
+ {
+ return store.remove(key);
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+ }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml (24785 => 24786)
--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 05:18:14 UTC (rev 24786)
@@ -86,6 +86,8 @@
<bean name="_defaultRetryPolicyTemplate" class="org.mule.retry.policies.NoRetryPolicyTemplate"/>
+ <bean name="_muleLockFactory" class="org.mule.util.lock.ServerLockFactory"/>
+
<!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that
the transformers will be available even when using the TransientRegistry only -->
http://xircles.codehaus.org/manage_email
Pablo La Greca
2012-08-21 17:25:55 UTC
Permalink
LockFactory was added to org.mule.config.builders.DefaultsConfigurationBuilder
as suggested.

IdempotentMessageFilter relies on the fact that storing twice the same key
in an object store will fail. So it's not the same case.
Post by Daniel Feist
Does IdempotentMessageFilter need the same?
Dan
Revision 24786 <http://fisheye.codehaus.org/changelog/mule/?cs=24786>
Author pablo.lagreca Date 2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012) Log
Message
MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy
Modified Paths
-
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
-
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
Added Paths
- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/
- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/
-
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
Diff
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 03:52:07 UTC (rev 24785)
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+ String flowName = flowConstruct.getName();
+ String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+ lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+ try {- counter = findCounter(messageId);
- tooMany = counter != null && counter.get() > maxRedeliveryCount;
- } - if (tooMany || exceptionSeen)
- {
- try+ if (!exceptionSeen) {- return deadLetterQueue.process(event);+ counter = findCounter(messageId);
+ tooMany = counter != null && counter.get() > maxRedeliveryCount; }- catch (Exception ex)+
+ if (tooMany || exceptionSeen) {- logger.info("Exception thrown from failed message processing for message " + messageId, ex);+ try
+ {
+ return deadLetterQueue.process(event);
+ }
+ catch (Exception ex)
+ {
+ logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ }
+ return null; }- return null;
- } - try
- {
- MuleEvent returnEvent = processNext(event);
- counter = findCounter(messageId);
- if (counter != null)+ try {- resetCounter(messageId);+ MuleEvent returnEvent = processNext(event);
+ counter = findCounter(messageId);
+ if (counter != null)
+ {
+ resetCounter(messageId);
+ }
+ return returnEvent; }- return returnEvent;+ catch (MuleException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
+ catch (RuntimeException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ } }- catch (MuleException ex)+ finally {- incrementCounter(messageId);
- throw ex;+ lock.unlock(messageId); }- catch (RuntimeException ex)
- {
- incrementCounter(messageId);
- throw ex;
- }+ } private void resetCounter(String messageId) throws ObjectStoreException
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 =>
24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+ /*
+ * Gets a lock over the resource identified with lockId
+ */
+ void lock(T lockId);
+
+ /*
+ * Releases lock over the resource identified with lockId
+ */
+ void unlock(T lockId);
+
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+ /**
+ * Creates a Lock for a given resource using the resource unique identifier.
+ */
+ Lock<T> createLock(String lockResourceName);
+
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+ private Map<T, LockEntry> locks;
+ private Object acquireLock = new Object();
+
+ public ServerLock()
+ {
+ this.locks = new HashMap<T,LockEntry>();
+ }
+
+ public void lock(T key)
+ {
+ LockEntry lock;
+ synchronized (acquireLock)
+ {
+ if (this.locks.containsKey(key))
+ {
+ lock = this.locks.get(key);
+ }
+ else
+ {
+ lock = new LockEntry();
+ this.locks.put(key,lock);
+ }
+ lock.incrementLockCount();
+ acquireLock.notifyAll();
+ }
+ lock.lock();
+ }
+
+ public void unlock(T key)
+ {
+ synchronized (acquireLock)
+ {
+ LockEntry lock = this.locks.get(key);
+ if (lock != null)
+ {
+ lock.decrementLockCount();
+ if (!lock.hasPendingLocks())
+ {
+ this.locks.remove(key);
+ }
+ lock.unlock();
+ }
+ acquireLock.notifyAll();
+ }
+ }
+
+ public static class LockEntry
+ {
+ private AtomicInteger lockCount = new AtomicInteger(0);
+ private ReentrantLock lock = new ReentrantLock(true);
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void incrementLockCount()
+ {
+ lockCount.incrementAndGet();
+ }
+
+ public void decrementLockCount()
+ {
+ lockCount.decrementAndGet();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
+
+ public boolean hasPendingLocks()
+ {
+ return lockCount.get() > 0;
+ }
+ }
+
+ public void dispose()
+ {
+ locks.clear();
+ }
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+ public Lock<T> createLock(String lockResourceName)
+ {
+ return new ServerLock<T>();
+ }
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 03:52:07 UTC (rev 24785)
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when;-+import junit.framework.Assert; import org.junit.Before;+import org.junit.Rule;
+import org.junit.Test; import org.mockito.Answers;-import org.mockito.Mockito; import org.mockito.internal.verification.VerificationModeFactory;-import org.mule.api.*;+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage; import org.mule.api.config.MuleProperties; import org.mule.api.construct.FlowConstruct; import org.mule.api.processor.MessageProcessor; import org.mule.api.store.ObjectStore; import org.mule.api.store.ObjectStoreException; import org.mule.api.store.ObjectStoreManager;-import org.mule.routing.MessageProcessorFilterPair; import org.mule.tck.junit4.AbstractMuleTestCase;-
-import org.junit.Test;
-
-import junit.framework.Assert;+import org.mule.tck.junit4.rule.SystemProperty; import org.mule.util.SerializationUtils;+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory; import java.io.Serializable; import java.util.HashMap; import java.util.Map;+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase { public static final String STRING_MESSAGE = "message";- public static final int MAX_REDELIVERY_COUNT = 1;+ public static final int MAX_REDELIVERY_COUNT = 0; private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get()); private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get()); private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());- private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());+ private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get()); private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get()); private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());+ private Latch waitLatch = new Latch();
+ private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+ public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+ {
+ public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+ {
+ waitingMessageProcessorExecutionLatch.countDown();
+ waitLatch.await(2000, TimeUnit.MILLISECONDS);
+ return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+ }
+ });
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+ InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+ when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+ when(event.getMessage()).thenReturn(message);
+ irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+ irp.setUseSecureHash(true);
+ irp.setFlowConstruct(mock(FlowConstruct.class));
+ irp.setMuleContext(mockMuleContext);
+ irp.setListener(mockFailingMessageProcessor);
+ irp.setMessageProcessor(mockDlqMessageProcessor);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);+ when(message.getPayload()).thenReturn(new Object()); irp.initialise();-
-
- when(message.getPayload()).thenReturn(new Object());
-
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
+ irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ } - IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
- irp.setUseSecureHash(true);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
+ public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ {
+ when(message.getPayload()).thenReturn(STRING_MESSAGE); irp.initialise();+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ public void testThreadSafeObjectStoreUsage() throws Exception
+ { when(message.getPayload()).thenReturn(STRING_MESSAGE);- when(event.getMessage()).thenReturn(message);+ irp.setListener(mockWaitingMessageProcessor);
+ irp.initialise(); - for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)+ ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+ firstIrpExecutionThread.start();
+ ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+ threadCausingRedeliveryException.start();
+ waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+ waitLatch.release();
+ firstIrpExecutionThread.join();
+ threadCausingRedeliveryException.join();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+
+ private void processUntilFailure()
+ {
- public void testMessageRedeliveryUsingSerializationStore() throws Exception+ public class ExecuteIrpThread extends Thread {- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
- irp.initialise();
-
- when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
-
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)+ public Exception exception;
+
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+ private final String name;
+ private String value;
+ private boolean initialized;
+ private String oldValue;
+
+ public SystemProperty(String name)
+ {
+ this(name, null);
+ }
+
+ public SystemProperty(String name, String value)
+ {
+ this.name = name;
+ this.value = value;
+ }
+
+ protected void before() throws Throwable
+ {
+ if (initialized)
+ {
+ throw new IllegalArgumentException("System property was already initialized");
+ }
+
+ oldValue = System.setProperty(name, getValue());
+ initialized = true;
+ }
+
+ protected void after()
+ {
+ if (!initialized)
+ {
+ throw new IllegalArgumentException("System property was not initialized");
+ }
+
+ doCleanUp();
+ restoreOldValue();
+
+ initialized = false;
+ }
+
+ protected void restoreOldValue()
+ {
+ if (oldValue == null)
+ {
+ System.clearProperty(name);
+ }
+ else
+ {
+ System.setProperty(name, oldValue);
+ }
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ protected void doCleanUp()
+ {
+ // Nothing to do
+ };
+
+ public String getValue()
+ {
+ return value;
+ };
+}Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+ public static final int THREAD_COUNT = 100;
+ public static final int ITERATIONS_PER_THREAD = 100;
+ private Latch threadStartLatch = new Latch();
+ private String sharedKeyA = "A";
+ private String sharedKeyB = "B";
+ private ServerLock<String> serverLock = new ServerLock<String>();
+ private InMemoryObjectStore objectStore = new InMemoryObjectStore();
+
+ public void testHighConcurrency() throws Exception
+ {
+ List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+ for (int i = 0; i < THREAD_COUNT; i++)
+ {
+ IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ }
+ threadStartLatch.release();
+ for (Thread thread : threads)
+ {
+ thread.join();
+ }
+ assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ }
+
+ public class IncrementKeyValueThread extends Thread
+ {
+ private String key;
+
+ public IncrementKeyValueThread(String key)
+ {
+ super("Thread-" + key);
+ this.key = key;
+ }
+
+ public void run()
+ {
+ try
+ {
+ threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+ {
+ if (Thread.interrupted())
+ {
+ break;
+ }
+ serverLock.lock(key);
+ try
+ {
+ Integer value;
+ if (objectStore.contains(key))
+ {
+ value = objectStore.retrieve(key);
+ objectStore.remove(key);
+ }
+ else
+ {
+ value = 0;
+ }
+ objectStore.store(key,value + 1);
+ }
+ finally
+ {
+ serverLock.unlock(key);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class InMemoryObjectStore implements ObjectStore<Integer>
+ {
+ private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+ public boolean contains(Serializable key) throws ObjectStoreException
+ {
+ return store.containsKey(key);
+ }
+
+ public void store(Serializable key, Integer value) throws ObjectStoreException
+ {
+ if (store.containsKey(key))
+ {
+ throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+ }
+ store.put(key,value);
+ }
+
+ public Integer retrieve(Serializable key) throws ObjectStoreException
+ {
+ return store.get(key);
+ }
+
+ public Integer remove(Serializable key) throws ObjectStoreException
+ {
+ return store.remove(key);
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+ }
+
+}Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
(24785 => 24786)
--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 03:52:07 UTC (rev 24785)
+ <!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that the transformers will be available even when using the TransientRegistry only -->
------------------------------
http://xircles.codehaus.org/manage_email
Pablo Kraan
2012-08-21 19:12:56 UTC
Permalink
Some comments:


Lock: I think the javadoc needs more information. What happens when there
is no object with the given key? what happens if the caller to the release
method does not have the lock on passed key?

LockFactory: is not clear to me what is the purpose of the lockResourceName

ServerLock: acquireLock field must be final

Remove redundant usages of "this" keyword

Fix imports order


Pablo K

On Tue, Aug 21, 2012 at 2:25 PM, Pablo La Greca <
Post by Pablo La Greca
LockFactory was added to org.mule.config.builders.DefaultsConfigurationBuilder
as suggested.
IdempotentMessageFilter relies on the fact that storing twice the same key
in an object store will fail. So it's not the same case.
Post by Daniel Feist
Does IdempotentMessageFilter need the same?
Dan
Revision 24786 <http://fisheye.codehaus.org/changelog/mule/?cs=24786>
Author pablo.lagreca Date 2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012) Log
Message
MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy
Modified Paths
-
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
-
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
Added Paths
- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/
- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/
-
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
Diff
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 03:52:07 UTC (rev 24785)
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+ String flowName = flowConstruct.getName();
+ String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+ lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+ try {- counter = findCounter(messageId);
- tooMany = counter != null && counter.get() > maxRedeliveryCount;
- } - if (tooMany || exceptionSeen)
- {
- try+ if (!exceptionSeen) {- return deadLetterQueue.process(event);+ counter = findCounter(messageId);
+ tooMany = counter != null && counter.get() > maxRedeliveryCount; }- catch (Exception ex)+
+ if (tooMany || exceptionSeen) {- logger.info("Exception thrown from failed message processing for message " + messageId, ex);+ try
+ {
+ return deadLetterQueue.process(event);
+ }
+ catch (Exception ex)
+ {
+ logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ }
+ return null; }- return null;
- } - try
- {
- MuleEvent returnEvent = processNext(event);
- counter = findCounter(messageId);
- if (counter != null)+ try {- resetCounter(messageId);+ MuleEvent returnEvent = processNext(event);
+ counter = findCounter(messageId);
+ if (counter != null)
+ {
+ resetCounter(messageId);
+ }
+ return returnEvent; }- return returnEvent;+ catch (MuleException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
+ catch (RuntimeException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ } }- catch (MuleException ex)+ finally {- incrementCounter(messageId);
- throw ex;+ lock.unlock(messageId); }- catch (RuntimeException ex)
- {
- incrementCounter(messageId);
- throw ex;
- }+ } private void resetCounter(String messageId) throws ObjectStoreException
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 =>
24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+ /*
+ * Gets a lock over the resource identified with lockId
+ */
+ void lock(T lockId);
+
+ /*
+ * Releases lock over the resource identified with lockId
+ */
+ void unlock(T lockId);
+
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+ /**
+ * Creates a Lock for a given resource using the resource unique identifier.
+ */
+ Lock<T> createLock(String lockResourceName);
+
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+ private Map<T, LockEntry> locks;
+ private Object acquireLock = new Object();
+
+ public ServerLock()
+ {
+ this.locks = new HashMap<T,LockEntry>();
+ }
+
+ public void lock(T key)
+ {
+ LockEntry lock;
+ synchronized (acquireLock)
+ {
+ if (this.locks.containsKey(key))
+ {
+ lock = this.locks.get(key);
+ }
+ else
+ {
+ lock = new LockEntry();
+ this.locks.put(key,lock);
+ }
+ lock.incrementLockCount();
+ acquireLock.notifyAll();
+ }
+ lock.lock();
+ }
+
+ public void unlock(T key)
+ {
+ synchronized (acquireLock)
+ {
+ LockEntry lock = this.locks.get(key);
+ if (lock != null)
+ {
+ lock.decrementLockCount();
+ if (!lock.hasPendingLocks())
+ {
+ this.locks.remove(key);
+ }
+ lock.unlock();
+ }
+ acquireLock.notifyAll();
+ }
+ }
+
+ public static class LockEntry
+ {
+ private AtomicInteger lockCount = new AtomicInteger(0);
+ private ReentrantLock lock = new ReentrantLock(true);
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void incrementLockCount()
+ {
+ lockCount.incrementAndGet();
+ }
+
+ public void decrementLockCount()
+ {
+ lockCount.decrementAndGet();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
+
+ public boolean hasPendingLocks()
+ {
+ return lockCount.get() > 0;
+ }
+ }
+
+ public void dispose()
+ {
+ locks.clear();
+ }
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+ public Lock<T> createLock(String lockResourceName)
+ {
+ return new ServerLock<T>();
+ }
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 03:52:07 UTC (rev 24785)
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when;-+import junit.framework.Assert; import org.junit.Before;+import org.junit.Rule;
+import org.junit.Test; import org.mockito.Answers;-import org.mockito.Mockito; import org.mockito.internal.verification.VerificationModeFactory;-import org.mule.api.*;+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage; import org.mule.api.config.MuleProperties; import org.mule.api.construct.FlowConstruct; import org.mule.api.processor.MessageProcessor; import org.mule.api.store.ObjectStore; import org.mule.api.store.ObjectStoreException; import org.mule.api.store.ObjectStoreManager;-import org.mule.routing.MessageProcessorFilterPair; import org.mule.tck.junit4.AbstractMuleTestCase;-
-import org.junit.Test;
-
-import junit.framework.Assert;+import org.mule.tck.junit4.rule.SystemProperty; import org.mule.util.SerializationUtils;+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory; import java.io.Serializable; import java.util.HashMap; import java.util.Map;+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase { public static final String STRING_MESSAGE = "message";- public static final int MAX_REDELIVERY_COUNT = 1;+ public static final int MAX_REDELIVERY_COUNT = 0; private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get()); private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get()); private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());- private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());+ private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get()); private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get()); private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());+ private Latch waitLatch = new Latch();
+ private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+ public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+ {
+ public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+ {
+ waitingMessageProcessorExecutionLatch.countDown();
+ waitLatch.await(2000, TimeUnit.MILLISECONDS);
+ return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+ }
+ });
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+ InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+ when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+ when(event.getMessage()).thenReturn(message);
+ irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+ irp.setUseSecureHash(true);
+ irp.setFlowConstruct(mock(FlowConstruct.class));
+ irp.setMuleContext(mockMuleContext);
+ irp.setListener(mockFailingMessageProcessor);
+ irp.setMessageProcessor(mockDlqMessageProcessor);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);+ when(message.getPayload()).thenReturn(new Object()); irp.initialise();-
-
- when(message.getPayload()).thenReturn(new Object());
-
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
+ irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ } - IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
- irp.setUseSecureHash(true);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
+ public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ {
+ when(message.getPayload()).thenReturn(STRING_MESSAGE); irp.initialise();+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ public void testThreadSafeObjectStoreUsage() throws Exception
+ { when(message.getPayload()).thenReturn(STRING_MESSAGE);- when(event.getMessage()).thenReturn(message);+ irp.setListener(mockWaitingMessageProcessor);
+ irp.initialise(); - for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)+ ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+ firstIrpExecutionThread.start();
+ ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+ threadCausingRedeliveryException.start();
+ waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+ waitLatch.release();
+ firstIrpExecutionThread.join();
+ threadCausingRedeliveryException.join();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+
+ private void processUntilFailure()
+ {
- public void testMessageRedeliveryUsingSerializationStore() throws Exception+ public class ExecuteIrpThread extends Thread {- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
- irp.initialise();
-
- when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
-
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)+ public Exception exception;
+
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+ private final String name;
+ private String value;
+ private boolean initialized;
+ private String oldValue;
+
+ public SystemProperty(String name)
+ {
+ this(name, null);
+ }
+
+ public SystemProperty(String name, String value)
+ {
+ this.name = name;
+ this.value = value;
+ }
+
+ protected void before() throws Throwable
+ {
+ if (initialized)
+ {
+ throw new IllegalArgumentException("System property was already initialized");
+ }
+
+ oldValue = System.setProperty(name, getValue());
+ initialized = true;
+ }
+
+ protected void after()
+ {
+ if (!initialized)
+ {
+ throw new IllegalArgumentException("System property was not initialized");
+ }
+
+ doCleanUp();
+ restoreOldValue();
+
+ initialized = false;
+ }
+
+ protected void restoreOldValue()
+ {
+ if (oldValue == null)
+ {
+ System.clearProperty(name);
+ }
+ else
+ {
+ System.setProperty(name, oldValue);
+ }
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ protected void doCleanUp()
+ {
+ // Nothing to do
+ };
+
+ public String getValue()
+ {
+ return value;
+ };
+}Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+ public static final int THREAD_COUNT = 100;
+ public static final int ITERATIONS_PER_THREAD = 100;
+ private Latch threadStartLatch = new Latch();
+ private String sharedKeyA = "A";
+ private String sharedKeyB = "B";
+ private ServerLock<String> serverLock = new ServerLock<String>();
+ private InMemoryObjectStore objectStore = new InMemoryObjectStore();
+
+ public void testHighConcurrency() throws Exception
+ {
+ List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+ for (int i = 0; i < THREAD_COUNT; i++)
+ {
+ IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ }
+ threadStartLatch.release();
+ for (Thread thread : threads)
+ {
+ thread.join();
+ }
+ assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ }
+
+ public class IncrementKeyValueThread extends Thread
+ {
+ private String key;
+
+ public IncrementKeyValueThread(String key)
+ {
+ super("Thread-" + key);
+ this.key = key;
+ }
+
+ public void run()
+ {
+ try
+ {
+ threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+ {
+ if (Thread.interrupted())
+ {
+ break;
+ }
+ serverLock.lock(key);
+ try
+ {
+ Integer value;
+ if (objectStore.contains(key))
+ {
+ value = objectStore.retrieve(key);
+ objectStore.remove(key);
+ }
+ else
+ {
+ value = 0;
+ }
+ objectStore.store(key,value + 1);
+ }
+ finally
+ {
+ serverLock.unlock(key);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class InMemoryObjectStore implements ObjectStore<Integer>
+ {
+ private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+ public boolean contains(Serializable key) throws ObjectStoreException
+ {
+ return store.containsKey(key);
+ }
+
+ public void store(Serializable key, Integer value) throws ObjectStoreException
+ {
+ if (store.containsKey(key))
+ {
+ throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+ }
+ store.put(key,value);
+ }
+
+ public Integer retrieve(Serializable key) throws ObjectStoreException
+ {
+ return store.get(key);
+ }
+
+ public Integer remove(Serializable key) throws ObjectStoreException
+ {
+ return store.remove(key);
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+ }
+
+}Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
(24785 => 24786)
--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 03:52:07 UTC (rev 24785)
+ <!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that the transformers will be available even when using the TransientRegistry only -->
------------------------------
http://xircles.codehaus.org/manage_email
Pablo Kraan
2012-08-21 19:19:56 UTC
Permalink
Post by Pablo Kraan
- What happens if the lock holder dies? Is the lock eternal or does it
time out?
Post by Pablo Kraan
Lock: I think the javadoc needs more information. What happens when there
is no object with the given key? what happens if the caller to the release
method does not have the lock on passed key?
LockFactory: is not clear to me what is the purpose of the lockResourceName
ServerLock: acquireLock field must be final
Remove redundant usages of "this" keyword
Fix imports order
Pablo K
On Tue, Aug 21, 2012 at 2:25 PM, Pablo La Greca <
LockFactory was added to org.mule.config.builders.DefaultsConfigurationBuilder
as suggested.
IdempotentMessageFilter relies on the fact that storing twice the same
key in an object store will fail. So it's not the same case.
Post by Daniel Feist
Does IdempotentMessageFilter need the same?
Dan
Revision 24786 <http://fisheye.codehaus.org/changelog/mule/?cs=24786>
Author pablo.lagreca Date 2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012) Log
Message
MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy
Modified Paths
-
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
-
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
Added Paths
- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/
- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/
-
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
Diff
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 03:52:07 UTC (rev 24785)
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+ String flowName = flowConstruct.getName();
+ String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+ lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+ try {- counter = findCounter(messageId);
- tooMany = counter != null && counter.get() > maxRedeliveryCount;
- } - if (tooMany || exceptionSeen)
- {
- try+ if (!exceptionSeen) {- return deadLetterQueue.process(event);+ counter = findCounter(messageId);
+ tooMany = counter != null && counter.get() > maxRedeliveryCount; }- catch (Exception ex)+
+ if (tooMany || exceptionSeen) {- logger.info("Exception thrown from failed message processing for message " + messageId, ex);+ try
+ {
+ return deadLetterQueue.process(event);
+ }
+ catch (Exception ex)
+ {
+ logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ }
+ return null; }- return null;
- } - try
- {
- MuleEvent returnEvent = processNext(event);
- counter = findCounter(messageId);
- if (counter != null)+ try {- resetCounter(messageId);+ MuleEvent returnEvent = processNext(event);
+ counter = findCounter(messageId);
+ if (counter != null)
+ {
+ resetCounter(messageId);
+ }
+ return returnEvent; }- return returnEvent;+ catch (MuleException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
+ catch (RuntimeException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ } }- catch (MuleException ex)+ finally {- incrementCounter(messageId);
- throw ex;+ lock.unlock(messageId); }- catch (RuntimeException ex)
- {
- incrementCounter(messageId);
- throw ex;
- }+ } private void resetCounter(String messageId) throws ObjectStoreException
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 =>
24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+ /*
+ * Gets a lock over the resource identified with lockId
+ */
+ void lock(T lockId);
+
+ /*
+ * Releases lock over the resource identified with lockId
+ */
+ void unlock(T lockId);
+
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+ /**
+ * Creates a Lock for a given resource using the resource unique identifier.
+ */
+ Lock<T> createLock(String lockResourceName);
+
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+ private Map<T, LockEntry> locks;
+ private Object acquireLock = new Object();
+
+ public ServerLock()
+ {
+ this.locks = new HashMap<T,LockEntry>();
+ }
+
+ public void lock(T key)
+ {
+ LockEntry lock;
+ synchronized (acquireLock)
+ {
+ if (this.locks.containsKey(key))
+ {
+ lock = this.locks.get(key);
+ }
+ else
+ {
+ lock = new LockEntry();
+ this.locks.put(key,lock);
+ }
+ lock.incrementLockCount();
+ acquireLock.notifyAll();
+ }
+ lock.lock();
+ }
+
+ public void unlock(T key)
+ {
+ synchronized (acquireLock)
+ {
+ LockEntry lock = this.locks.get(key);
+ if (lock != null)
+ {
+ lock.decrementLockCount();
+ if (!lock.hasPendingLocks())
+ {
+ this.locks.remove(key);
+ }
+ lock.unlock();
+ }
+ acquireLock.notifyAll();
+ }
+ }
+
+ public static class LockEntry
+ {
+ private AtomicInteger lockCount = new AtomicInteger(0);
+ private ReentrantLock lock = new ReentrantLock(true);
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void incrementLockCount()
+ {
+ lockCount.incrementAndGet();
+ }
+
+ public void decrementLockCount()
+ {
+ lockCount.decrementAndGet();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
+
+ public boolean hasPendingLocks()
+ {
+ return lockCount.get() > 0;
+ }
+ }
+
+ public void dispose()
+ {
+ locks.clear();
+ }
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+ public Lock<T> createLock(String lockResourceName)
+ {
+ return new ServerLock<T>();
+ }
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 03:52:07 UTC (rev 24785)
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when;-+import junit.framework.Assert; import org.junit.Before;+import org.junit.Rule;
+import org.junit.Test; import org.mockito.Answers;-import org.mockito.Mockito; import org.mockito.internal.verification.VerificationModeFactory;-import org.mule.api.*;+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage; import org.mule.api.config.MuleProperties; import org.mule.api.construct.FlowConstruct; import org.mule.api.processor.MessageProcessor; import org.mule.api.store.ObjectStore; import org.mule.api.store.ObjectStoreException; import org.mule.api.store.ObjectStoreManager;-import org.mule.routing.MessageProcessorFilterPair; import org.mule.tck.junit4.AbstractMuleTestCase;-
-import org.junit.Test;
-
-import junit.framework.Assert;+import org.mule.tck.junit4.rule.SystemProperty; import org.mule.util.SerializationUtils;+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory; import java.io.Serializable; import java.util.HashMap; import java.util.Map;+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase { public static final String STRING_MESSAGE = "message";- public static final int MAX_REDELIVERY_COUNT = 1;+ public static final int MAX_REDELIVERY_COUNT = 0; private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get()); private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get()); private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());- private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());+ private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get()); private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get()); private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());+ private Latch waitLatch = new Latch();
+ private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+ public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+ {
+ public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+ {
+ waitingMessageProcessorExecutionLatch.countDown();
+ waitLatch.await(2000, TimeUnit.MILLISECONDS);
+ return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+ }
+ });
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+ InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+ when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+ when(event.getMessage()).thenReturn(message);
+ irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+ irp.setUseSecureHash(true);
+ irp.setFlowConstruct(mock(FlowConstruct.class));
+ irp.setMuleContext(mockMuleContext);
+ irp.setListener(mockFailingMessageProcessor);
+ irp.setMessageProcessor(mockDlqMessageProcessor);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);+ when(message.getPayload()).thenReturn(new Object()); irp.initialise();-
-
- when(message.getPayload()).thenReturn(new Object());
-
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
+ irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ } - IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
- irp.setUseSecureHash(true);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
+ public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ {
+ when(message.getPayload()).thenReturn(STRING_MESSAGE); irp.initialise();+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ public void testThreadSafeObjectStoreUsage() throws Exception
+ { when(message.getPayload()).thenReturn(STRING_MESSAGE);- when(event.getMessage()).thenReturn(message);+ irp.setListener(mockWaitingMessageProcessor);
+ irp.initialise(); - for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)+ ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+ firstIrpExecutionThread.start();
+ ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+ threadCausingRedeliveryException.start();
+ waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+ waitLatch.release();
+ firstIrpExecutionThread.join();
+ threadCausingRedeliveryException.join();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+
+ private void processUntilFailure()
+ {
- public void testMessageRedeliveryUsingSerializationStore() throws Exception+ public class ExecuteIrpThread extends Thread {- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
- irp.initialise();
-
- when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
-
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)+ public Exception exception;
+
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+ private final String name;
+ private String value;
+ private boolean initialized;
+ private String oldValue;
+
+ public SystemProperty(String name)
+ {
+ this(name, null);
+ }
+
+ public SystemProperty(String name, String value)
+ {
+ this.name = name;
+ this.value = value;
+ }
+
+ protected void before() throws Throwable
+ {
+ if (initialized)
+ {
+ throw new IllegalArgumentException("System property was already initialized");
+ }
+
+ oldValue = System.setProperty(name, getValue());
+ initialized = true;
+ }
+
+ protected void after()
+ {
+ if (!initialized)
+ {
+ throw new IllegalArgumentException("System property was not initialized");
+ }
+
+ doCleanUp();
+ restoreOldValue();
+
+ initialized = false;
+ }
+
+ protected void restoreOldValue()
+ {
+ if (oldValue == null)
+ {
+ System.clearProperty(name);
+ }
+ else
+ {
+ System.setProperty(name, oldValue);
+ }
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ protected void doCleanUp()
+ {
+ // Nothing to do
+ };
+
+ public String getValue()
+ {
+ return value;
+ };
+}Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+ public static final int THREAD_COUNT = 100;
+ public static final int ITERATIONS_PER_THREAD = 100;
+ private Latch threadStartLatch = new Latch();
+ private String sharedKeyA = "A";
+ private String sharedKeyB = "B";
+ private ServerLock<String> serverLock = new ServerLock<String>();
+ private InMemoryObjectStore objectStore = new InMemoryObjectStore();
+
+ public void testHighConcurrency() throws Exception
+ {
+ List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+ for (int i = 0; i < THREAD_COUNT; i++)
+ {
+ IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ }
+ threadStartLatch.release();
+ for (Thread thread : threads)
+ {
+ thread.join();
+ }
+ assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ }
+
+ public class IncrementKeyValueThread extends Thread
+ {
+ private String key;
+
+ public IncrementKeyValueThread(String key)
+ {
+ super("Thread-" + key);
+ this.key = key;
+ }
+
+ public void run()
+ {
+ try
+ {
+ threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+ {
+ if (Thread.interrupted())
+ {
+ break;
+ }
+ serverLock.lock(key);
+ try
+ {
+ Integer value;
+ if (objectStore.contains(key))
+ {
+ value = objectStore.retrieve(key);
+ objectStore.remove(key);
+ }
+ else
+ {
+ value = 0;
+ }
+ objectStore.store(key,value + 1);
+ }
+ finally
+ {
+ serverLock.unlock(key);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class InMemoryObjectStore implements ObjectStore<Integer>
+ {
+ private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+ public boolean contains(Serializable key) throws ObjectStoreException
+ {
+ return store.containsKey(key);
+ }
+
+ public void store(Serializable key, Integer value) throws ObjectStoreException
+ {
+ if (store.containsKey(key))
+ {
+ throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+ }
+ store.put(key,value);
+ }
+
+ public Integer retrieve(Serializable key) throws ObjectStoreException
+ {
+ return store.get(key);
+ }
+
+ public Integer remove(Serializable key) throws ObjectStoreException
+ {
+ return store.remove(key);
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+ }
+
+}Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
(24785 => 24786)
--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 03:52:07 UTC (rev 24785)
+ <!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that the transformers will be available even when using the TransientRegistry only -->
------------------------------
http://xircles.codehaus.org/manage_email
Pablo La Greca
2012-08-21 20:08:39 UTC
Permalink
"What happens if the lock holder dies? Is the lock eternal or does it time
out?" -> If it's in a cluster then it will be release, if it's in a single
mule instance then lock client must ensure proper usage through the use of
finally block.

"Lock: I think the javadoc needs more information. What happens when there
is no object with the given key? what happens if the caller to the release
method does not have the lock on passed key?" -> Will improve javadoc if
required but there's no information around non existent key since it's not
an object store. It's just a locking mechanism were a lock can be
identified using any object.

"LockFactory: is not clear to me what is the purpose of the
lockResourceName" -> Will improve javadoc

"ServerLock: acquireLock field must be final" -> Will do

"Remove redundant usages of "this" keyword" -> Don't think it's redundant.
It's a matter of preference of usage.

"Fix imports order" -> Will do.
Post by Pablo Kraan
Post by Pablo Kraan
- What happens if the lock holder dies? Is the lock eternal or does it
time out?
Post by Pablo Kraan
Lock: I think the javadoc needs more information. What happens when there
is no object with the given key? what happens if the caller to the release
method does not have the lock on passed key?
LockFactory: is not clear to me what is the purpose of the
lockResourceName
ServerLock: acquireLock field must be final
Remove redundant usages of "this" keyword
Fix imports order
Pablo K
On Tue, Aug 21, 2012 at 2:25 PM, Pablo La Greca <
LockFactory was added to org.mule.config.builders.DefaultsConfigurationBuilder
as suggested.
IdempotentMessageFilter relies on the fact that storing twice the same
key in an object store will fail. So it's not the same case.
Post by Daniel Feist
Does IdempotentMessageFilter need the same?
Dan
Revision 24786 <http://fisheye.codehaus.org/changelog/mule/?cs=24786>
Author pablo.lagreca Date 2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012) Log
Message
MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy
Modified Paths
-
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
-
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
Added Paths
- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/
-
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
Diff
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 03:52:07 UTC (rev 24785)
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+ String flowName = flowConstruct.getName();
+ String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+ lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+ try {- counter = findCounter(messageId);
- tooMany = counter != null && counter.get() > maxRedeliveryCount;
- } - if (tooMany || exceptionSeen)
- {
- try+ if (!exceptionSeen) {- return deadLetterQueue.process(event);+ counter = findCounter(messageId);
+ tooMany = counter != null && counter.get() > maxRedeliveryCount; }- catch (Exception ex)+
+ if (tooMany || exceptionSeen) {- logger.info("Exception thrown from failed message processing for message " + messageId, ex);+ try
+ {
+ return deadLetterQueue.process(event);
+ }
+ catch (Exception ex)
+ {
+ logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ }
+ return null; }- return null;
- } - try
- {
- MuleEvent returnEvent = processNext(event);
- counter = findCounter(messageId);
- if (counter != null)+ try {- resetCounter(messageId);+ MuleEvent returnEvent = processNext(event);
+ counter = findCounter(messageId);
+ if (counter != null)
+ {
+ resetCounter(messageId);
+ }
+ return returnEvent; }- return returnEvent;+ catch (MuleException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
+ catch (RuntimeException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ } }- catch (MuleException ex)+ finally {- incrementCounter(messageId);
- throw ex;+ lock.unlock(messageId); }- catch (RuntimeException ex)
- {
- incrementCounter(messageId);
- throw ex;
- }+ } private void resetCounter(String messageId) throws ObjectStoreException
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 =>
24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+ /*
+ * Gets a lock over the resource identified with lockId
+ */
+ void lock(T lockId);
+
+ /*
+ * Releases lock over the resource identified with lockId
+ */
+ void unlock(T lockId);
+
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+ /**
+ * Creates a Lock for a given resource using the resource unique identifier.
+ */
+ Lock<T> createLock(String lockResourceName);
+
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+ private Map<T, LockEntry> locks;
+ private Object acquireLock = new Object();
+
+ public ServerLock()
+ {
+ this.locks = new HashMap<T,LockEntry>();
+ }
+
+ public void lock(T key)
+ {
+ LockEntry lock;
+ synchronized (acquireLock)
+ {
+ if (this.locks.containsKey(key))
+ {
+ lock = this.locks.get(key);
+ }
+ else
+ {
+ lock = new LockEntry();
+ this.locks.put(key,lock);
+ }
+ lock.incrementLockCount();
+ acquireLock.notifyAll();
+ }
+ lock.lock();
+ }
+
+ public void unlock(T key)
+ {
+ synchronized (acquireLock)
+ {
+ LockEntry lock = this.locks.get(key);
+ if (lock != null)
+ {
+ lock.decrementLockCount();
+ if (!lock.hasPendingLocks())
+ {
+ this.locks.remove(key);
+ }
+ lock.unlock();
+ }
+ acquireLock.notifyAll();
+ }
+ }
+
+ public static class LockEntry
+ {
+ private AtomicInteger lockCount = new AtomicInteger(0);
+ private ReentrantLock lock = new ReentrantLock(true);
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void incrementLockCount()
+ {
+ lockCount.incrementAndGet();
+ }
+
+ public void decrementLockCount()
+ {
+ lockCount.decrementAndGet();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
+
+ public boolean hasPendingLocks()
+ {
+ return lockCount.get() > 0;
+ }
+ }
+
+ public void dispose()
+ {
+ locks.clear();
+ }
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+ public Lock<T> createLock(String lockResourceName)
+ {
+ return new ServerLock<T>();
+ }
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
(24785 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 03:52:07 UTC (rev 24785)
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when;-+import junit.framework.Assert; import org.junit.Before;+import org.junit.Rule;
+import org.junit.Test; import org.mockito.Answers;-import org.mockito.Mockito; import org.mockito.internal.verification.VerificationModeFactory;-import org.mule.api.*;+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage; import org.mule.api.config.MuleProperties; import org.mule.api.construct.FlowConstruct; import org.mule.api.processor.MessageProcessor; import org.mule.api.store.ObjectStore; import org.mule.api.store.ObjectStoreException; import org.mule.api.store.ObjectStoreManager;-import org.mule.routing.MessageProcessorFilterPair; import org.mule.tck.junit4.AbstractMuleTestCase;-
-import org.junit.Test;
-
-import junit.framework.Assert;+import org.mule.tck.junit4.rule.SystemProperty; import org.mule.util.SerializationUtils;+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory; import java.io.Serializable; import java.util.HashMap; import java.util.Map;+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase { public static final String STRING_MESSAGE = "message";- public static final int MAX_REDELIVERY_COUNT = 1;+ public static final int MAX_REDELIVERY_COUNT = 0; private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get()); private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get()); private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());- private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());+ private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get()); private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get()); private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());+ private Latch waitLatch = new Latch();
+ private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+ public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+ {
+ public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+ {
+ waitingMessageProcessorExecutionLatch.countDown();
+ waitLatch.await(2000, TimeUnit.MILLISECONDS);
+ return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+ }
+ });
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+ InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+ when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+ when(event.getMessage()).thenReturn(message);
+ irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+ irp.setUseSecureHash(true);
+ irp.setFlowConstruct(mock(FlowConstruct.class));
+ irp.setMuleContext(mockMuleContext);
+ irp.setListener(mockFailingMessageProcessor);
+ irp.setMessageProcessor(mockDlqMessageProcessor);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);+ when(message.getPayload()).thenReturn(new Object()); irp.initialise();-
-
- when(message.getPayload()).thenReturn(new Object());
-
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
+ irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ } - IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
- irp.setUseSecureHash(true);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
+ public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ {
+ when(message.getPayload()).thenReturn(STRING_MESSAGE); irp.initialise();+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ public void testThreadSafeObjectStoreUsage() throws Exception
+ { when(message.getPayload()).thenReturn(STRING_MESSAGE);- when(event.getMessage()).thenReturn(message);+ irp.setListener(mockWaitingMessageProcessor);
+ irp.initialise(); - for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)+ ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+ firstIrpExecutionThread.start();
+ ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+ threadCausingRedeliveryException.start();
+ waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+ waitLatch.release();
+ firstIrpExecutionThread.join();
+ threadCausingRedeliveryException.join();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+
+ private void processUntilFailure()
+ {
- public void testMessageRedeliveryUsingSerializationStore() throws Exception+ public class ExecuteIrpThread extends Thread {- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
- irp.initialise();
-
- when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
-
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)+ public Exception exception;
+
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+ private final String name;
+ private String value;
+ private boolean initialized;
+ private String oldValue;
+
+ public SystemProperty(String name)
+ {
+ this(name, null);
+ }
+
+ public SystemProperty(String name, String value)
+ {
+ this.name = name;
+ this.value = value;
+ }
+
+ protected void before() throws Throwable
+ {
+ if (initialized)
+ {
+ throw new IllegalArgumentException("System property was already initialized");
+ }
+
+ oldValue = System.setProperty(name, getValue());
+ initialized = true;
+ }
+
+ protected void after()
+ {
+ if (!initialized)
+ {
+ throw new IllegalArgumentException("System property was not initialized");
+ }
+
+ doCleanUp();
+ restoreOldValue();
+
+ initialized = false;
+ }
+
+ protected void restoreOldValue()
+ {
+ if (oldValue == null)
+ {
+ System.clearProperty(name);
+ }
+ else
+ {
+ System.setProperty(name, oldValue);
+ }
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ protected void doCleanUp()
+ {
+ // Nothing to do
+ };
+
+ public String getValue()
+ {
+ return value;
+ };
+}Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
(0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+ public static final int THREAD_COUNT = 100;
+ public static final int ITERATIONS_PER_THREAD = 100;
+ private Latch threadStartLatch = new Latch();
+ private String sharedKeyA = "A";
+ private String sharedKeyB = "B";
+ private ServerLock<String> serverLock = new ServerLock<String>();
+ private InMemoryObjectStore objectStore = new InMemoryObjectStore();
+
+ public void testHighConcurrency() throws Exception
+ {
+ List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+ for (int i = 0; i < THREAD_COUNT; i++)
+ {
+ IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ }
+ threadStartLatch.release();
+ for (Thread thread : threads)
+ {
+ thread.join();
+ }
+ assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ }
+
+ public class IncrementKeyValueThread extends Thread
+ {
+ private String key;
+
+ public IncrementKeyValueThread(String key)
+ {
+ super("Thread-" + key);
+ this.key = key;
+ }
+
+ public void run()
+ {
+ try
+ {
+ threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+ {
+ if (Thread.interrupted())
+ {
+ break;
+ }
+ serverLock.lock(key);
+ try
+ {
+ Integer value;
+ if (objectStore.contains(key))
+ {
+ value = objectStore.retrieve(key);
+ objectStore.remove(key);
+ }
+ else
+ {
+ value = 0;
+ }
+ objectStore.store(key,value + 1);
+ }
+ finally
+ {
+ serverLock.unlock(key);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class InMemoryObjectStore implements ObjectStore<Integer>
+ {
+ private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+ public boolean contains(Serializable key) throws ObjectStoreException
+ {
+ return store.containsKey(key);
+ }
+
+ public void store(Serializable key, Integer value) throws ObjectStoreException
+ {
+ if (store.containsKey(key))
+ {
+ throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+ }
+ store.put(key,value);
+ }
+
+ public Integer retrieve(Serializable key) throws ObjectStoreException
+ {
+ return store.get(key);
+ }
+
+ public Integer remove(Serializable key) throws ObjectStoreException
+ {
+ return store.remove(key);
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+ }
+
+}Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
(24785 => 24786)
--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 03:52:07 UTC (rev 24785)
+ <!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that the transformers will be available even when using the TransientRegistry only -->
------------------------------
http://xircles.codehaus.org/manage_email
Loading...