Daniel Feist
2012-08-21 12:20:07 UTC
You currently need to add everything you add to default-mule-config.xml to org.mule.config.builders.DefaultsConfigurationBuilder also.
Dan
Dan
Revision
24786
Author
pablo.lagreca
Date
2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012)
Log Message
MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy
Modified Paths
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
Added Paths
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
Diff
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -148,6 +148,7 @@
public static final String OBJECT_DEFAULT_RETRY_POLICY_TEMPLATE = "_defaultRetryPolicyTemplate";
public static final String OBJECT_MULE_CONFIGURATION = "_muleConfiguration";
public static final String OBJECT_MULE_NAMESPACE_MANAGER = "_muleNamespaceManager";
+ public static final String OBJECT_LOCK_FACTORY = "_muleLockFactory";
// Not currently used as these need to be instance variables of the MuleContext.
public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -49,7 +49,7 @@
@Override
public void initialise() throws InitialisationException
{
- if (maxRedeliveryCount < 1)
+ if (maxRedeliveryCount < 0)
{
throw new InitialisationException(
CoreMessages.initialisationFailure(
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -25,10 +25,13 @@
import org.mule.transformer.simple.ObjectToByteArray;
import java.io.InputStream;
+import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.mule.util.lock.Lock;
+import org.mule.util.lock.LockFactory;
import org.mule.util.store.ObjectStorePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@
private String messageDigestAlgorithm;
private String idExpression;
private ObjectStore<AtomicInteger> store;
+ private Lock<Serializable> lock;
@Override
public void initialise() throws InitialisationException
@@ -95,6 +99,11 @@
}
}
+ String appName = muleContext.getConfiguration().getId();
+ String flowName = flowConstruct.getName();
+ String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+ lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+
store = createStore();
}
@@ -165,45 +174,55 @@
exceptionSeen = true;
}
- if (!exceptionSeen)
+ lock.lock(messageId);
+ try
{
- counter = findCounter(messageId);
- tooMany = counter != null && counter.get() > maxRedeliveryCount;
- }
- if (tooMany || exceptionSeen)
- {
- try
+ if (!exceptionSeen)
{
- return deadLetterQueue.process(event);
+ counter = findCounter(messageId);
+ tooMany = counter != null && counter.get() > maxRedeliveryCount;
}
- catch (Exception ex)
+
+ if (tooMany || exceptionSeen)
{
- logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ try
+ {
+ return deadLetterQueue.process(event);
+ }
+ catch (Exception ex)
+ {
+ logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ }
+ return null;
}
- return null;
- }
- try
- {
- MuleEvent returnEvent = processNext(event);
- counter = findCounter(messageId);
- if (counter != null)
+ try
{
- resetCounter(messageId);
+ MuleEvent returnEvent = processNext(event);
+ counter = findCounter(messageId);
+ if (counter != null)
+ {
+ resetCounter(messageId);
+ }
+ return returnEvent;
}
- return returnEvent;
+ catch (MuleException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
+ catch (RuntimeException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
}
- catch (MuleException ex)
+ finally
{
- incrementCounter(messageId);
- throw ex;
+ lock.unlock(messageId);
}
- catch (RuntimeException ex)
- {
- incrementCounter(messageId);
- throw ex;
- }
+
}
private void resetCounter(String messageId) throws ObjectStoreException
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,30 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+ /*
+ * Gets a lock over the resource identified with lockId
+ */
+ void lock(T lockId);
+
+ /*
+ * Releases lock over the resource identified with lockId
+ */
+ void unlock(T lockId);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,25 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+ /**
+ * Creates a Lock for a given resource using the resource unique identifier.
+ */
+ Lock<T> createLock(String lockResourceName);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+ private Map<T, LockEntry> locks;
+ private Object acquireLock = new Object();
+
+ public ServerLock()
+ {
+ this.locks = new HashMap<T,LockEntry>();
+ }
+
+ public void lock(T key)
+ {
+ LockEntry lock;
+ synchronized (acquireLock)
+ {
+ if (this.locks.containsKey(key))
+ {
+ lock = this.locks.get(key);
+ }
+ else
+ {
+ lock = new LockEntry();
+ this.locks.put(key,lock);
+ }
+ lock.incrementLockCount();
+ acquireLock.notifyAll();
+ }
+ lock.lock();
+ }
+
+ public void unlock(T key)
+ {
+ synchronized (acquireLock)
+ {
+ LockEntry lock = this.locks.get(key);
+ if (lock != null)
+ {
+ lock.decrementLockCount();
+ if (!lock.hasPendingLocks())
+ {
+ this.locks.remove(key);
+ }
+ lock.unlock();
+ }
+ acquireLock.notifyAll();
+ }
+ }
+
+ public static class LockEntry
+ {
+ private AtomicInteger lockCount = new AtomicInteger(0);
+ private ReentrantLock lock = new ReentrantLock(true);
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void incrementLockCount()
+ {
+ lockCount.incrementAndGet();
+ }
+
+ public void decrementLockCount()
+ {
+ lockCount.decrementAndGet();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
+
+ public boolean hasPendingLocks()
+ {
+ return lockCount.get() > 0;
+ }
+ }
+
+ public void dispose()
+ {
+ locks.clear();
+ }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,19 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+ public Lock<T> createLock(String lockResourceName)
+ {
+ return new ServerLock<T>();
+ }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -10,71 +10,97 @@
package org.mule.processor;
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
import org.mockito.Answers;
-import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
import org.mule.api.config.MuleProperties;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.store.ObjectStore;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
{
public static final String STRING_MESSAGE = "message";
- public static final int MAX_REDELIVERY_COUNT = 1;
+ public static final int MAX_REDELIVERY_COUNT = 0;
private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
- private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+ private Latch waitLatch = new Latch();
+ private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+ private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
+ public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
@Before
public void setUpTest() throws MuleException
{
when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
- System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+ when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+ {
+ public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+ {
+ waitingMessageProcessorExecutionLatch.countDown();
+ waitLatch.await(2000, TimeUnit.MILLISECONDS);
+ return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+ }
+ });
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+ InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+ when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+ when(event.getMessage()).thenReturn(message);
+ irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+ irp.setUseSecureHash(true);
+ irp.setFlowConstruct(mock(FlowConstruct.class));
+ irp.setMuleContext(mockMuleContext);
+ irp.setListener(mockFailingMessageProcessor);
+ irp.setMessageProcessor(mockDlqMessageProcessor);
+
}
@Test
public void messageDigestFailure() throws Exception
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
+ when(message.getPayload()).thenReturn(new Object());
irp.initialise();
-
-
- when(message.getPayload()).thenReturn(new Object());
-
- when(event.getMessage()).thenReturn(message);
MuleEvent process = irp.process(event);
Assert.assertNull(process);
}
@@ -82,22 +108,42 @@
@Test
public void testMessageRedeliveryUsingMemory() throws Exception
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
+ irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
- irp.setUseSecureHash(true);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
+ public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ {
+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+ public void testThreadSafeObjectStoreUsage() throws Exception
+ {
when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
+ irp.setListener(mockWaitingMessageProcessor);
+ irp.initialise();
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+ ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+ firstIrpExecutionThread.start();
+ ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+ threadCausingRedeliveryException.start();
+ waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+ waitLatch.release();
+ firstIrpExecutionThread.join();
+ threadCausingRedeliveryException.join();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+
+ private void processUntilFailure()
+ {
+ for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
{
try
{
@@ -107,28 +153,14 @@
{
}
}
- verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
}
- public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ public class ExecuteIrpThread extends Thread
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
- irp.initialise();
-
- when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
-
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+ public Exception exception;
+
+ public void run()
{
try
{
@@ -136,15 +168,18 @@
}
catch (Exception e)
{
+ exception = e;
}
}
- verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
}
+
+
public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
{
private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+ private ServerLock lockableObjectStore = new ServerLock();
@Override
public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +212,13 @@
{
return false;
}
+
}
public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
{
private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+ private ServerLock lockableObjectStore = new ServerLock();
@Override
public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +249,7 @@
{
return false;
}
+
}
}
Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+ private final String name;
+ private String value;
+ private boolean initialized;
+ private String oldValue;
+
+ public SystemProperty(String name)
+ {
+ this(name, null);
+ }
+
+ public SystemProperty(String name, String value)
+ {
+ this.name = name;
+ this.value = value;
+ }
+
+ protected void before() throws Throwable
+ {
+ if (initialized)
+ {
+ throw new IllegalArgumentException("System property was already initialized");
+ }
+
+ oldValue = System.setProperty(name, getValue());
+ initialized = true;
+ }
+
+ protected void after()
+ {
+ if (!initialized)
+ {
+ throw new IllegalArgumentException("System property was not initialized");
+ }
+
+ doCleanUp();
+ restoreOldValue();
+
+ initialized = false;
+ }
+
+ protected void restoreOldValue()
+ {
+ if (oldValue == null)
+ {
+ System.clearProperty(name);
+ }
+ else
+ {
+ System.setProperty(name, oldValue);
+ }
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ protected void doCleanUp()
+ {
+ // Nothing to do
+ };
+
+ public String getValue()
+ {
+ return value;
+ };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,151 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+ public static final int THREAD_COUNT = 100;
+ public static final int ITERATIONS_PER_THREAD = 100;
+ private Latch threadStartLatch = new Latch();
+ private String sharedKeyA = "A";
+ private String sharedKeyB = "B";
+ private ServerLock<String> serverLock = new ServerLock<String>();
+ private InMemoryObjectStore objectStore = new InMemoryObjectStore();
+
+ public void testHighConcurrency() throws Exception
+ {
+ List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+ for (int i = 0; i < THREAD_COUNT; i++)
+ {
+ IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ }
+ threadStartLatch.release();
+ for (Thread thread : threads)
+ {
+ thread.join();
+ }
+ assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ }
+
+ public class IncrementKeyValueThread extends Thread
+ {
+ private String key;
+
+ public IncrementKeyValueThread(String key)
+ {
+ super("Thread-" + key);
+ this.key = key;
+ }
+
+ public void run()
+ {
+ try
+ {
+ threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+ {
+ if (Thread.interrupted())
+ {
+ break;
+ }
+ serverLock.lock(key);
+ try
+ {
+ Integer value;
+ if (objectStore.contains(key))
+ {
+ value = objectStore.retrieve(key);
+ objectStore.remove(key);
+ }
+ else
+ {
+ value = 0;
+ }
+ objectStore.store(key,value + 1);
+ }
+ finally
+ {
+ serverLock.unlock(key);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class InMemoryObjectStore implements ObjectStore<Integer>
+ {
+ private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+ public boolean contains(Serializable key) throws ObjectStoreException
+ {
+ return store.containsKey(key);
+ }
+
+ public void store(Serializable key, Integer value) throws ObjectStoreException
+ {
+ if (store.containsKey(key))
+ {
+ throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+ }
+ store.put(key,value);
+ }
+
+ public Integer retrieve(Serializable key) throws ObjectStoreException
+ {
+ return store.get(key);
+ }
+
+ public Integer remove(Serializable key) throws ObjectStoreException
+ {
+ return store.remove(key);
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+ }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml (24785 => 24786)
--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 05:18:14 UTC (rev 24786)
@@ -86,6 +86,8 @@
<bean name="_defaultRetryPolicyTemplate" class="org.mule.retry.policies.NoRetryPolicyTemplate"/>
+ <bean name="_muleLockFactory" class="org.mule.util.lock.ServerLockFactory"/>
+
<!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that
the transformers will be available even when using the TransientRegistry only -->
http://xircles.codehaus.org/manage_email
24786
Author
pablo.lagreca
Date
2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012)
Log Message
MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy
Modified Paths
branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml
Added Paths
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/
branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
Diff
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -148,6 +148,7 @@
public static final String OBJECT_DEFAULT_RETRY_POLICY_TEMPLATE = "_defaultRetryPolicyTemplate";
public static final String OBJECT_MULE_CONFIGURATION = "_muleConfiguration";
public static final String OBJECT_MULE_NAMESPACE_MANAGER = "_muleNamespaceManager";
+ public static final String OBJECT_LOCK_FACTORY = "_muleLockFactory";
// Not currently used as these need to be instance variables of the MuleContext.
public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -49,7 +49,7 @@
@Override
public void initialise() throws InitialisationException
{
- if (maxRedeliveryCount < 1)
+ if (maxRedeliveryCount < 0)
{
throw new InitialisationException(
CoreMessages.initialisationFailure(
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -25,10 +25,13 @@
import org.mule.transformer.simple.ObjectToByteArray;
import java.io.InputStream;
+import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.mule.util.lock.Lock;
+import org.mule.util.lock.LockFactory;
import org.mule.util.store.ObjectStorePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@
private String messageDigestAlgorithm;
private String idExpression;
private ObjectStore<AtomicInteger> store;
+ private Lock<Serializable> lock;
@Override
public void initialise() throws InitialisationException
@@ -95,6 +99,11 @@
}
}
+ String appName = muleContext.getConfiguration().getId();
+ String flowName = flowConstruct.getName();
+ String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+ lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+
store = createStore();
}
@@ -165,45 +174,55 @@
exceptionSeen = true;
}
- if (!exceptionSeen)
+ lock.lock(messageId);
+ try
{
- counter = findCounter(messageId);
- tooMany = counter != null && counter.get() > maxRedeliveryCount;
- }
- if (tooMany || exceptionSeen)
- {
- try
+ if (!exceptionSeen)
{
- return deadLetterQueue.process(event);
+ counter = findCounter(messageId);
+ tooMany = counter != null && counter.get() > maxRedeliveryCount;
}
- catch (Exception ex)
+
+ if (tooMany || exceptionSeen)
{
- logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ try
+ {
+ return deadLetterQueue.process(event);
+ }
+ catch (Exception ex)
+ {
+ logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+ }
+ return null;
}
- return null;
- }
- try
- {
- MuleEvent returnEvent = processNext(event);
- counter = findCounter(messageId);
- if (counter != null)
+ try
{
- resetCounter(messageId);
+ MuleEvent returnEvent = processNext(event);
+ counter = findCounter(messageId);
+ if (counter != null)
+ {
+ resetCounter(messageId);
+ }
+ return returnEvent;
}
- return returnEvent;
+ catch (MuleException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
+ catch (RuntimeException ex)
+ {
+ incrementCounter(messageId);
+ throw ex;
+ }
}
- catch (MuleException ex)
+ finally
{
- incrementCounter(messageId);
- throw ex;
+ lock.unlock(messageId);
}
- catch (RuntimeException ex)
- {
- incrementCounter(messageId);
- throw ex;
- }
+
}
private void resetCounter(String messageId) throws ObjectStoreException
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,30 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+ /*
+ * Gets a lock over the resource identified with lockId
+ */
+ void lock(T lockId);
+
+ /*
+ * Releases lock over the resource identified with lockId
+ */
+ void unlock(T lockId);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,25 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+ /**
+ * Creates a Lock for a given resource using the resource unique identifier.
+ */
+ Lock<T> createLock(String lockResourceName);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+ private Map<T, LockEntry> locks;
+ private Object acquireLock = new Object();
+
+ public ServerLock()
+ {
+ this.locks = new HashMap<T,LockEntry>();
+ }
+
+ public void lock(T key)
+ {
+ LockEntry lock;
+ synchronized (acquireLock)
+ {
+ if (this.locks.containsKey(key))
+ {
+ lock = this.locks.get(key);
+ }
+ else
+ {
+ lock = new LockEntry();
+ this.locks.put(key,lock);
+ }
+ lock.incrementLockCount();
+ acquireLock.notifyAll();
+ }
+ lock.lock();
+ }
+
+ public void unlock(T key)
+ {
+ synchronized (acquireLock)
+ {
+ LockEntry lock = this.locks.get(key);
+ if (lock != null)
+ {
+ lock.decrementLockCount();
+ if (!lock.hasPendingLocks())
+ {
+ this.locks.remove(key);
+ }
+ lock.unlock();
+ }
+ acquireLock.notifyAll();
+ }
+ }
+
+ public static class LockEntry
+ {
+ private AtomicInteger lockCount = new AtomicInteger(0);
+ private ReentrantLock lock = new ReentrantLock(true);
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void incrementLockCount()
+ {
+ lockCount.incrementAndGet();
+ }
+
+ public void decrementLockCount()
+ {
+ lockCount.decrementAndGet();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
+
+ public boolean hasPendingLocks()
+ {
+ return lockCount.get() > 0;
+ }
+ }
+
+ public void dispose()
+ {
+ locks.clear();
+ }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (0 => 24786)
--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,19 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+ public Lock<T> createLock(String lockResourceName)
+ {
+ return new ServerLock<T>();
+ }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24785 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -10,71 +10,97 @@
package org.mule.processor;
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
import org.mockito.Answers;
-import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
import org.mule.api.config.MuleProperties;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.store.ObjectStore;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
{
public static final String STRING_MESSAGE = "message";
- public static final int MAX_REDELIVERY_COUNT = 1;
+ public static final int MAX_REDELIVERY_COUNT = 0;
private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
- private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+ private Latch waitLatch = new Latch();
+ private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+ private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
+ public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
@Before
public void setUpTest() throws MuleException
{
when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
- System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+ when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+ {
+ public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+ {
+ waitingMessageProcessorExecutionLatch.countDown();
+ waitLatch.await(2000, TimeUnit.MILLISECONDS);
+ return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+ }
+ });
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+ when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+ InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+ when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+ when(event.getMessage()).thenReturn(message);
+ irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+ irp.setUseSecureHash(true);
+ irp.setFlowConstruct(mock(FlowConstruct.class));
+ irp.setMuleContext(mockMuleContext);
+ irp.setListener(mockFailingMessageProcessor);
+ irp.setMessageProcessor(mockDlqMessageProcessor);
+
}
@Test
public void messageDigestFailure() throws Exception
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
+ when(message.getPayload()).thenReturn(new Object());
irp.initialise();
-
-
- when(message.getPayload()).thenReturn(new Object());
-
- when(event.getMessage()).thenReturn(message);
MuleEvent process = irp.process(event);
Assert.assertNull(process);
}
@@ -82,22 +108,42 @@
@Test
public void testMessageRedeliveryUsingMemory() throws Exception
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
+ irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
- irp.setUseSecureHash(true);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
+ public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ {
+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
irp.initialise();
+ processUntilFailure();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+ public void testThreadSafeObjectStoreUsage() throws Exception
+ {
when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
+ irp.setListener(mockWaitingMessageProcessor);
+ irp.initialise();
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+ ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+ firstIrpExecutionThread.start();
+ ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+ threadCausingRedeliveryException.start();
+ waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+ waitLatch.release();
+ firstIrpExecutionThread.join();
+ threadCausingRedeliveryException.join();
+ verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+ }
+
+ private void processUntilFailure()
+ {
+ for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
{
try
{
@@ -107,28 +153,14 @@
{
}
}
- verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
}
- public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ public class ExecuteIrpThread extends Thread
{
- Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
- Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
- IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
- irp.setUseSecureHash(true);
- irp.setMaxRedeliveryCount(1);
- irp.setFlowConstruct(mock(FlowConstruct.class));
- irp.setMuleContext(mockMuleContext);
- irp.setListener(mockFailingMessageProcessor);
- irp.setDeadLetterQueue(mockDlqMessageProcessor);
- irp.initialise();
-
- when(message.getPayload()).thenReturn(STRING_MESSAGE);
- when(event.getMessage()).thenReturn(message);
-
- for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+ public Exception exception;
+
+ public void run()
{
try
{
@@ -136,15 +168,18 @@
}
catch (Exception e)
{
+ exception = e;
}
}
- verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
}
+
+
public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
{
private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+ private ServerLock lockableObjectStore = new ServerLock();
@Override
public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +212,13 @@
{
return false;
}
+
}
public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
{
private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+ private ServerLock lockableObjectStore = new ServerLock();
@Override
public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +249,7 @@
{
return false;
}
+
}
}
Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+ private final String name;
+ private String value;
+ private boolean initialized;
+ private String oldValue;
+
+ public SystemProperty(String name)
+ {
+ this(name, null);
+ }
+
+ public SystemProperty(String name, String value)
+ {
+ this.name = name;
+ this.value = value;
+ }
+
+ protected void before() throws Throwable
+ {
+ if (initialized)
+ {
+ throw new IllegalArgumentException("System property was already initialized");
+ }
+
+ oldValue = System.setProperty(name, getValue());
+ initialized = true;
+ }
+
+ protected void after()
+ {
+ if (!initialized)
+ {
+ throw new IllegalArgumentException("System property was not initialized");
+ }
+
+ doCleanUp();
+ restoreOldValue();
+
+ initialized = false;
+ }
+
+ protected void restoreOldValue()
+ {
+ if (oldValue == null)
+ {
+ System.clearProperty(name);
+ }
+ else
+ {
+ System.setProperty(name, oldValue);
+ }
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ protected void doCleanUp()
+ {
+ // Nothing to do
+ };
+
+ public String getValue()
+ {
+ return value;
+ };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (0 => 24786)
--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java 2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,151 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+ public static final int THREAD_COUNT = 100;
+ public static final int ITERATIONS_PER_THREAD = 100;
+ private Latch threadStartLatch = new Latch();
+ private String sharedKeyA = "A";
+ private String sharedKeyB = "B";
+ private ServerLock<String> serverLock = new ServerLock<String>();
+ private InMemoryObjectStore objectStore = new InMemoryObjectStore();
+
+ public void testHighConcurrency() throws Exception
+ {
+ List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+ for (int i = 0; i < THREAD_COUNT; i++)
+ {
+ IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+ threads.add(incrementKeyValueThread);
+ incrementKeyValueThread.start();
+ }
+ threadStartLatch.release();
+ for (Thread thread : threads)
+ {
+ thread.join();
+ }
+ assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+ }
+
+ public class IncrementKeyValueThread extends Thread
+ {
+ private String key;
+
+ public IncrementKeyValueThread(String key)
+ {
+ super("Thread-" + key);
+ this.key = key;
+ }
+
+ public void run()
+ {
+ try
+ {
+ threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+ {
+ if (Thread.interrupted())
+ {
+ break;
+ }
+ serverLock.lock(key);
+ try
+ {
+ Integer value;
+ if (objectStore.contains(key))
+ {
+ value = objectStore.retrieve(key);
+ objectStore.remove(key);
+ }
+ else
+ {
+ value = 0;
+ }
+ objectStore.store(key,value + 1);
+ }
+ finally
+ {
+ serverLock.unlock(key);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class InMemoryObjectStore implements ObjectStore<Integer>
+ {
+ private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+ public boolean contains(Serializable key) throws ObjectStoreException
+ {
+ return store.containsKey(key);
+ }
+
+ public void store(Serializable key, Integer value) throws ObjectStoreException
+ {
+ if (store.containsKey(key))
+ {
+ throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+ }
+ store.put(key,value);
+ }
+
+ public Integer retrieve(Serializable key) throws ObjectStoreException
+ {
+ return store.get(key);
+ }
+
+ public Integer remove(Serializable key) throws ObjectStoreException
+ {
+ return store.remove(key);
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+ }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml (24785 => 24786)
--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml 2012-08-21 05:18:14 UTC (rev 24786)
@@ -86,6 +86,8 @@
<bean name="_defaultRetryPolicyTemplate" class="org.mule.retry.policies.NoRetryPolicyTemplate"/>
+ <bean name="_muleLockFactory" class="org.mule.util.lock.ServerLockFactory"/>
+
<!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that
the transformers will be available even when using the TransientRegistry only -->
http://xircles.codehaus.org/manage_email