Discussion:
[mule-scm] [mule][24613] branches/mule-3.2.x: EE-2770 - in a cluster, only one connection to jms will be stablished if connector has client id.
Daniel Feist
2012-07-13 13:14:07 UTC
Permalink
This commit has lots of badly formatted code :-( Please fix it.

Dan
Revision
24613
Author
pablo.lagreca
Date
2012-07-11 08:58:26 -0500 (Wed, 11 Jul 2012)
Log Message
EE-2770 - in a cluster, only one connection to jms will be stablished if connector has client id. Also, only one topic message receiver will be subscribed
Modified Paths
branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java
branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java
branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm
branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java
branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java
branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java
branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java
branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java
branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java
branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java
branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java
branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java
Added Paths
branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java
branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java
Removed Paths
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java
Diff
Added: branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java (0 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -0,0 +1,100 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.lifecycle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleException;
+import org.mule.api.context.notification.ClusterNodeNotificationListener;
+import org.mule.api.context.notification.ServerNotification;
+import org.mule.api.lifecycle.LifecycleState;
+import org.mule.api.lifecycle.LifecycleStateEnabled;
+import org.mule.api.lifecycle.Startable;
+import org.mule.context.notification.NotificationException;
+
+/**
+ *
+ * This class will start an Startable mule object that must only be started in the primary node.
+ *
+ */
+public class PrimaryNodeLifecycleNotificationListener implements ClusterNodeNotificationListener {
+
+ protected transient Log logger = LogFactory.getLog(getClass());
+ private Startable startMeOnPrimaryNodeNotification;
+ private MuleContext muleContext;
+
+ public PrimaryNodeLifecycleNotificationListener(Startable startMeOnPrimaryNodeNotification, MuleContext muleContext) {
+ this.startMeOnPrimaryNodeNotification = startMeOnPrimaryNodeNotification;
+ this.muleContext = muleContext;
+ }
+
+ public void register()
+ {
+ try
+ {
+ if (muleContext != null)
+ {
+ muleContext.registerListener(this);
+ }
+ }
+ catch (NotificationException e)
+ {
+ throw new RuntimeException("Unable to register listener", e);
+ }
+ }
+
+ public void onNotification(ServerNotification notification) {
+ try
+ {
+ if (startMeOnPrimaryNodeNotification instanceof LifecycleState)
+ {
+ if (((LifecycleState)startMeOnPrimaryNodeNotification).isStarted())
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Not starting Startable since it's not in started state");
+ }
+ }
+ }
+ else if (startMeOnPrimaryNodeNotification instanceof LifecycleStateEnabled)
+ {
+ if (((LifecycleStateEnabled)startMeOnPrimaryNodeNotification).getLifecycleState().isStarted())
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Not starting Startable since it's not in started state");
+ }
+ }
+ }
+ else
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ }
+ catch (MuleException e)
+ {
+ throw new RuntimeException("Error starting wrapped message source", e);
+ }
+ }
+
+ public void unregister() {
+ muleContext.unregisterListener(this);
+ }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java (24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,8 +15,6 @@
import org.mule.api.construct.FlowConstruct;
import org.mule.api.construct.FlowConstructAware;
import org.mule.api.context.MuleContextAware;
-import org.mule.api.context.notification.ClusterNodeNotificationListener;
-import org.mule.api.context.notification.ServerNotification;
import org.mule.api.lifecycle.Disposable;
import org.mule.api.lifecycle.Initialisable;
import org.mule.api.lifecycle.InitialisationException;
@@ -26,21 +24,22 @@
import org.mule.api.processor.MessageProcessor;
import org.mule.api.source.ClusterizableMessageSource;
import org.mule.api.source.MessageSource;
-import org.mule.context.notification.NotificationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.mule.lifecycle.PrimaryNodeLifecycleNotificationListener;
/**
* of the wrapped instance differently depending if the node is primary or not
* inside a cluster. Non clustered nodes are always primary.
*/
-public class ClusterizableMessageSourceWrapper implements MessageSource, Lifecycle, ClusterNodeNotificationListener, MuleContextAware, FlowConstructAware
+public class ClusterizableMessageSourceWrapper implements MessageSource, Lifecycle, MuleContextAware, FlowConstructAware
{
protected static final Log logger = LogFactory.getLog(ClusterizableMessageSourceWrapper.class);
+ private PrimaryNodeLifecycleNotificationListener primaryNodeLifecycleNotificationListener;
private final ClusterizableMessageSource messageSource;
private MuleContext muleContext;
private FlowConstruct flowConstruct;
@@ -68,8 +67,18 @@
@Override
public void initialise() throws InitialisationException
{
- registerNotificationListener();
+ primaryNodeLifecycleNotificationListener = new PrimaryNodeLifecycleNotificationListener(new Startable() {
+ public void start() throws MuleException {
+ if (ClusterizableMessageSourceWrapper.this.isStarted())
+ {
+ ClusterizableMessageSourceWrapper.this.start();
+ }
+ }
+ },muleContext);
+ primaryNodeLifecycleNotificationListener.register();
+
if (messageSource instanceof Initialisable)
{
((Initialisable) messageSource).initialise();
@@ -92,8 +101,6 @@
logger.info("Starting clusterizable message source");
}
((Startable) messageSource).start();
-
- started = true;
}
else
{
@@ -103,6 +110,7 @@
}
}
}
+ started = true;
}
}
}
@@ -131,55 +139,10 @@
((Disposable) messageSource).dispose();
}
- unregisterNotificationListener();
+ primaryNodeLifecycleNotificationListener.unregister();
}
- public void onNotification(ServerNotification notification)
- {
- if (flowConstruct != null && flowConstruct.getLifecycleState().isStarted())
- {
- try
- {
- start();
- }
- catch (MuleException e)
- {
- throw new RuntimeException("Error starting wrapped message source", e);
- }
- }
- else
- {
- if (logger.isDebugEnabled())
- {
- logger.debug("Clusterizable message source no started on stopped flow");
- }
- }
- }
- protected void registerNotificationListener()
- {
- try
- {
- if (muleContext != null)
- {
- muleContext.registerListener(this);
- }
- }
- catch (NotificationException e)
- {
- throw new RuntimeException("Unable to register listener", e);
- }
- }
-
- protected void unregisterNotificationListener()
- {
- if (muleContext != null)
- {
- muleContext.unregisterListener(this);
- }
- }
-
@Override
public void setFlowConstruct(FlowConstruct flowConstruct)
{
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java (24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -14,6 +14,7 @@
import org.mule.MessageExchangePattern;
import org.mule.OptimizedRequestContext;
import org.mule.ResponseOutputStream;
+import org.mule.api.DefaultMuleException;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
@@ -25,6 +26,7 @@
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.lifecycle.CreateException;
import org.mule.api.lifecycle.InitialisationException;
+import org.mule.api.lifecycle.Startable;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.routing.filter.FilterUnacceptedException;
import org.mule.api.transaction.Transaction;
@@ -34,6 +36,7 @@
import org.mule.api.transport.PropertyScope;
import org.mule.api.transport.ReplyToHandler;
import org.mule.context.notification.EndpointMessageNotification;
+import org.mule.lifecycle.PrimaryNodeLifecycleNotificationListener;
import org.mule.session.DefaultMuleSession;
import org.mule.session.LegacySessionHandler;
import org.mule.transaction.TransactionCoordination;
@@ -82,6 +85,7 @@
protected List<Transformer> defaultResponseTransformers;
protected ReplyToHandler replyToHandler;
+ private PrimaryNodeLifecycleNotificationListener primaryNodeLifecycleNotificationListener;
/**
* Creates the Message Receiver
@@ -135,7 +139,26 @@
defaultResponseTransformers = connector.getDefaultResponseTransformers(endpoint);
replyToHandler = getReplyToHandler();
-
+
+ if (!shouldConsumeInEveryNode() && !flowConstruct.getMuleContext().isPrimaryPollingInstance())
+ {
+ primaryNodeLifecycleNotificationListener = new PrimaryNodeLifecycleNotificationListener(new Startable() {
+ public void start() throws MuleException {
+ if (AbstractMessageReceiver.this.isStarted())
+ {
+ try {
+ AbstractMessageReceiver.this.doConnect();
+ } catch (Exception e) {
+ throw new DefaultMuleException(e);
+ }
+ AbstractMessageReceiver.this.doStart();
+ }
+ }
+ },flowConstruct.getMuleContext());
+ primaryNodeLifecycleNotificationListener.register();
+ }
+
super.initialise();
}
@@ -374,6 +397,10 @@
{
this.listener = null;
this.flowConstruct = null;
+ if (primaryNodeLifecycleNotificationListener != null)
+ {
+ primaryNodeLifecycleNotificationListener.unregister();
+ }
super.doDispose();
}
@@ -381,4 +408,50 @@
{
return ((AbstractConnector) endpoint.getConnector()).getReplyToHandler(endpoint);
}
+
+ /**
+ * Determines whether to start or not the MessageSource base on the running node state.
+ *
+ */
+ public boolean shouldConsumeInEveryNode()
+ {
+ return true;
+ }
+
+ final protected void connectHandler() throws Exception {
+ if (shouldConsumeInEveryNode() || getFlowConstruct().getMuleContext().isPrimaryPollingInstance())
+ {
+ if (logger.isInfoEnabled())
+ {
+ logger.info("Connecting clusterizable message receiver");
+ }
+ doConnect();
+ }
+ else
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Clusterizable message receiver not connected on this node");
+ }
+ }
+ }
+
+ final protected void doStartHandler() throws MuleException {
+ if (shouldConsumeInEveryNode() || getFlowConstruct().getMuleContext().isPrimaryPollingInstance())
+ {
+ if (logger.isInfoEnabled())
+ {
+ logger.info("Starting clusterizable message receiver");
+ }
+ doStart();
+ }
+ else
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Clusterizable message receiver not started on this node");
+ }
+ }
+ }
}
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java (24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -215,7 +215,7 @@
logger.debug("Connecting: " + this);
}
- doConnect();
+ connectHandler();
connected.set(true);
if (logger.isDebugEnabled())
@@ -224,6 +224,12 @@
}
}
+ //TODO - This template method belongs to AbstractMessageReceiver. Not possible to move it in mule 3.x - bc compatibility.
+ protected void connectHandler() throws Exception
+ {
+ this.doConnect();
+ }
+
public RetryContext validateConnection(RetryContext retryContext)
{
retryContext.setOk();
@@ -311,13 +317,17 @@
lifecycleManager.fireStartPhase(new LifecycleCallback<O>()
{
- public void onTransition(String phaseName, O object) throws MuleException
- {
- doStart();
+ public void onTransition(String phaseName, O object) throws MuleException {
+ doStartHandler();
}
});
}
+ protected void doStartHandler() throws MuleException
+ {
+ doStart();
+ }
+
public final void stop() throws MuleException
{
lifecycleManager.fireStopPhase(new LifecycleCallback<O>()
Added: branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java (0 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -0,0 +1,109 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.lifecycle;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleException;
+import org.mule.api.context.notification.ServerNotification;
+import org.mule.api.lifecycle.LifecycleState;
+import org.mule.api.lifecycle.LifecycleStateEnabled;
+import org.mule.api.lifecycle.Startable;
+import org.mule.context.notification.NotificationException;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PrimaryNodeLifecycleNotificationListenerTestCase extends AbstractMuleTestCase {
+
+ private MuleContext mockMuleContext;
+ private Startable mockStartable;
+ private ServerNotification mockServerNotification;
+ private StartableAndLifecycleState mockStartableAndLifecycleState;
+ private StartableAndLifecycleStateEnabled mockStartableAndLifecycleStateEnabled;
+ private PrimaryNodeLifecycleNotificationListener notificationListener;
+
+ public void setUpTest()
+ {
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartable, mockMuleContext);
+ }
+
+ public void testRegister() throws NotificationException
+ {
+ this.notificationListener.register();
+ verify(mockMuleContext, times(1)).registerListener(notificationListener);
+ }
+
+ public void testUnregister() throws NotificationException
+ {
+ this.notificationListener.unregister();
+ verify(mockMuleContext, times(1)).unregisterListener(notificationListener);
+ }
+
+ public void testOnNotificationWithStartable() throws MuleException {
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartable,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateStarted() throws MuleException {
+ mockStartableAndLifecycleState = mock(StartableAndLifecycleState.class);
+ when(mockStartableAndLifecycleState.isStarted()).thenReturn(true);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleState,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleState,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateStopped() throws MuleException {
+ mockStartableAndLifecycleState = mock(StartableAndLifecycleState.class);
+ when(mockStartableAndLifecycleState.isStarted()).thenReturn(false);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleState,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleState,times(0)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateEnabledStarted() throws MuleException {
+ mockStartableAndLifecycleStateEnabled = mock(StartableAndLifecycleStateEnabled.class, Answers.RETURNS_DEEP_STUBS.get());
+ when(mockStartableAndLifecycleStateEnabled.getLifecycleState().isStarted()).thenReturn(true);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleStateEnabled,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleStateEnabled,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateEnabledStopped() throws MuleException {
+ mockStartableAndLifecycleStateEnabled = mock(StartableAndLifecycleStateEnabled.class, Answers.RETURNS_DEEP_STUBS.get());
+ when(mockStartableAndLifecycleStateEnabled.getLifecycleState().isStarted()).thenReturn(false);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleStateEnabled,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleStateEnabled,times(0)).start();
+ }
+
+ private interface StartableAndLifecycleStateEnabled extends Startable, LifecycleStateEnabled{}
+ private interface StartableAndLifecycleState extends Startable, LifecycleState{}
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -10,17 +10,21 @@
package org.mule.source;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.mule.api.MuleContext;
-import org.mule.api.construct.FlowConstruct;
import org.mule.api.lifecycle.Lifecycle;
-import org.mule.api.lifecycle.LifecycleState;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.source.ClusterizableMessageSource;
import org.mule.context.notification.ClusterNodeNotification;
+import org.mule.lifecycle.PrimaryNodeLifecycleNotificationListener;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.junit.Test;
@@ -45,6 +49,10 @@
@Test
public void delegatesDispose() throws Exception
{
+ wrapper.setMuleContext(muleContext);
+
+ wrapper.initialise();
+
wrapper.dispose();
verify(messageSource, times(1)).dispose();
@@ -85,7 +93,7 @@
wrapper.initialise();
- verify(muleContext, times(1)).registerListener(wrapper);
+ verify(muleContext, times(1)).registerListener(Mockito.any(PrimaryNodeLifecycleNotificationListener.class));
}
@Test
@@ -114,16 +122,16 @@
public void ignoresMessageSourceOnNotificationIfFlowIsStopped() throws Exception
{
when(muleContext.isPrimaryPollingInstance()).thenReturn(true);
- LifecycleState lifecycleState = mock(LifecycleState.class);
- when(lifecycleState.isStarted()).thenReturn(false);
- FlowConstruct flowConstruct = mock(FlowConstruct.class);
- when(flowConstruct.getLifecycleState()).thenReturn(lifecycleState);
-
wrapper.setMuleContext(muleContext);
- wrapper.setFlowConstruct(flowConstruct);
+ Mockito.doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((PrimaryNodeLifecycleNotificationListener) invocationOnMock.getArguments()[0]).onNotification(new ClusterNodeNotification("",1));
+ return null;
+ }
+ }).when(muleContext).registerListener(isA(PrimaryNodeLifecycleNotificationListener.class));
- wrapper.onNotification(mock(ClusterNodeNotification.class));
-
+ wrapper.initialise();
verify(messageSource, times(0)).start();
}
@@ -131,16 +139,17 @@
public void startsMessageSourceOnNotificationIfFlowIsStarted() throws Exception
{
when(muleContext.isPrimaryPollingInstance()).thenReturn(true);
- LifecycleState lifecycleState = mock(LifecycleState.class);
- when(lifecycleState.isStarted()).thenReturn(true);
- FlowConstruct flowConstruct = mock(FlowConstruct.class);
- when(flowConstruct.getLifecycleState()).thenReturn(lifecycleState);
-
wrapper.setMuleContext(muleContext);
- wrapper.setFlowConstruct(flowConstruct);
+ Mockito.doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((PrimaryNodeLifecycleNotificationListener) invocationOnMock.getArguments()[0]).onNotification(new ClusterNodeNotification("",1));
+ return null;
+ }
+ }).when(muleContext).registerListener(isA(PrimaryNodeLifecycleNotificationListener.class));
- wrapper.onNotification(mock(ClusterNodeNotification.class));
-
+ wrapper.start();
+ wrapper.initialise();
verify(messageSource, times(1)).start();
}
@@ -160,10 +169,9 @@
public void unregistersListenerOnDispose() throws Exception
{
wrapper.setMuleContext(muleContext);
-
+ wrapper.initialise();
wrapper.dispose();
-
- verify(muleContext, times(1)).unregisterListener(wrapper);
+ verify(muleContext, times(1)).unregisterListener(isA(PrimaryNodeLifecycleNotificationListener.class));
}
private interface TestMessageSource extends ClusterizableMessageSource, Lifecycle
Copied: branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java (from rev 24564, branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java) (0 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -0,0 +1,69 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.transport;
+
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.api.service.Service;
+import org.mule.api.transport.MessageReceiver;
+import org.mule.tck.junit4.AbstractMuleContextTestCase;
+import org.mule.tck.testmodels.fruit.Orange;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public abstract class AbstractMessageReceiverTemplateTestCase extends AbstractMuleContextTestCase
+{
+ protected Service service;
+ protected InboundEndpoint endpoint;
+
+ protected void doSetUp() throws Exception
+ {
+ service = getTestService("orange", Orange.class);
+ endpoint = getEndpoint();
+ }
+
+ public void testCreate() throws Exception
+ {
+ Service service = getTestService("orange", Orange.class);
+ InboundEndpoint endpoint = getTestInboundEndpoint("Test");
+ MessageReceiver receiver = getMessageReceiver();
+
+ assertNotNull(receiver.getEndpoint());
+
+ try
+ {
+ receiver.setEndpoint(null);
+ fail("Provider cannot be set to null");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // expected
+ }
+
+ receiver.setEndpoint(endpoint);
+ assertNotNull(receiver.getEndpoint());
+
+ receiver.dispose();
+ }
+
+ public abstract MessageReceiver getMessageReceiver() throws Exception;
+
+ /**
+ * Implementations of this method should ensure that the correct connector is set
+ * on the endpoint
+ *
+ */
+ public abstract InboundEndpoint getEndpoint() throws Exception;
+}
Property changes: branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java
Added: svn:keywords
Added: svn:eol-style
Deleted: branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -1,69 +0,0 @@
-/*
- * $Id$
- * --------------------------------------------------------------------------------------
- * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
- *
- * The software in this package is published under the terms of the CPAL v1.0
- * license, a copy of which has been included with this distribution in the
- * LICENSE.txt file.
- */
-
-package org.mule.transport;
-
-import org.mule.api.endpoint.InboundEndpoint;
-import org.mule.api.service.Service;
-import org.mule.api.transport.MessageReceiver;
-import org.mule.tck.junit4.AbstractMuleContextTestCase;
-import org.mule.tck.testmodels.fruit.Orange;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-public abstract class AbstractMessageReceiverTestCase extends AbstractMuleContextTestCase
-{
- protected Service service;
- protected InboundEndpoint endpoint;
-
- protected void doSetUp() throws Exception
- {
- service = getTestService("orange", Orange.class);
- endpoint = getEndpoint();
- }
-
- public void testCreate() throws Exception
- {
- Service service = getTestService("orange", Orange.class);
- InboundEndpoint endpoint = getTestInboundEndpoint("Test");
- MessageReceiver receiver = getMessageReceiver();
-
- assertNotNull(receiver.getEndpoint());
-
- try
- {
- receiver.setEndpoint(null);
- fail("Provider cannot be set to null");
- }
- catch (IllegalArgumentException e)
- {
- // expected
- }
-
- receiver.setEndpoint(endpoint);
- assertNotNull(receiver.getEndpoint());
-
- receiver.dispose();
- }
-
- public abstract MessageReceiver getMessageReceiver() throws Exception;
-
- /**
- * Implementations of this method should ensure that the correct connector is set
- * on the endpoint
- *
- */
- public abstract InboundEndpoint getEndpoint() throws Exception;
-}
Modified: branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm (24612 => 24613)
--- branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,7 +15,7 @@
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.service.Service;
import org.mule.api.transport.MessageReceiver;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import com.mockobjects.dynamic.Mock;
Modified: branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -14,14 +14,14 @@
import org.mule.api.service.Service;
import org.mule.api.transport.MessageReceiver;
import org.mule.tck.MuleTestUtils;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import org.mule.util.FileUtils;
import com.mockobjects.dynamic.Mock;
import java.io.File;
-public class FileMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class FileMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
File read = FileUtils.newFile("testcasedata/read");
File move = FileUtils.newFile("testcasedata/move");
Modified: branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -16,13 +16,13 @@
import org.mule.api.transport.MessageReceiver;
import org.mule.endpoint.EndpointURIEndpointBuilder;
import org.mule.service.ServiceCompositeMessageSource;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import org.mule.transport.http.transformers.MuleMessageToHttpResponse;
import org.mule.util.CollectionUtils;
import com.mockobjects.dynamic.Mock;
-public class HttpMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class HttpMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
public MessageReceiver getMessageReceiver() throws Exception
{
Modified: branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java (24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -14,6 +14,7 @@
import org.mule.api.MuleException;
import org.mule.api.MuleRuntimeException;
import org.mule.api.construct.FlowConstruct;
+import org.mule.api.context.notification.ClusterNodeNotificationListener;
import org.mule.api.context.notification.ConnectionNotificationListener;
import org.mule.api.endpoint.ImmutableEndpoint;
import org.mule.api.endpoint.InboundEndpoint;
@@ -26,6 +27,7 @@
import org.mule.config.ExceptionHelper;
import org.mule.config.i18n.CoreMessages;
import org.mule.config.i18n.MessageFactory;
+import org.mule.context.notification.ClusterNodeNotification;
import org.mule.context.notification.ConnectionNotification;
import org.mule.context.notification.NotificationException;
import org.mule.routing.MessageFilter;
@@ -450,6 +452,27 @@
return connection;
}
+ public void connect() throws Exception {
+ if (muleContext.isPrimaryPollingInstance() || clientId == null)
+ {
+ super.connect();
+ }
+ else
+ {
+ muleContext.registerListener(new ClusterNodeNotificationListener<ClusterNodeNotification>() {
+ public void onNotification(ClusterNodeNotification notification) {
+ try {
+ JmsConnector.this.connect();
+ } catch (Exception e) {
+ throw new MuleRuntimeException(e);
+ }
+ }
+ });
+ }
+ }
+
public void onException(JMSException jmsException)
{
final JmsConnector jmsConnector = JmsConnector.this;
Modified: branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java (24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -10,9 +10,7 @@
package org.mule.transport.jms;
-import org.mule.MessageExchangePattern;
import org.mule.api.MuleException;
-import org.mule.api.MuleMessage;
import org.mule.api.MuleRuntimeException;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.endpoint.InboundEndpoint;
@@ -21,7 +19,6 @@
import org.mule.api.transaction.Transaction;
import org.mule.api.transaction.TransactionException;
import org.mule.api.transport.Connector;
-import org.mule.api.transport.ReplyToHandler;
import org.mule.transport.AbstractMessageReceiver;
import org.mule.transport.AbstractReceiverWorker;
import org.mule.transport.ConnectException;
@@ -55,12 +52,14 @@
protected MessageConsumer consumer;
protected Session session;
protected boolean startOnConnect = false;
+ private final boolean topic;
public JmsMessageReceiver(Connector connector, FlowConstruct flowConstruct, InboundEndpoint endpoint)
throws CreateException
{
super(connector, flowConstruct, endpoint);
this.connector = (JmsConnector) connector;
+ topic = this.connector.getTopicResolver().isTopic(endpoint);
try
{
@@ -99,6 +98,11 @@
}
}
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ }
+
protected class JmsWorker extends AbstractReceiverWorker
{
public JmsWorker(Message message, AbstractMessageReceiver receiver)
@@ -234,8 +238,6 @@
session = this.connector.getSession(endpoint);
}
- boolean topic = connector.getTopicResolver().isTopic(endpoint);
-
// Create destination
Destination dest = jmsSupport.createDestination(session, endpoint);
Modified: branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java (24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -10,7 +10,6 @@
package org.mule.transport.jms;
-import org.mule.MessageExchangePattern;
import org.mule.api.MessagingException;
import org.mule.api.MuleException;
import org.mule.api.construct.FlowConstruct;
@@ -21,7 +20,6 @@
import org.mule.api.transaction.Transaction;
import org.mule.api.transaction.TransactionException;
import org.mule.api.transport.Connector;
-import org.mule.api.transport.ReplyToHandler;
import org.mule.transaction.TransactionCollection;
import org.mule.transport.AbstractMessageReceiver;
import org.mule.transport.AbstractReceiverWorker;
@@ -58,6 +56,8 @@
private final JmsConnector jmsConnector;
+ final boolean isTopic;
+
public MultiConsumerJmsMessageReceiver(Connector connector, FlowConstruct flowConstruct, InboundEndpoint endpoint)
throws CreateException
{
@@ -65,7 +65,7 @@
jmsConnector = (JmsConnector) connector;
- final boolean isTopic = jmsConnector.getTopicResolver().isTopic(endpoint, true);
+ isTopic = jmsConnector.getTopicResolver().isTopic(endpoint, true);
if (isTopic && jmsConnector.getNumberOfConsumers() != 1)
{
if (logger.isInfoEnabled())
@@ -152,14 +152,19 @@
}
}
consumers.clear();
- }
+ }
@Override
protected void doDispose()
{
logger.debug("doDispose()");
- }
+ }
+ public boolean shouldConsumeInEveryNode() {
+ return !this.isTopic;
+ }
+
private class SubReceiver implements MessageListener
{
private final Log subLogger = LogFactory.getLog(getClass());
Modified: branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java (24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -53,8 +53,9 @@
/** determines whether Multiple receivers are created to improve throughput */
protected boolean useMultipleReceivers = true;
+ private final boolean topic;
-
+
public TransactedSingleResourceJmsMessageReceiver(Connector connector,
FlowConstruct flowConstruct,
InboundEndpoint endpoint) throws CreateException
@@ -63,7 +64,7 @@
super(connector, flowConstruct, endpoint);
this.connector = (JmsConnector) connector;
-
+ topic = this.connector.getTopicResolver().isTopic(endpoint);
// TODO check which properties being set in the TransecteJmsMessage receiver
// are needed...
@@ -97,8 +98,6 @@
}
// Create destination
- boolean topic = connector.getTopicResolver().isTopic(endpoint);
-
Destination dest = jmsSupport.createDestination(session, endpoint);
// Extract jms selector
String selector = null;
@@ -283,4 +282,9 @@
routeMessage(messageToRoute);
}
}
+
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ }
}
Modified: branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java (24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -49,7 +49,13 @@
protected final ThreadContextLocal context = new ThreadContextLocal();
protected final long timeout;
private final AtomicReference<RedeliveryHandler> redeliveryHandler = new AtomicReference<RedeliveryHandler>();
+ private final boolean topic;
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ }
+
/**
* Holder receiving the session and consumer for this thread.
*/
@@ -105,7 +111,7 @@
this.reuseSession);
// Do extra validation, XA Topic & reuse are incompatible. See MULE-2622
- boolean topic = this.connector.getTopicResolver().isTopic(getEndpoint());
+ topic = this.connector.getTopicResolver().isTopic(getEndpoint());
if (topic && (reuseConsumer || reuseSession))
{
logger.warn("Destination " + getEndpoint().getEndpointURI() + " is a topic and XA transaction was " +
Added: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java (0 => 24613)
--- branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java (rev 0)
+++ branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -0,0 +1,55 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.jms;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+public class MultiConsumerJmsMessageReceiverTest extends AbstractMuleTestCase {
+
+
+ private JmsConnector mockJmsConnector;
+ private FlowConstruct mockFlowConstruct;
+ private InboundEndpoint mockInboundEndpoint;
+
+ public void testTopicReceiverShouldBeStartedOnlyInPrimaryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint,true)).thenReturn(true);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ MultiConsumerJmsMessageReceiver messageReceiver = new MultiConsumerJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(false));
+ }
+
+ public void testQueueReceiverShouldBeStartedInEveryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint,true)).thenReturn(false);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ MultiConsumerJmsMessageReceiver messageReceiver = new MultiConsumerJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(true));
+ }
+
+
+}
Property changes on: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java (0 => 24613)
--- branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java (rev 0)
+++ branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -0,0 +1,55 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.jms;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+public class XaTransactedJmsMessageReceiverTest extends AbstractMuleTestCase {
+
+
+ private JmsConnector mockJmsConnector;
+ private FlowConstruct mockFlowConstruct;
+ private InboundEndpoint mockInboundEndpoint;
+
+ public void testTopicReceiverShouldBeStartedOnlyInPrimaryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint)).thenReturn(true);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ XaTransactedJmsMessageReceiver messageReceiver = new XaTransactedJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(false));
+ }
+
+ public void testQueueReceiverShouldBeStartedInEveryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint)).thenReturn(false);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ XaTransactedJmsMessageReceiver messageReceiver = new XaTransactedJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(true));
+ }
+
+
+}
Property changes on: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,11 +15,11 @@
import org.mule.api.transport.MessageReceiver;
import org.mule.service.ServiceCompositeMessageSource;
import org.mule.transport.AbstractConnector;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import com.mockobjects.dynamic.Mock;
-public class MulticastMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class MulticastMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
public MessageReceiver getMessageReceiver() throws Exception
{
Modified: branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,7 +15,7 @@
import org.mule.api.lifecycle.InitialisationException;
import org.mule.config.i18n.MessageFactory;
import org.mule.endpoint.EndpointURIEndpointBuilder;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import org.mule.util.concurrent.Latch;
import java.rmi.Naming;
@@ -44,7 +44,7 @@
*
*/
-public class RmiMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class RmiMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
private static Log LOGGER = LogFactory.getLog(RmiMessageReceiverTestCase.class);
Modified: branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -14,11 +14,11 @@
import org.mule.api.service.Service;
import org.mule.api.transport.MessageReceiver;
import org.mule.service.ServiceCompositeMessageSource;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import com.mockobjects.dynamic.Mock;
-public class SslMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class SslMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
public MessageReceiver getMessageReceiver() throws Exception
{
Modified: branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,14 +15,14 @@
import org.mule.api.service.Service;
import org.mule.api.transport.MessageReceiver;
import org.mule.tck.testmodels.fruit.Orange;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-public class StdioMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class StdioMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
@Test
Modified: branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,11 +15,11 @@
import org.mule.api.transport.MessageReceiver;
import org.mule.service.ServiceCompositeMessageSource;
import org.mule.transport.AbstractConnector;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import com.mockobjects.dynamic.Mock;
-public class TcpMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class TcpMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
@Override
Modified: branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -16,11 +16,11 @@
import org.mule.api.transport.MessageReceiver;
import org.mule.endpoint.EndpointURIEndpointBuilder;
import org.mule.service.ServiceCompositeMessageSource;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import com.mockobjects.dynamic.Mock;
-public class UdpMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class UdpMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
@Override
public MessageReceiver getMessageReceiver() throws Exception
Modified: branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -12,9 +12,9 @@
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.transport.MessageReceiver;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
-public class VMMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class VMMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
VMMessageReceiver receiver;
http://xircles.codehaus.org/manage_email
Pablo Kraan
2012-07-14 22:24:32 UTC
Permalink
This revision breaks the build of the "Test for Mule Transport Archetype"
module
Also remove the duplicate code at
PrimaryNodeLifecycleNotificationListener#onNotification

Pablo
Post by Daniel Feist
This commit has lots of badly formatted code :-( Please fix it.
Dan
Revision 24613 <http://fisheye.codehaus.org/changelog/mule/?cs=24613>
Author pablo.lagreca Date 2012-07-11 08:58:26 -0500 (Wed, 11 Jul 2012) Log
Message
EE-2770 - in a cluster, only one connection to jms will be stablished if connector has client id. Also, only one topic message receiver will be subscribed
Modified Paths
-
branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java
-
branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm
-
branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java
-
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java
-
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java
-
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java
-
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java
-
branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java
Added Paths
-
branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java
-
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java
-
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java
Removed Paths
-
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java
Diff
branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java
(0 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.lifecycle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleException;
+import org.mule.api.context.notification.ClusterNodeNotificationListener;
+import org.mule.api.context.notification.ServerNotification;
+import org.mule.api.lifecycle.LifecycleState;
+import org.mule.api.lifecycle.LifecycleStateEnabled;
+import org.mule.api.lifecycle.Startable;
+import org.mule.context.notification.NotificationException;
+
+/**
+ *
+ * This class will start an Startable mule object that must only be started in the primary node.
+ *
+ */
+public class PrimaryNodeLifecycleNotificationListener implements ClusterNodeNotificationListener {
+
+ protected transient Log logger = LogFactory.getLog(getClass());
+ private Startable startMeOnPrimaryNodeNotification;
+ private MuleContext muleContext;
+
+ public PrimaryNodeLifecycleNotificationListener(Startable startMeOnPrimaryNodeNotification, MuleContext muleContext) {
+ this.startMeOnPrimaryNodeNotification = startMeOnPrimaryNodeNotification;
+ this.muleContext = muleContext;
+ }
+
+ public void register()
+ {
+ try
+ {
+ if (muleContext != null)
+ {
+ muleContext.registerListener(this);
+ }
+ }
+ catch (NotificationException e)
+ {
+ throw new RuntimeException("Unable to register listener", e);
+ }
+ }
+
+ public void onNotification(ServerNotification notification) {
+ try
+ {
+ if (startMeOnPrimaryNodeNotification instanceof LifecycleState)
+ {
+ if (((LifecycleState)startMeOnPrimaryNodeNotification).isStarted())
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Not starting Startable since it's not in started state");
+ }
+ }
+ }
+ else if (startMeOnPrimaryNodeNotification instanceof LifecycleStateEnabled)
+ {
+ if (((LifecycleStateEnabled)startMeOnPrimaryNodeNotification).getLifecycleState().isStarted())
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Not starting Startable since it's not in started state");
+ }
+ }
+ }
+ else
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ }
+ catch (MuleException e)
+ {
+ throw new RuntimeException("Error starting wrapped message source", e);
+ }
+ }
+
+ public void unregister() {
+ muleContext.unregisterListener(this);
+ }
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java
(24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public void start() throws MuleException {
+ if (ClusterizableMessageSourceWrapper.this.isStarted())
+ {
+ ClusterizableMessageSourceWrapper.this.start();
+ }
+ }
+ },muleContext); + primaryNodeLifecycleNotificationListener.register();
- public void onNotification(ServerNotification notification)
- {
- if (flowConstruct != null && flowConstruct.getLifecycleState().isStarted())
- {
- try
- {
- start();
- }
- catch (MuleException e)
- {
- throw new RuntimeException("Error starting wrapped message source", e);
- }
- }
- else
- {
- if (logger.isDebugEnabled())
- {
- logger.debug("Clusterizable message source no started on stopped flow");
- }
- }
- } - protected void registerNotificationListener()
- {
- try
- {
- if (muleContext != null)
- {
- muleContext.registerListener(this);
- }
- }
- catch (NotificationException e)
- {
- throw new RuntimeException("Unable to register listener", e);
- }
- }
-
- protected void unregisterNotificationListener()
- {
- if (muleContext != null)
- {
- muleContext.unregisterListener(this);
- }
- }
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java
(24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+ if (!shouldConsumeInEveryNode() && !flowConstruct.getMuleContext().isPrimaryPollingInstance())
+ {
+ primaryNodeLifecycleNotificationListener = new PrimaryNodeLifecycleNotificationListener(new Startable() {
+ public void start() throws MuleException {
+ if (AbstractMessageReceiver.this.isStarted())
+ {
+ try {
+ AbstractMessageReceiver.this.doConnect();
+ } catch (Exception e) {
+ throw new DefaultMuleException(e);
+ }
+ AbstractMessageReceiver.this.doStart();
+ }
+ }
+ },flowConstruct.getMuleContext());
+ primaryNodeLifecycleNotificationListener.register();
+ }
+ {
+ primaryNodeLifecycleNotificationListener.unregister();
+ /**
+ * Determines whether to start or not the MessageSource base on the running node state.
+ *
+ */
+ public boolean shouldConsumeInEveryNode()
+ {
+ return true;
+ }
+
+ final protected void connectHandler() throws Exception {
+ if (shouldConsumeInEveryNode() || getFlowConstruct().getMuleContext().isPrimaryPollingInstance())
+ {
+ if (logger.isInfoEnabled())
+ {
+ logger.info("Connecting clusterizable message receiver");
+ }
+ doConnect();
+ }
+ else
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Clusterizable message receiver not connected on this node");
+ }
+ }
+ }
+
+ final protected void doStartHandler() throws MuleException {
+ if (shouldConsumeInEveryNode() || getFlowConstruct().getMuleContext().isPrimaryPollingInstance())
+ {
+ if (logger.isInfoEnabled())
+ {
+ logger.info("Starting clusterizable message receiver");
+ }
+ doStart();
+ }
+ else
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Clusterizable message receiver not started on this node");
+ }
+ }
+ } }
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java
(24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java 2012-07-06 20:23:46 UTC (rev 24612)
+ protected void connectHandler() throws Exception
+ {
+ this.doConnect();
+ }
- {
- doStart();+ public void onTransition(String phaseName, O object) throws MuleException {
+ doStartHandler(); } }); } + protected void doStartHandler() throws MuleException
+ {
+ doStart();
+ }
+ public final void stop() throws MuleException { lifecycleManager.fireStopPhase(new LifecycleCallback<O>()
branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java
(0 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.lifecycle;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleException;
+import org.mule.api.context.notification.ServerNotification;
+import org.mule.api.lifecycle.LifecycleState;
+import org.mule.api.lifecycle.LifecycleStateEnabled;
+import org.mule.api.lifecycle.Startable;
+import org.mule.context.notification.NotificationException;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PrimaryNodeLifecycleNotificationListenerTestCase extends AbstractMuleTestCase {
+
+ private MuleContext mockMuleContext;
+ private Startable mockStartable;
+ private ServerNotification mockServerNotification;
+ private StartableAndLifecycleState mockStartableAndLifecycleState;
+ private StartableAndLifecycleStateEnabled mockStartableAndLifecycleStateEnabled;
+ private PrimaryNodeLifecycleNotificationListener notificationListener;
+
+ public void setUpTest()
+ {
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartable, mockMuleContext);
+ }
+
+ public void testRegister() throws NotificationException
+ {
+ this.notificationListener.register();
+ verify(mockMuleContext, times(1)).registerListener(notificationListener);
+ }
+
+ public void testUnregister() throws NotificationException
+ {
+ this.notificationListener.unregister();
+ verify(mockMuleContext, times(1)).unregisterListener(notificationListener);
+ }
+
+ public void testOnNotificationWithStartable() throws MuleException {
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartable,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateStarted() throws MuleException {
+ mockStartableAndLifecycleState = mock(StartableAndLifecycleState.class);
+ when(mockStartableAndLifecycleState.isStarted()).thenReturn(true);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleState,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleState,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateStopped() throws MuleException {
+ mockStartableAndLifecycleState = mock(StartableAndLifecycleState.class);
+ when(mockStartableAndLifecycleState.isStarted()).thenReturn(false);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleState,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleState,times(0)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateEnabledStarted() throws MuleException {
+ mockStartableAndLifecycleStateEnabled = mock(StartableAndLifecycleStateEnabled.class, Answers.RETURNS_DEEP_STUBS.get());
+ when(mockStartableAndLifecycleStateEnabled.getLifecycleState().isStarted()).thenReturn(true);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleStateEnabled,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleStateEnabled,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateEnabledStopped() throws MuleException {
+ mockStartableAndLifecycleStateEnabled = mock(StartableAndLifecycleStateEnabled.class, Answers.RETURNS_DEEP_STUBS.get());
+ when(mockStartableAndLifecycleStateEnabled.getLifecycleState().isStarted()).thenReturn(false);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleStateEnabled,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleStateEnabled,times(0)).start();
+ }
+
+ private interface StartableAndLifecycleStateEnabled extends Startable, LifecycleStateEnabled{}
+ private interface StartableAndLifecycleState extends Startable, LifecycleState{}
+}Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+
+ wrapper.initialise();
- when(lifecycleState.isStarted()).thenReturn(false);
- FlowConstruct flowConstruct = mock(FlowConstruct.class);
- when(flowConstruct.getLifecycleState()).thenReturn(lifecycleState);
- wrapper.setMuleContext(muleContext);- wrapper.setFlowConstruct(flowConstruct);+ Mockito.doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((PrimaryNodeLifecycleNotificationListener) invocationOnMock.getArguments()[0]).onNotification(new ClusterNodeNotification("",1));
+ return null;
+ }
+ }).when(muleContext).registerListener(isA(PrimaryNodeLifecycleNotificationListener.class)); - wrapper.onNotification(mock(ClusterNodeNotification.class));
- when(lifecycleState.isStarted()).thenReturn(true);
- FlowConstruct flowConstruct = mock(FlowConstruct.class);
- when(flowConstruct.getLifecycleState()).thenReturn(lifecycleState);
- wrapper.setMuleContext(muleContext);- wrapper.setFlowConstruct(flowConstruct);+ Mockito.doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((PrimaryNodeLifecycleNotificationListener) invocationOnMock.getArguments()[0]).onNotification(new ClusterNodeNotification("",1));
+ return null;
+ }
+ }).when(muleContext).registerListener(isA(PrimaryNodeLifecycleNotificationListener.class)); - wrapper.onNotification(mock(ClusterNodeNotification.class));
-+ wrapper.start();
- verify(muleContext, times(1)).unregisterListener(wrapper);+ verify(muleContext, times(1)).unregisterListener(isA(PrimaryNodeLifecycleNotificationListener.class)); } private interface TestMessageSource extends ClusterizableMessageSource, Lifecycle
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java
(from rev 24564,
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java)
(0 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.transport;
+
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.api.service.Service;
+import org.mule.api.transport.MessageReceiver;
+import org.mule.tck.junit4.AbstractMuleContextTestCase;
+import org.mule.tck.testmodels.fruit.Orange;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public abstract class AbstractMessageReceiverTemplateTestCase extends AbstractMuleContextTestCase
+{
+ protected Service service;
+ protected InboundEndpoint endpoint;
+
+ protected void doSetUp() throws Exception
+ {
+ service = getTestService("orange", Orange.class);
+ endpoint = getEndpoint();
+ }
+
+ public void testCreate() throws Exception
+ {
+ Service service = getTestService("orange", Orange.class);
+ InboundEndpoint endpoint = getTestInboundEndpoint("Test");
+ MessageReceiver receiver = getMessageReceiver();
+
+ assertNotNull(receiver.getEndpoint());
+
+ try
+ {
+ receiver.setEndpoint(null);
+ fail("Provider cannot be set to null");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // expected
+ }
+
+ receiver.setEndpoint(endpoint);
+ assertNotNull(receiver.getEndpoint());
+
+ receiver.dispose();
+ }
+
+ public abstract MessageReceiver getMessageReceiver() throws Exception;
+
+ /**
+ * Implementations of this method should ensure that the correct connector is set
+ * on the endpoint
+ *
+ */
+ public abstract InboundEndpoint getEndpoint() throws Exception;
+}
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
- * $Id$
- * --------------------------------------------------------------------------------------
- * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
- *
- * The software in this package is published under the terms of the CPAL v1.0
- * license, a copy of which has been included with this distribution in the
- * LICENSE.txt file.
- */
-
-package org.mule.transport;
-
-import org.mule.api.endpoint.InboundEndpoint;
-import org.mule.api.service.Service;
-import org.mule.api.transport.MessageReceiver;
-import org.mule.tck.junit4.AbstractMuleContextTestCase;
-import org.mule.tck.testmodels.fruit.Orange;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-public abstract class AbstractMessageReceiverTestCase extends AbstractMuleContextTestCase
-{
- protected Service service;
- protected InboundEndpoint endpoint;
-
- protected void doSetUp() throws Exception
- {
- service = getTestService("orange", Orange.class);
- endpoint = getEndpoint();
- }
-
- public void testCreate() throws Exception
- {
- Service service = getTestService("orange", Orange.class);
- InboundEndpoint endpoint = getTestInboundEndpoint("Test");
- MessageReceiver receiver = getMessageReceiver();
-
- assertNotNull(receiver.getEndpoint());
-
- try
- {
- receiver.setEndpoint(null);
- fail("Provider cannot be set to null");
- }
- catch (IllegalArgumentException e)
- {
- // expected
- }
-
- receiver.setEndpoint(endpoint);
- assertNotNull(receiver.getEndpoint());
-
- receiver.dispose();
- }
-
- public abstract MessageReceiver getMessageReceiver() throws Exception;
-
- /**
- * Implementations of this method should ensure that the correct connector is set
- * on the endpoint
- *
- */
- public abstract InboundEndpoint getEndpoint() throws Exception;
-}
branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm
(24612 => 24613)
--- branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public void connect() throws Exception {
+ if (muleContext.isPrimaryPollingInstance() || clientId == null)
+ {
+ super.connect();
+ }
+ else
+ {
+ muleContext.registerListener(new ClusterNodeNotificationListener<ClusterNodeNotification>() {
+ public void onNotification(ClusterNodeNotification notification) {
+ try {
+ JmsConnector.this.connect();
+ } catch (Exception e) {
+ throw new MuleRuntimeException(e);
+ }
+ }
+ });
+ }
+ }
+ public void onException(JMSException jmsException) { final JmsConnector jmsConnector = JmsConnector.this;
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ }
- // Create destination Destination dest = jmsSupport.createDestination(session, endpoint);
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public boolean shouldConsumeInEveryNode() {
+ return !this.isTopic;
+ }
+ private class SubReceiver implements MessageListener { private final Log subLogger = LogFactory.getLog(getClass());
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ } }
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ }
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java
(0 => 24613)
--- branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.jms;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+public class MultiConsumerJmsMessageReceiverTest extends AbstractMuleTestCase {
+
+
+ private JmsConnector mockJmsConnector;
+ private FlowConstruct mockFlowConstruct;
+ private InboundEndpoint mockInboundEndpoint;
+
+ public void testTopicReceiverShouldBeStartedOnlyInPrimaryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint,true)).thenReturn(true);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ MultiConsumerJmsMessageReceiver messageReceiver = new MultiConsumerJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(false));
+ }
+
+ public void testQueueReceiverShouldBeStartedInEveryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint,true)).thenReturn(false);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ MultiConsumerJmsMessageReceiver messageReceiver = new MultiConsumerJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(true));
+ }
+
+
+}Property changes on: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java
(0 => 24613)
--- branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.jms;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+public class XaTransactedJmsMessageReceiverTest extends AbstractMuleTestCase {
+
+
+ private JmsConnector mockJmsConnector;
+ private FlowConstruct mockFlowConstruct;
+ private InboundEndpoint mockInboundEndpoint;
+
+ public void testTopicReceiverShouldBeStartedOnlyInPrimaryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint)).thenReturn(true);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ XaTransactedJmsMessageReceiver messageReceiver = new XaTransactedJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(false));
+ }
+
+ public void testQueueReceiverShouldBeStartedInEveryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint)).thenReturn(false);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ XaTransactedJmsMessageReceiver messageReceiver = new XaTransactedJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(true));
+ }
+
+
+}Property changes on: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
------------------------------
http://xircles.codehaus.org/manage_email
Pablo La Greca
2012-07-16 21:31:47 UTC
Permalink
Done.
Post by Pablo Kraan
This revision breaks the build of the "Test for Mule Transport Archetype"
module
Also remove the duplicate code at
PrimaryNodeLifecycleNotificationListener#onNotification
Pablo
Post by Daniel Feist
This commit has lots of badly formatted code :-( Please fix it.
Dan
Revision 24613 <http://fisheye.codehaus.org/changelog/mule/?cs=24613>
Author pablo.lagreca Date 2012-07-11 08:58:26 -0500 (Wed, 11 Jul 2012) Log
Message
EE-2770 - in a cluster, only one connection to jms will be stablished if connector has client id. Also, only one topic message receiver will be subscribed
Modified Paths
-
branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java
-
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java
-
branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm
-
branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java
-
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java
-
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java
-
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java
-
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java
-
branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java
-
branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java
Added Paths
-
branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java
-
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java
-
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java
-
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java
Removed Paths
-
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java
Diff
branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java
(0 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.lifecycle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleException;
+import org.mule.api.context.notification.ClusterNodeNotificationListener;
+import org.mule.api.context.notification.ServerNotification;
+import org.mule.api.lifecycle.LifecycleState;
+import org.mule.api.lifecycle.LifecycleStateEnabled;
+import org.mule.api.lifecycle.Startable;
+import org.mule.context.notification.NotificationException;
+
+/**
+ *
+ * This class will start an Startable mule object that must only be started in the primary node.
+ *
+ */
+public class PrimaryNodeLifecycleNotificationListener implements ClusterNodeNotificationListener {
+
+ protected transient Log logger = LogFactory.getLog(getClass());
+ private Startable startMeOnPrimaryNodeNotification;
+ private MuleContext muleContext;
+
+ public PrimaryNodeLifecycleNotificationListener(Startable startMeOnPrimaryNodeNotification, MuleContext muleContext) {
+ this.startMeOnPrimaryNodeNotification = startMeOnPrimaryNodeNotification;
+ this.muleContext = muleContext;
+ }
+
+ public void register()
+ {
+ try
+ {
+ if (muleContext != null)
+ {
+ muleContext.registerListener(this);
+ }
+ }
+ catch (NotificationException e)
+ {
+ throw new RuntimeException("Unable to register listener", e);
+ }
+ }
+
+ public void onNotification(ServerNotification notification) {
+ try
+ {
+ if (startMeOnPrimaryNodeNotification instanceof LifecycleState)
+ {
+ if (((LifecycleState)startMeOnPrimaryNodeNotification).isStarted())
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Not starting Startable since it's not in started state");
+ }
+ }
+ }
+ else if (startMeOnPrimaryNodeNotification instanceof LifecycleStateEnabled)
+ {
+ if (((LifecycleStateEnabled)startMeOnPrimaryNodeNotification).getLifecycleState().isStarted())
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Not starting Startable since it's not in started state");
+ }
+ }
+ }
+ else
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ }
+ catch (MuleException e)
+ {
+ throw new RuntimeException("Error starting wrapped message source", e);
+ }
+ }
+
+ public void unregister() {
+ muleContext.unregisterListener(this);
+ }
+}Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java
(24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public void start() throws MuleException {
+ if (ClusterizableMessageSourceWrapper.this.isStarted())
+ {
+ ClusterizableMessageSourceWrapper.this.start();
+ }
+ }
+ },muleContext); + primaryNodeLifecycleNotificationListener.register();
- public void onNotification(ServerNotification notification)
- {
- if (flowConstruct != null && flowConstruct.getLifecycleState().isStarted())
- {
- try
- {
- start();
- }
- catch (MuleException e)
- {
- throw new RuntimeException("Error starting wrapped message source", e);
- }
- }
- else
- {
- if (logger.isDebugEnabled())
- {
- logger.debug("Clusterizable message source no started on stopped flow");
- }
- }
- } - protected void registerNotificationListener()
- {
- try
- {
- if (muleContext != null)
- {
- muleContext.registerListener(this);
- }
- }
- catch (NotificationException e)
- {
- throw new RuntimeException("Unable to register listener", e);
- }
- }
-
- protected void unregisterNotificationListener()
- {
- if (muleContext != null)
- {
- muleContext.unregisterListener(this);
- }
- }
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java
(24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+ if (!shouldConsumeInEveryNode() && !flowConstruct.getMuleContext().isPrimaryPollingInstance())
+ {
+ primaryNodeLifecycleNotificationListener = new PrimaryNodeLifecycleNotificationListener(new Startable() {
+ public void start() throws MuleException {
+ if (AbstractMessageReceiver.this.isStarted())
+ {
+ try {
+ AbstractMessageReceiver.this.doConnect();
+ } catch (Exception e) {
+ throw new DefaultMuleException(e);
+ }
+ AbstractMessageReceiver.this.doStart();
+ }
+ }
+ },flowConstruct.getMuleContext());
+ primaryNodeLifecycleNotificationListener.register();
+ }
+ {
+ primaryNodeLifecycleNotificationListener.unregister();
+ /**
+ * Determines whether to start or not the MessageSource base on the running node state.
+ *
+ */
+ public boolean shouldConsumeInEveryNode()
+ {
+ return true;
+ }
+
+ final protected void connectHandler() throws Exception {
+ if (shouldConsumeInEveryNode() || getFlowConstruct().getMuleContext().isPrimaryPollingInstance())
+ {
+ if (logger.isInfoEnabled())
+ {
+ logger.info("Connecting clusterizable message receiver");
+ }
+ doConnect();
+ }
+ else
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Clusterizable message receiver not connected on this node");
+ }
+ }
+ }
+
+ final protected void doStartHandler() throws MuleException {
+ if (shouldConsumeInEveryNode() || getFlowConstruct().getMuleContext().isPrimaryPollingInstance())
+ {
+ if (logger.isInfoEnabled())
+ {
+ logger.info("Starting clusterizable message receiver");
+ }
+ doStart();
+ }
+ else
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Clusterizable message receiver not started on this node");
+ }
+ }
+ } }
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java
(24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java 2012-07-06 20:23:46 UTC (rev 24612)
+ protected void connectHandler() throws Exception
+ {
+ this.doConnect();
+ }
- {
- doStart();+ public void onTransition(String phaseName, O object) throws MuleException {
+ doStartHandler(); } }); } + protected void doStartHandler() throws MuleException
+ {
+ doStart();
+ }
+ public final void stop() throws MuleException { lifecycleManager.fireStopPhase(new LifecycleCallback<O>()
branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java
(0 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.lifecycle;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleException;
+import org.mule.api.context.notification.ServerNotification;
+import org.mule.api.lifecycle.LifecycleState;
+import org.mule.api.lifecycle.LifecycleStateEnabled;
+import org.mule.api.lifecycle.Startable;
+import org.mule.context.notification.NotificationException;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PrimaryNodeLifecycleNotificationListenerTestCase extends AbstractMuleTestCase {
+
+ private MuleContext mockMuleContext;
+ private Startable mockStartable;
+ private ServerNotification mockServerNotification;
+ private StartableAndLifecycleState mockStartableAndLifecycleState;
+ private StartableAndLifecycleStateEnabled mockStartableAndLifecycleStateEnabled;
+ private PrimaryNodeLifecycleNotificationListener notificationListener;
+
+ public void setUpTest()
+ {
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartable, mockMuleContext);
+ }
+
+ public void testRegister() throws NotificationException
+ {
+ this.notificationListener.register();
+ verify(mockMuleContext, times(1)).registerListener(notificationListener);
+ }
+
+ public void testUnregister() throws NotificationException
+ {
+ this.notificationListener.unregister();
+ verify(mockMuleContext, times(1)).unregisterListener(notificationListener);
+ }
+
+ public void testOnNotificationWithStartable() throws MuleException {
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartable,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateStarted() throws MuleException {
+ mockStartableAndLifecycleState = mock(StartableAndLifecycleState.class);
+ when(mockStartableAndLifecycleState.isStarted()).thenReturn(true);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleState,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleState,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateStopped() throws MuleException {
+ mockStartableAndLifecycleState = mock(StartableAndLifecycleState.class);
+ when(mockStartableAndLifecycleState.isStarted()).thenReturn(false);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleState,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleState,times(0)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateEnabledStarted() throws MuleException {
+ mockStartableAndLifecycleStateEnabled = mock(StartableAndLifecycleStateEnabled.class, Answers.RETURNS_DEEP_STUBS.get());
+ when(mockStartableAndLifecycleStateEnabled.getLifecycleState().isStarted()).thenReturn(true);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleStateEnabled,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleStateEnabled,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateEnabledStopped() throws MuleException {
+ mockStartableAndLifecycleStateEnabled = mock(StartableAndLifecycleStateEnabled.class, Answers.RETURNS_DEEP_STUBS.get());
+ when(mockStartableAndLifecycleStateEnabled.getLifecycleState().isStarted()).thenReturn(false);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleStateEnabled,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleStateEnabled,times(0)).start();
+ }
+
+ private interface StartableAndLifecycleStateEnabled extends Startable, LifecycleStateEnabled{}
+ private interface StartableAndLifecycleState extends Startable, LifecycleState{}
+}Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+
+ wrapper.initialise();
- when(lifecycleState.isStarted()).thenReturn(false);
- FlowConstruct flowConstruct = mock(FlowConstruct.class);
- when(flowConstruct.getLifecycleState()).thenReturn(lifecycleState);
- wrapper.setMuleContext(muleContext);- wrapper.setFlowConstruct(flowConstruct);+ Mockito.doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((PrimaryNodeLifecycleNotificationListener) invocationOnMock.getArguments()[0]).onNotification(new ClusterNodeNotification("",1));
+ return null;
+ }
+ }).when(muleContext).registerListener(isA(PrimaryNodeLifecycleNotificationListener.class)); - wrapper.onNotification(mock(ClusterNodeNotification.class));
- when(lifecycleState.isStarted()).thenReturn(true);
- FlowConstruct flowConstruct = mock(FlowConstruct.class);
- when(flowConstruct.getLifecycleState()).thenReturn(lifecycleState);
- wrapper.setMuleContext(muleContext);- wrapper.setFlowConstruct(flowConstruct);+ Mockito.doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((PrimaryNodeLifecycleNotificationListener) invocationOnMock.getArguments()[0]).onNotification(new ClusterNodeNotification("",1));
+ return null;
+ }
+ }).when(muleContext).registerListener(isA(PrimaryNodeLifecycleNotificationListener.class)); - wrapper.onNotification(mock(ClusterNodeNotification.class));
-+ wrapper.start();
- verify(muleContext, times(1)).unregisterListener(wrapper);+ verify(muleContext, times(1)).unregisterListener(isA(PrimaryNodeLifecycleNotificationListener.class)); } private interface TestMessageSource extends ClusterizableMessageSource, Lifecycle
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java
(from rev 24564,
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java)
(0 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.transport;
+
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.api.service.Service;
+import org.mule.api.transport.MessageReceiver;
+import org.mule.tck.junit4.AbstractMuleContextTestCase;
+import org.mule.tck.testmodels.fruit.Orange;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public abstract class AbstractMessageReceiverTemplateTestCase extends AbstractMuleContextTestCase
+{
+ protected Service service;
+ protected InboundEndpoint endpoint;
+
+ protected void doSetUp() throws Exception
+ {
+ service = getTestService("orange", Orange.class);
+ endpoint = getEndpoint();
+ }
+
+ public void testCreate() throws Exception
+ {
+ Service service = getTestService("orange", Orange.class);
+ InboundEndpoint endpoint = getTestInboundEndpoint("Test");
+ MessageReceiver receiver = getMessageReceiver();
+
+ assertNotNull(receiver.getEndpoint());
+
+ try
+ {
+ receiver.setEndpoint(null);
+ fail("Provider cannot be set to null");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // expected
+ }
+
+ receiver.setEndpoint(endpoint);
+ assertNotNull(receiver.getEndpoint());
+
+ receiver.dispose();
+ }
+
+ public abstract MessageReceiver getMessageReceiver() throws Exception;
+
+ /**
+ * Implementations of this method should ensure that the correct connector is set
+ * on the endpoint
+ *
+ */
+ public abstract InboundEndpoint getEndpoint() throws Exception;
+}
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
- * $Id$
- * --------------------------------------------------------------------------------------
- * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
- *
- * The software in this package is published under the terms of the CPAL v1.0
- * license, a copy of which has been included with this distribution in the
- * LICENSE.txt file.
- */
-
-package org.mule.transport;
-
-import org.mule.api.endpoint.InboundEndpoint;
-import org.mule.api.service.Service;
-import org.mule.api.transport.MessageReceiver;
-import org.mule.tck.junit4.AbstractMuleContextTestCase;
-import org.mule.tck.testmodels.fruit.Orange;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-public abstract class AbstractMessageReceiverTestCase extends AbstractMuleContextTestCase
-{
- protected Service service;
- protected InboundEndpoint endpoint;
-
- protected void doSetUp() throws Exception
- {
- service = getTestService("orange", Orange.class);
- endpoint = getEndpoint();
- }
-
- public void testCreate() throws Exception
- {
- Service service = getTestService("orange", Orange.class);
- InboundEndpoint endpoint = getTestInboundEndpoint("Test");
- MessageReceiver receiver = getMessageReceiver();
-
- assertNotNull(receiver.getEndpoint());
-
- try
- {
- receiver.setEndpoint(null);
- fail("Provider cannot be set to null");
- }
- catch (IllegalArgumentException e)
- {
- // expected
- }
-
- receiver.setEndpoint(endpoint);
- assertNotNull(receiver.getEndpoint());
-
- receiver.dispose();
- }
-
- public abstract MessageReceiver getMessageReceiver() throws Exception;
-
- /**
- * Implementations of this method should ensure that the correct connector is set
- * on the endpoint
- *
- */
- public abstract InboundEndpoint getEndpoint() throws Exception;
-}
branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm
(24612 => 24613)
--- branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public void connect() throws Exception {
+ if (muleContext.isPrimaryPollingInstance() || clientId == null)
+ {
+ super.connect();
+ }
+ else
+ {
+ muleContext.registerListener(new ClusterNodeNotificationListener<ClusterNodeNotification>() {
+ public void onNotification(ClusterNodeNotification notification) {
+ try {
+ JmsConnector.this.connect();
+ } catch (Exception e) {
+ throw new MuleRuntimeException(e);
+ }
+ }
+ });
+ }
+ }
+ public void onException(JMSException jmsException) { final JmsConnector jmsConnector = JmsConnector.this;
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ }
- // Create destination Destination dest = jmsSupport.createDestination(session, endpoint);
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public boolean shouldConsumeInEveryNode() {
+ return !this.isTopic;
+ }
+ private class SubReceiver implements MessageListener { private final Log subLogger = LogFactory.getLog(getClass());
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ } }
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ }
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java
(0 => 24613)
--- branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.jms;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+public class MultiConsumerJmsMessageReceiverTest extends AbstractMuleTestCase {
+
+
+ private JmsConnector mockJmsConnector;
+ private FlowConstruct mockFlowConstruct;
+ private InboundEndpoint mockInboundEndpoint;
+
+ public void testTopicReceiverShouldBeStartedOnlyInPrimaryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint,true)).thenReturn(true);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ MultiConsumerJmsMessageReceiver messageReceiver = new MultiConsumerJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(false));
+ }
+
+ public void testQueueReceiverShouldBeStartedInEveryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint,true)).thenReturn(false);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ MultiConsumerJmsMessageReceiver messageReceiver = new MultiConsumerJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(true));
+ }
+
+
+}Property changes on: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java
(0 => 24613)
--- branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java (rev 0)
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.jms;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+public class XaTransactedJmsMessageReceiverTest extends AbstractMuleTestCase {
+
+
+ private JmsConnector mockJmsConnector;
+ private FlowConstruct mockFlowConstruct;
+ private InboundEndpoint mockInboundEndpoint;
+
+ public void testTopicReceiverShouldBeStartedOnlyInPrimaryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint)).thenReturn(true);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ XaTransactedJmsMessageReceiver messageReceiver = new XaTransactedJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(false));
+ }
+
+ public void testQueueReceiverShouldBeStartedInEveryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint)).thenReturn(false);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ XaTransactedJmsMessageReceiver messageReceiver = new XaTransactedJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(true));
+ }
+
+
+}Property changes on: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java
(24612 => 24613)
--- branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
------------------------------
http://xircles.codehaus.org/manage_email
Daniel Feist
2012-07-16 21:55:32 UTC
Permalink
How about the formatting?
Done.
This revision breaks the build of the "Test for Mule Transport Archetype" module
Also remove the duplicate code at PrimaryNodeLifecycleNotificationListener#onNotification
Pablo
This commit has lots of badly formatted code :-( Please fix it.
Dan
Revision
24613
Author
pablo.lagreca
Date
2012-07-11 08:58:26 -0500 (Wed, 11 Jul 2012)
Log Message
EE-2770 - in a cluster, only one connection to jms will be stablished if connector has client id. Also, only one topic message receiver will be subscribed
Modified Paths
branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java
branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java
branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java
branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm
branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java
branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java
branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java
branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java
branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java
branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java
branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java
branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java
branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java
branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java
Added Paths
branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java
branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java
branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java
Removed Paths
branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java
Diff
Added: branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java (0 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -0,0 +1,100 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.lifecycle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleException;
+import org.mule.api.context.notification.ClusterNodeNotificationListener;
+import org.mule.api.context.notification.ServerNotification;
+import org.mule.api.lifecycle.LifecycleState;
+import org.mule.api.lifecycle.LifecycleStateEnabled;
+import org.mule.api.lifecycle.Startable;
+import org.mule.context.notification.NotificationException;
+
+/**
+ *
+ * This class will start an Startable mule object that must only be started in the primary node.
+ *
+ */
+public class PrimaryNodeLifecycleNotificationListener implements ClusterNodeNotificationListener {
+
+ protected transient Log logger = LogFactory.getLog(getClass());
+ private Startable startMeOnPrimaryNodeNotification;
+ private MuleContext muleContext;
+
+ public PrimaryNodeLifecycleNotificationListener(Startable startMeOnPrimaryNodeNotification, MuleContext muleContext) {
+ this.startMeOnPrimaryNodeNotification = startMeOnPrimaryNodeNotification;
+ this.muleContext = muleContext;
+ }
+
+ public void register()
+ {
+ try
+ {
+ if (muleContext != null)
+ {
+ muleContext.registerListener(this);
+ }
+ }
+ catch (NotificationException e)
+ {
+ throw new RuntimeException("Unable to register listener", e);
+ }
+ }
+
+ public void onNotification(ServerNotification notification) {
+ try
+ {
+ if (startMeOnPrimaryNodeNotification instanceof LifecycleState)
+ {
+ if (((LifecycleState)startMeOnPrimaryNodeNotification).isStarted())
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Not starting Startable since it's not in started state");
+ }
+ }
+ }
+ else if (startMeOnPrimaryNodeNotification instanceof LifecycleStateEnabled)
+ {
+ if (((LifecycleStateEnabled)startMeOnPrimaryNodeNotification).getLifecycleState().isStarted())
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Not starting Startable since it's not in started state");
+ }
+ }
+ }
+ else
+ {
+ startMeOnPrimaryNodeNotification.start();
+ }
+ }
+ catch (MuleException e)
+ {
+ throw new RuntimeException("Error starting wrapped message source", e);
+ }
+ }
+
+ public void unregister() {
+ muleContext.unregisterListener(this);
+ }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListener.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java (24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/source/ClusterizableMessageSourceWrapper.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,8 +15,6 @@
import org.mule.api.construct.FlowConstruct;
import org.mule.api.construct.FlowConstructAware;
import org.mule.api.context.MuleContextAware;
-import org.mule.api.context.notification.ClusterNodeNotificationListener;
-import org.mule.api.context.notification.ServerNotification;
import org.mule.api.lifecycle.Disposable;
import org.mule.api.lifecycle.Initialisable;
import org.mule.api.lifecycle.InitialisationException;
@@ -26,21 +24,22 @@
import org.mule.api.processor.MessageProcessor;
import org.mule.api.source.ClusterizableMessageSource;
import org.mule.api.source.MessageSource;
-import org.mule.context.notification.NotificationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.mule.lifecycle.PrimaryNodeLifecycleNotificationListener;
/**
* of the wrapped instance differently depending if the node is primary or not
* inside a cluster. Non clustered nodes are always primary.
*/
-public class ClusterizableMessageSourceWrapper implements MessageSource, Lifecycle, ClusterNodeNotificationListener, MuleContextAware, FlowConstructAware
+public class ClusterizableMessageSourceWrapper implements MessageSource, Lifecycle, MuleContextAware, FlowConstructAware
{
protected static final Log logger = LogFactory.getLog(ClusterizableMessageSourceWrapper.class);
+ private PrimaryNodeLifecycleNotificationListener primaryNodeLifecycleNotificationListener;
private final ClusterizableMessageSource messageSource;
private MuleContext muleContext;
private FlowConstruct flowConstruct;
@@ -68,8 +67,18 @@
@Override
public void initialise() throws InitialisationException
{
- registerNotificationListener();
+ primaryNodeLifecycleNotificationListener = new PrimaryNodeLifecycleNotificationListener(new Startable() {
+ public void start() throws MuleException {
+ if (ClusterizableMessageSourceWrapper.this.isStarted())
+ {
+ ClusterizableMessageSourceWrapper.this.start();
+ }
+ }
+ },muleContext);
+ primaryNodeLifecycleNotificationListener.register();
+
if (messageSource instanceof Initialisable)
{
((Initialisable) messageSource).initialise();
@@ -92,8 +101,6 @@
logger.info("Starting clusterizable message source");
}
((Startable) messageSource).start();
-
- started = true;
}
else
{
@@ -103,6 +110,7 @@
}
}
}
+ started = true;
}
}
}
@@ -131,55 +139,10 @@
((Disposable) messageSource).dispose();
}
- unregisterNotificationListener();
+ primaryNodeLifecycleNotificationListener.unregister();
}
- public void onNotification(ServerNotification notification)
- {
- if (flowConstruct != null && flowConstruct.getLifecycleState().isStarted())
- {
- try
- {
- start();
- }
- catch (MuleException e)
- {
- throw new RuntimeException("Error starting wrapped message source", e);
- }
- }
- else
- {
- if (logger.isDebugEnabled())
- {
- logger.debug("Clusterizable message source no started on stopped flow");
- }
- }
- }
- protected void registerNotificationListener()
- {
- try
- {
- if (muleContext != null)
- {
- muleContext.registerListener(this);
- }
- }
- catch (NotificationException e)
- {
- throw new RuntimeException("Unable to register listener", e);
- }
- }
-
- protected void unregisterNotificationListener()
- {
- if (muleContext != null)
- {
- muleContext.unregisterListener(this);
- }
- }
-
@Override
public void setFlowConstruct(FlowConstruct flowConstruct)
{
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java (24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -14,6 +14,7 @@
import org.mule.MessageExchangePattern;
import org.mule.OptimizedRequestContext;
import org.mule.ResponseOutputStream;
+import org.mule.api.DefaultMuleException;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
@@ -25,6 +26,7 @@
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.lifecycle.CreateException;
import org.mule.api.lifecycle.InitialisationException;
+import org.mule.api.lifecycle.Startable;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.routing.filter.FilterUnacceptedException;
import org.mule.api.transaction.Transaction;
@@ -34,6 +36,7 @@
import org.mule.api.transport.PropertyScope;
import org.mule.api.transport.ReplyToHandler;
import org.mule.context.notification.EndpointMessageNotification;
+import org.mule.lifecycle.PrimaryNodeLifecycleNotificationListener;
import org.mule.session.DefaultMuleSession;
import org.mule.session.LegacySessionHandler;
import org.mule.transaction.TransactionCoordination;
@@ -82,6 +85,7 @@
protected List<Transformer> defaultResponseTransformers;
protected ReplyToHandler replyToHandler;
+ private PrimaryNodeLifecycleNotificationListener primaryNodeLifecycleNotificationListener;
/**
* Creates the Message Receiver
@@ -135,7 +139,26 @@
defaultResponseTransformers = connector.getDefaultResponseTransformers(endpoint);
replyToHandler = getReplyToHandler();
-
+
+ if (!shouldConsumeInEveryNode() && !flowConstruct.getMuleContext().isPrimaryPollingInstance())
+ {
+ primaryNodeLifecycleNotificationListener = new PrimaryNodeLifecycleNotificationListener(new Startable() {
+ public void start() throws MuleException {
+ if (AbstractMessageReceiver.this.isStarted())
+ {
+ try {
+ AbstractMessageReceiver.this.doConnect();
+ } catch (Exception e) {
+ throw new DefaultMuleException(e);
+ }
+ AbstractMessageReceiver.this.doStart();
+ }
+ }
+ },flowConstruct.getMuleContext());
+ primaryNodeLifecycleNotificationListener.register();
+ }
+
super.initialise();
}
@@ -374,6 +397,10 @@
{
this.listener = null;
this.flowConstruct = null;
+ if (primaryNodeLifecycleNotificationListener != null)
+ {
+ primaryNodeLifecycleNotificationListener.unregister();
+ }
super.doDispose();
}
@@ -381,4 +408,50 @@
{
return ((AbstractConnector) endpoint.getConnector()).getReplyToHandler(endpoint);
}
+
+ /**
+ * Determines whether to start or not the MessageSource base on the running node state.
+ *
+ */
+ public boolean shouldConsumeInEveryNode()
+ {
+ return true;
+ }
+
+ final protected void connectHandler() throws Exception {
+ if (shouldConsumeInEveryNode() || getFlowConstruct().getMuleContext().isPrimaryPollingInstance())
+ {
+ if (logger.isInfoEnabled())
+ {
+ logger.info("Connecting clusterizable message receiver");
+ }
+ doConnect();
+ }
+ else
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Clusterizable message receiver not connected on this node");
+ }
+ }
+ }
+
+ final protected void doStartHandler() throws MuleException {
+ if (shouldConsumeInEveryNode() || getFlowConstruct().getMuleContext().isPrimaryPollingInstance())
+ {
+ if (logger.isInfoEnabled())
+ {
+ logger.info("Starting clusterizable message receiver");
+ }
+ doStart();
+ }
+ else
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Clusterizable message receiver not started on this node");
+ }
+ }
+ }
}
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java (24612 => 24613)
--- branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/transport/AbstractTransportMessageHandler.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -215,7 +215,7 @@
logger.debug("Connecting: " + this);
}
- doConnect();
+ connectHandler();
connected.set(true);
if (logger.isDebugEnabled())
@@ -224,6 +224,12 @@
}
}
+ //TODO - This template method belongs to AbstractMessageReceiver. Not possible to move it in mule 3.x - bc compatibility.
+ protected void connectHandler() throws Exception
+ {
+ this.doConnect();
+ }
+
public RetryContext validateConnection(RetryContext retryContext)
{
retryContext.setOk();
@@ -311,13 +317,17 @@
lifecycleManager.fireStartPhase(new LifecycleCallback<O>()
{
- public void onTransition(String phaseName, O object) throws MuleException
- {
- doStart();
+ public void onTransition(String phaseName, O object) throws MuleException {
+ doStartHandler();
}
});
}
+ protected void doStartHandler() throws MuleException
+ {
+ doStart();
+ }
+
public final void stop() throws MuleException
{
lifecycleManager.fireStopPhase(new LifecycleCallback<O>()
Added: branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java (0 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -0,0 +1,109 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.lifecycle;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleException;
+import org.mule.api.context.notification.ServerNotification;
+import org.mule.api.lifecycle.LifecycleState;
+import org.mule.api.lifecycle.LifecycleStateEnabled;
+import org.mule.api.lifecycle.Startable;
+import org.mule.context.notification.NotificationException;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PrimaryNodeLifecycleNotificationListenerTestCase extends AbstractMuleTestCase {
+
+ private MuleContext mockMuleContext;
+ private Startable mockStartable;
+ private ServerNotification mockServerNotification;
+ private StartableAndLifecycleState mockStartableAndLifecycleState;
+ private StartableAndLifecycleStateEnabled mockStartableAndLifecycleStateEnabled;
+ private PrimaryNodeLifecycleNotificationListener notificationListener;
+
+ public void setUpTest()
+ {
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartable, mockMuleContext);
+ }
+
+ public void testRegister() throws NotificationException
+ {
+ this.notificationListener.register();
+ verify(mockMuleContext, times(1)).registerListener(notificationListener);
+ }
+
+ public void testUnregister() throws NotificationException
+ {
+ this.notificationListener.unregister();
+ verify(mockMuleContext, times(1)).unregisterListener(notificationListener);
+ }
+
+ public void testOnNotificationWithStartable() throws MuleException {
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartable,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateStarted() throws MuleException {
+ mockStartableAndLifecycleState = mock(StartableAndLifecycleState.class);
+ when(mockStartableAndLifecycleState.isStarted()).thenReturn(true);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleState,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleState,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateStopped() throws MuleException {
+ mockStartableAndLifecycleState = mock(StartableAndLifecycleState.class);
+ when(mockStartableAndLifecycleState.isStarted()).thenReturn(false);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleState,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleState,times(0)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateEnabledStarted() throws MuleException {
+ mockStartableAndLifecycleStateEnabled = mock(StartableAndLifecycleStateEnabled.class, Answers.RETURNS_DEEP_STUBS.get());
+ when(mockStartableAndLifecycleStateEnabled.getLifecycleState().isStarted()).thenReturn(true);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleStateEnabled,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleStateEnabled,times(1)).start();
+ }
+
+ public void testOnNotificationWithLifecycleStateEnabledStopped() throws MuleException {
+ mockStartableAndLifecycleStateEnabled = mock(StartableAndLifecycleStateEnabled.class, Answers.RETURNS_DEEP_STUBS.get());
+ when(mockStartableAndLifecycleStateEnabled.getLifecycleState().isStarted()).thenReturn(false);
+ this.notificationListener = new PrimaryNodeLifecycleNotificationListener(mockStartableAndLifecycleStateEnabled,mockMuleContext);
+ this.notificationListener.onNotification(mockServerNotification);
+ verify(mockStartableAndLifecycleStateEnabled,times(0)).start();
+ }
+
+ private interface StartableAndLifecycleStateEnabled extends Startable, LifecycleStateEnabled{}
+ private interface StartableAndLifecycleState extends Startable, LifecycleState{}
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/lifecycle/PrimaryNodeLifecycleNotificationListenerTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/source/ClusterizableMessageSourceWrapperTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -10,17 +10,21 @@
package org.mule.source;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.mule.api.MuleContext;
-import org.mule.api.construct.FlowConstruct;
import org.mule.api.lifecycle.Lifecycle;
-import org.mule.api.lifecycle.LifecycleState;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.source.ClusterizableMessageSource;
import org.mule.context.notification.ClusterNodeNotification;
+import org.mule.lifecycle.PrimaryNodeLifecycleNotificationListener;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.junit.Test;
@@ -45,6 +49,10 @@
@Test
public void delegatesDispose() throws Exception
{
+ wrapper.setMuleContext(muleContext);
+
+ wrapper.initialise();
+
wrapper.dispose();
verify(messageSource, times(1)).dispose();
@@ -85,7 +93,7 @@
wrapper.initialise();
- verify(muleContext, times(1)).registerListener(wrapper);
+ verify(muleContext, times(1)).registerListener(Mockito.any(PrimaryNodeLifecycleNotificationListener.class));
}
@Test
@@ -114,16 +122,16 @@
public void ignoresMessageSourceOnNotificationIfFlowIsStopped() throws Exception
{
when(muleContext.isPrimaryPollingInstance()).thenReturn(true);
- LifecycleState lifecycleState = mock(LifecycleState.class);
- when(lifecycleState.isStarted()).thenReturn(false);
- FlowConstruct flowConstruct = mock(FlowConstruct.class);
- when(flowConstruct.getLifecycleState()).thenReturn(lifecycleState);
-
wrapper.setMuleContext(muleContext);
- wrapper.setFlowConstruct(flowConstruct);
+ Mockito.doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((PrimaryNodeLifecycleNotificationListener) invocationOnMock.getArguments()[0]).onNotification(new ClusterNodeNotification("",1));
+ return null;
+ }
+ }).when(muleContext).registerListener(isA(PrimaryNodeLifecycleNotificationListener.class));
- wrapper.onNotification(mock(ClusterNodeNotification.class));
-
+ wrapper.initialise();
verify(messageSource, times(0)).start();
}
@@ -131,16 +139,17 @@
public void startsMessageSourceOnNotificationIfFlowIsStarted() throws Exception
{
when(muleContext.isPrimaryPollingInstance()).thenReturn(true);
- LifecycleState lifecycleState = mock(LifecycleState.class);
- when(lifecycleState.isStarted()).thenReturn(true);
- FlowConstruct flowConstruct = mock(FlowConstruct.class);
- when(flowConstruct.getLifecycleState()).thenReturn(lifecycleState);
-
wrapper.setMuleContext(muleContext);
- wrapper.setFlowConstruct(flowConstruct);
+ Mockito.doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((PrimaryNodeLifecycleNotificationListener) invocationOnMock.getArguments()[0]).onNotification(new ClusterNodeNotification("",1));
+ return null;
+ }
+ }).when(muleContext).registerListener(isA(PrimaryNodeLifecycleNotificationListener.class));
- wrapper.onNotification(mock(ClusterNodeNotification.class));
-
+ wrapper.start();
+ wrapper.initialise();
verify(messageSource, times(1)).start();
}
@@ -160,10 +169,9 @@
public void unregistersListenerOnDispose() throws Exception
{
wrapper.setMuleContext(muleContext);
-
+ wrapper.initialise();
wrapper.dispose();
-
- verify(muleContext, times(1)).unregisterListener(wrapper);
+ verify(muleContext, times(1)).unregisterListener(isA(PrimaryNodeLifecycleNotificationListener.class));
}
private interface TestMessageSource extends ClusterizableMessageSource, Lifecycle
Copied: branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java (from rev 24564, branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java) (0 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -0,0 +1,69 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.transport;
+
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.api.service.Service;
+import org.mule.api.transport.MessageReceiver;
+import org.mule.tck.junit4.AbstractMuleContextTestCase;
+import org.mule.tck.testmodels.fruit.Orange;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public abstract class AbstractMessageReceiverTemplateTestCase extends AbstractMuleContextTestCase
+{
+ protected Service service;
+ protected InboundEndpoint endpoint;
+
+ protected void doSetUp() throws Exception
+ {
+ service = getTestService("orange", Orange.class);
+ endpoint = getEndpoint();
+ }
+
+ public void testCreate() throws Exception
+ {
+ Service service = getTestService("orange", Orange.class);
+ InboundEndpoint endpoint = getTestInboundEndpoint("Test");
+ MessageReceiver receiver = getMessageReceiver();
+
+ assertNotNull(receiver.getEndpoint());
+
+ try
+ {
+ receiver.setEndpoint(null);
+ fail("Provider cannot be set to null");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // expected
+ }
+
+ receiver.setEndpoint(endpoint);
+ assertNotNull(receiver.getEndpoint());
+
+ receiver.dispose();
+ }
+
+ public abstract MessageReceiver getMessageReceiver() throws Exception;
+
+ /**
+ * Implementations of this method should ensure that the correct connector is set
+ * on the endpoint
+ *
+ */
+ public abstract InboundEndpoint getEndpoint() throws Exception;
+}
Property changes: branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTemplateTestCase.java
Added: svn:keywords
Added: svn:eol-style
Deleted: branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/transport/AbstractMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -1,69 +0,0 @@
-/*
- * $Id$
- * --------------------------------------------------------------------------------------
- * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
- *
- * The software in this package is published under the terms of the CPAL v1.0
- * license, a copy of which has been included with this distribution in the
- * LICENSE.txt file.
- */
-
-package org.mule.transport;
-
-import org.mule.api.endpoint.InboundEndpoint;
-import org.mule.api.service.Service;
-import org.mule.api.transport.MessageReceiver;
-import org.mule.tck.junit4.AbstractMuleContextTestCase;
-import org.mule.tck.testmodels.fruit.Orange;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-public abstract class AbstractMessageReceiverTestCase extends AbstractMuleContextTestCase
-{
- protected Service service;
- protected InboundEndpoint endpoint;
-
- protected void doSetUp() throws Exception
- {
- service = getTestService("orange", Orange.class);
- endpoint = getEndpoint();
- }
-
- public void testCreate() throws Exception
- {
- Service service = getTestService("orange", Orange.class);
- InboundEndpoint endpoint = getTestInboundEndpoint("Test");
- MessageReceiver receiver = getMessageReceiver();
-
- assertNotNull(receiver.getEndpoint());
-
- try
- {
- receiver.setEndpoint(null);
- fail("Provider cannot be set to null");
- }
- catch (IllegalArgumentException e)
- {
- // expected
- }
-
- receiver.setEndpoint(endpoint);
- assertNotNull(receiver.getEndpoint());
-
- receiver.dispose();
- }
-
- public abstract MessageReceiver getMessageReceiver() throws Exception;
-
- /**
- * Implementations of this method should ensure that the correct connector is set
- * on the endpoint
- *
- */
- public abstract InboundEndpoint getEndpoint() throws Exception;
-}
Modified: branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm (24612 => 24613)
--- branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/tools/mule-transport-archetype/src/main/resources/archetype-resources/src/test/java/org/mule/transport/MessageReceiverTestCase.vm 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,7 +15,7 @@
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.service.Service;
import org.mule.api.transport.MessageReceiver;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import com.mockobjects.dynamic.Mock;
Modified: branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/file/src/test/java/org/mule/transport/file/FileMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -14,14 +14,14 @@
import org.mule.api.service.Service;
import org.mule.api.transport.MessageReceiver;
import org.mule.tck.MuleTestUtils;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import org.mule.util.FileUtils;
import com.mockobjects.dynamic.Mock;
import java.io.File;
-public class FileMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class FileMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
File read = FileUtils.newFile("testcasedata/read");
File move = FileUtils.newFile("testcasedata/move");
Modified: branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/http/src/test/java/org/mule/transport/http/HttpMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -16,13 +16,13 @@
import org.mule.api.transport.MessageReceiver;
import org.mule.endpoint.EndpointURIEndpointBuilder;
import org.mule.service.ServiceCompositeMessageSource;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import org.mule.transport.http.transformers.MuleMessageToHttpResponse;
import org.mule.util.CollectionUtils;
import com.mockobjects.dynamic.Mock;
-public class HttpMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class HttpMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
public MessageReceiver getMessageReceiver() throws Exception
{
Modified: branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java (24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsConnector.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -14,6 +14,7 @@
import org.mule.api.MuleException;
import org.mule.api.MuleRuntimeException;
import org.mule.api.construct.FlowConstruct;
+import org.mule.api.context.notification.ClusterNodeNotificationListener;
import org.mule.api.context.notification.ConnectionNotificationListener;
import org.mule.api.endpoint.ImmutableEndpoint;
import org.mule.api.endpoint.InboundEndpoint;
@@ -26,6 +27,7 @@
import org.mule.config.ExceptionHelper;
import org.mule.config.i18n.CoreMessages;
import org.mule.config.i18n.MessageFactory;
+import org.mule.context.notification.ClusterNodeNotification;
import org.mule.context.notification.ConnectionNotification;
import org.mule.context.notification.NotificationException;
import org.mule.routing.MessageFilter;
@@ -450,6 +452,27 @@
return connection;
}
+ public void connect() throws Exception {
+ if (muleContext.isPrimaryPollingInstance() || clientId == null)
+ {
+ super.connect();
+ }
+ else
+ {
+ muleContext.registerListener(new ClusterNodeNotificationListener<ClusterNodeNotification>() {
+ public void onNotification(ClusterNodeNotification notification) {
+ try {
+ JmsConnector.this.connect();
+ } catch (Exception e) {
+ throw new MuleRuntimeException(e);
+ }
+ }
+ });
+ }
+ }
+
public void onException(JMSException jmsException)
{
final JmsConnector jmsConnector = JmsConnector.this;
Modified: branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java (24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/JmsMessageReceiver.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -10,9 +10,7 @@
package org.mule.transport.jms;
-import org.mule.MessageExchangePattern;
import org.mule.api.MuleException;
-import org.mule.api.MuleMessage;
import org.mule.api.MuleRuntimeException;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.endpoint.InboundEndpoint;
@@ -21,7 +19,6 @@
import org.mule.api.transaction.Transaction;
import org.mule.api.transaction.TransactionException;
import org.mule.api.transport.Connector;
-import org.mule.api.transport.ReplyToHandler;
import org.mule.transport.AbstractMessageReceiver;
import org.mule.transport.AbstractReceiverWorker;
import org.mule.transport.ConnectException;
@@ -55,12 +52,14 @@
protected MessageConsumer consumer;
protected Session session;
protected boolean startOnConnect = false;
+ private final boolean topic;
public JmsMessageReceiver(Connector connector, FlowConstruct flowConstruct, InboundEndpoint endpoint)
throws CreateException
{
super(connector, flowConstruct, endpoint);
this.connector = (JmsConnector) connector;
+ topic = this.connector.getTopicResolver().isTopic(endpoint);
try
{
@@ -99,6 +98,11 @@
}
}
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ }
+
protected class JmsWorker extends AbstractReceiverWorker
{
public JmsWorker(Message message, AbstractMessageReceiver receiver)
@@ -234,8 +238,6 @@
session = this.connector.getSession(endpoint);
}
- boolean topic = connector.getTopicResolver().isTopic(endpoint);
-
// Create destination
Destination dest = jmsSupport.createDestination(session, endpoint);
Modified: branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java (24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiver.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -10,7 +10,6 @@
package org.mule.transport.jms;
-import org.mule.MessageExchangePattern;
import org.mule.api.MessagingException;
import org.mule.api.MuleException;
import org.mule.api.construct.FlowConstruct;
@@ -21,7 +20,6 @@
import org.mule.api.transaction.Transaction;
import org.mule.api.transaction.TransactionException;
import org.mule.api.transport.Connector;
-import org.mule.api.transport.ReplyToHandler;
import org.mule.transaction.TransactionCollection;
import org.mule.transport.AbstractMessageReceiver;
import org.mule.transport.AbstractReceiverWorker;
@@ -58,6 +56,8 @@
private final JmsConnector jmsConnector;
+ final boolean isTopic;
+
public MultiConsumerJmsMessageReceiver(Connector connector, FlowConstruct flowConstruct, InboundEndpoint endpoint)
throws CreateException
{
@@ -65,7 +65,7 @@
jmsConnector = (JmsConnector) connector;
- final boolean isTopic = jmsConnector.getTopicResolver().isTopic(endpoint, true);
+ isTopic = jmsConnector.getTopicResolver().isTopic(endpoint, true);
if (isTopic && jmsConnector.getNumberOfConsumers() != 1)
{
if (logger.isInfoEnabled())
@@ -152,14 +152,19 @@
}
}
consumers.clear();
- }
+ }
@Override
protected void doDispose()
{
logger.debug("doDispose()");
- }
+ }
+ public boolean shouldConsumeInEveryNode() {
+ return !this.isTopic;
+ }
+
private class SubReceiver implements MessageListener
{
private final Log subLogger = LogFactory.getLog(getClass());
Modified: branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java (24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/TransactedSingleResourceJmsMessageReceiver.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -53,8 +53,9 @@
/** determines whether Multiple receivers are created to improve throughput */
protected boolean useMultipleReceivers = true;
+ private final boolean topic;
-
+
public TransactedSingleResourceJmsMessageReceiver(Connector connector,
FlowConstruct flowConstruct,
InboundEndpoint endpoint) throws CreateException
@@ -63,7 +64,7 @@
super(connector, flowConstruct, endpoint);
this.connector = (JmsConnector) connector;
-
+ topic = this.connector.getTopicResolver().isTopic(endpoint);
// TODO check which properties being set in the TransecteJmsMessage receiver
// are needed...
@@ -97,8 +98,6 @@
}
// Create destination
- boolean topic = connector.getTopicResolver().isTopic(endpoint);
-
Destination dest = jmsSupport.createDestination(session, endpoint);
// Extract jms selector
String selector = null;
@@ -283,4 +282,9 @@
routeMessage(messageToRoute);
}
}
+
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ }
}
Modified: branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java (24612 => 24613)
--- branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/jms/src/main/java/org/mule/transport/jms/XaTransactedJmsMessageReceiver.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -49,7 +49,13 @@
protected final ThreadContextLocal context = new ThreadContextLocal();
protected final long timeout;
private final AtomicReference<RedeliveryHandler> redeliveryHandler = new AtomicReference<RedeliveryHandler>();
+ private final boolean topic;
+ public boolean shouldConsumeInEveryNode() {
+ return !this.topic;
+ }
+
/**
* Holder receiving the session and consumer for this thread.
*/
@@ -105,7 +111,7 @@
this.reuseSession);
// Do extra validation, XA Topic & reuse are incompatible. See MULE-2622
- boolean topic = this.connector.getTopicResolver().isTopic(getEndpoint());
+ topic = this.connector.getTopicResolver().isTopic(getEndpoint());
if (topic && (reuseConsumer || reuseSession))
{
logger.warn("Destination " + getEndpoint().getEndpointURI() + " is a topic and XA transaction was " +
Added: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java (0 => 24613)
--- branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java (rev 0)
+++ branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -0,0 +1,55 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.jms;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+public class MultiConsumerJmsMessageReceiverTest extends AbstractMuleTestCase {
+
+
+ private JmsConnector mockJmsConnector;
+ private FlowConstruct mockFlowConstruct;
+ private InboundEndpoint mockInboundEndpoint;
+
+ public void testTopicReceiverShouldBeStartedOnlyInPrimaryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint,true)).thenReturn(true);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ MultiConsumerJmsMessageReceiver messageReceiver = new MultiConsumerJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(false));
+ }
+
+ public void testQueueReceiverShouldBeStartedInEveryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint,true)).thenReturn(false);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ MultiConsumerJmsMessageReceiver messageReceiver = new MultiConsumerJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(true));
+ }
+
+
+}
Property changes on: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/MultiConsumerJmsMessageReceiverTest.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java (0 => 24613)
--- branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java (rev 0)
+++ branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -0,0 +1,55 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.jms;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+public class XaTransactedJmsMessageReceiverTest extends AbstractMuleTestCase {
+
+
+ private JmsConnector mockJmsConnector;
+ private FlowConstruct mockFlowConstruct;
+ private InboundEndpoint mockInboundEndpoint;
+
+ public void testTopicReceiverShouldBeStartedOnlyInPrimaryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint)).thenReturn(true);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ XaTransactedJmsMessageReceiver messageReceiver = new XaTransactedJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(false));
+ }
+
+ public void testQueueReceiverShouldBeStartedInEveryNode() throws Exception
+ {
+ when(mockJmsConnector.getTopicResolver().isTopic(mockInboundEndpoint)).thenReturn(false);
+ when(mockInboundEndpoint.getConnector()).thenReturn(mockJmsConnector);
+ XaTransactedJmsMessageReceiver messageReceiver = new XaTransactedJmsMessageReceiver(mockJmsConnector, mockFlowConstruct, mockInboundEndpoint);
+ assertThat("receiver must be started only in primary node", messageReceiver.shouldConsumeInEveryNode(), is(true));
+ }
+
+
+}
Property changes on: branches/mule-3.2.x/transports/jms/src/test/java/org/mule/transport/jms/XaTransactedJmsMessageReceiverTest.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/multicast/src/test/java/org/mule/transport/multicast/MulticastMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,11 +15,11 @@
import org.mule.api.transport.MessageReceiver;
import org.mule.service.ServiceCompositeMessageSource;
import org.mule.transport.AbstractConnector;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import com.mockobjects.dynamic.Mock;
-public class MulticastMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class MulticastMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
public MessageReceiver getMessageReceiver() throws Exception
{
Modified: branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/rmi/src/test/java/org/mule/transport/rmi/RmiMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,7 +15,7 @@
import org.mule.api.lifecycle.InitialisationException;
import org.mule.config.i18n.MessageFactory;
import org.mule.endpoint.EndpointURIEndpointBuilder;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import org.mule.util.concurrent.Latch;
import java.rmi.Naming;
@@ -44,7 +44,7 @@
*
*/
-public class RmiMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class RmiMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
private static Log LOGGER = LogFactory.getLog(RmiMessageReceiverTestCase.class);
Modified: branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/ssl/src/test/java/org/mule/transport/ssl/SslMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -14,11 +14,11 @@
import org.mule.api.service.Service;
import org.mule.api.transport.MessageReceiver;
import org.mule.service.ServiceCompositeMessageSource;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import com.mockobjects.dynamic.Mock;
-public class SslMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class SslMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
public MessageReceiver getMessageReceiver() throws Exception
{
Modified: branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/stdio/src/test/java/org/mule/transport/stdio/StdioMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,14 +15,14 @@
import org.mule.api.service.Service;
import org.mule.api.transport.MessageReceiver;
import org.mule.tck.testmodels.fruit.Orange;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-public class StdioMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class StdioMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
@Test
Modified: branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/tcp/src/test/java/org/mule/transport/tcp/TcpMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -15,11 +15,11 @@
import org.mule.api.transport.MessageReceiver;
import org.mule.service.ServiceCompositeMessageSource;
import org.mule.transport.AbstractConnector;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import com.mockobjects.dynamic.Mock;
-public class TcpMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class TcpMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
@Override
Modified: branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/udp/src/test/java/org/mule/transport/udp/UdpMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -16,11 +16,11 @@
import org.mule.api.transport.MessageReceiver;
import org.mule.endpoint.EndpointURIEndpointBuilder;
import org.mule.service.ServiceCompositeMessageSource;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
import com.mockobjects.dynamic.Mock;
-public class UdpMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class UdpMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
@Override
public MessageReceiver getMessageReceiver() throws Exception
Modified: branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java (24612 => 24613)
--- branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java 2012-07-06 20:23:46 UTC (rev 24612)
+++ branches/mule-3.2.x/transports/vm/src/test/java/org/mule/transport/vm/VMMessageReceiverTestCase.java 2012-07-11 13:58:26 UTC (rev 24613)
@@ -12,9 +12,9 @@
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.transport.MessageReceiver;
-import org.mule.transport.AbstractMessageReceiverTestCase;
+import org.mule.transport.AbstractMessageReceiverTemplateTestCase;
-public class VMMessageReceiverTestCase extends AbstractMessageReceiverTestCase
+public class VMMessageReceiverTestCase extends AbstractMessageReceiverTemplateTestCase
{
VMMessageReceiver receiver;
http://xircles.codehaus.org/manage_email
Loading...