Daniel Feist
2012-02-03 02:57:40 UTC
This doesn't need to be a InterceptingMessageProcessor, can just implement MessageProcessor and should be if possible. Also is there a more specific package this could go in rather than just org/mule/processor, there should be.. You might consider simply calling it Foreach end up putting it along other processors without this suffix.
Are Maps supported?
Dan
Are Maps supported?
Dan
Revision
23794
Author
svacas
Date
2012-02-01 17:52:38 -0600 (Wed, 01 Feb 2012)
Log Message
MULE-5914: Implement <foreach> message processor
Modified Paths
branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java
branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java
branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java
branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd
Added Paths
branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java
branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java
branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java
branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java
branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml
Diff
Added: branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java (0 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,153 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.processor;
+
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.lifecycle.Initialisable;
+import org.mule.api.lifecycle.InitialisationException;
+import org.mule.api.processor.InterceptingMessageProcessor;
+import org.mule.api.processor.MessageProcessor;
+import org.mule.expression.ExpressionConfig;
+import org.mule.processor.chain.DefaultMessageProcessorChainBuilder;
+import org.mule.routing.CollectionSplitter;
+import org.mule.routing.ExpressionSplitter;
+import org.mule.routing.outbound.AbstractMessageSequenceSplitter;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The <code>Foreach</code> MessageProcessor allows iterating over a collection
+ * payload, or any collection obtained by an expression, generating a message for
+ * each element.
+ * <p>
+ * The number of the message being processed is stored in
+ * <code>#[variable:counter]</code> and the root message is store in
+ * <code>#[variable:rootMessage]</code>. Both variables may be renamed by means of
+ * <p>
+ * Defining a groupSize greater than one, allows iterating over collections of
+ * elements of the specified size.
+ * <p>
+ * to foreach.
+ */
+public class ForeachMessageProcessor extends AbstractMessageProcessorOwner implements Initialisable, InterceptingMessageProcessor
+{
+
+ public static final String ROOT_MESSAGE_PROPERTY = "rootMessage";
+ public static final String COUNTER_PROPERTY = "counter";
+
+ protected Log logger = LogFactory.getLog(getClass());
+
+ private List<MessageProcessor> messageProcessors;
+ private MessageProcessor ownedMessageProcessor;
+ private AbstractMessageSequenceSplitter splitter;
+ private MessageProcessor next;
+ private String expression;
+ private int groupSize;
+ private String rootMessageVariableName;
+ private String counterVariableName;
+
+ public MuleEvent process(MuleEvent event) throws MuleException
+ {
+ String parentMessageProp = rootMessageVariableName != null ? rootMessageVariableName : ROOT_MESSAGE_PROPERTY;
+ event.getMessage().setInvocationProperty(parentMessageProp, event.getMessage());
+ ownedMessageProcessor.process(event);
+ return processNext(event);
+ }
+
+ protected MuleEvent processNext(MuleEvent event) throws MuleException
+ {
+ if (next == null)
+ {
+ return event;
+ }
+ else
+ {
+ return next.process(event);
+ }
+ }
+
+ protected List<MessageProcessor> getOwnedMessageProcessors()
+ {
+ return messageProcessors;
+ }
+
+ public void setListener(MessageProcessor listener)
+ {
+ next = listener;
+ }
+
+ public void setMessageProcessors(List<MessageProcessor> messageProcessors) throws MuleException
+ {
+ this.messageProcessors = messageProcessors;
+ }
+
+ public void initialise() throws InitialisationException
+ {
+ if (expression != null)
+ {
+ ExpressionConfig config = new ExpressionConfig();
+ config.setExpression(expression);
+ splitter = new ExpressionSplitter(config);
+ }
+ else
+ {
+ splitter = new CollectionSplitter();
+ }
+ splitter.setGroupSize(groupSize);
+ splitter.setCounterVariableName(counterVariableName);
+ splitter.setMuleContext(muleContext);
+ messageProcessors.add(0, splitter);
+
+ try
+ {
+ this.ownedMessageProcessor = new DefaultMessageProcessorChainBuilder().chain(messageProcessors).build();
+ }
+ catch (MuleException e)
+ {
+ throw new InitialisationException(e, this);
+ }
+ super.initialise();
+ }
+
+ public void setExpression(String expression)
+ {
+ this.expression = expression;
+ }
+
+ public void setGroupSize(int groupSize)
+ {
+ this.groupSize = groupSize;
+ }
+
+ public void setRootMessageVariableName(String rootMessageVariableName)
+ {
+ this.rootMessageVariableName = rootMessageVariableName;
+ }
+
+ public void setCounterVariableName(String counterVariableName)
+ {
+ this.counterVariableName = counterVariableName;
+ }
+
+}
+
+
Property changes on: branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java (23793 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -10,9 +10,6 @@
package org.mule.routing.outbound;
-import java.util.ArrayList;
-import java.util.List;
-
import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.RequestContext;
@@ -29,6 +26,9 @@
import org.mule.routing.DefaultRouterResultsHandler;
import org.mule.routing.MessageSequence;
+import java.util.ArrayList;
+import java.util.List;
+
/**
@@ -49,6 +49,8 @@
protected RouterResultsHandler resultsHandler = new DefaultRouterResultsHandler();
protected CorrelationMode enableCorrelation = CorrelationMode.IF_NOT_SET;
protected MessageInfoMapping messageInfoMapping;
+ protected int groupSize;
+ protected String counterVariableName;
public final MuleEvent process(MuleEvent event) throws MuleException
{
@@ -95,14 +97,22 @@
String correlationId = messageInfoMapping.getCorrelationId(originalEvent.getMessage());
List<MuleEvent> resultEvents = new ArrayList<MuleEvent>();
int correlationSequence = 0;
- int count = seq.size();
+ MessageSequence<?> messageSequence = seq;
+ if (groupSize > 1)
+ {
+ messageSequence = new PartitionedMessageSequence(seq, groupSize);
+ }
+ int count = messageSequence.size();
MuleEvent currentEvent = originalEvent;
- for (; seq.hasNext();)
+ for (; messageSequence.hasNext();)
{
- Object payload = seq.next();
+ Object payload = messageSequence.next();
MuleMessage message = createMessage(payload, originalEvent.getMessage());
correlationSequence++;
-
+ if (counterVariableName != null)
+ {
+ message.setInvocationProperty(counterVariableName, correlationSequence);
+ }
if (enableCorrelation != CorrelationMode.NEVER)
{
boolean correlationSet = message.getCorrelationId() != null;
@@ -159,4 +169,17 @@
{
this.messageInfoMapping = messageInfoMapping;
}
+
+ /**
+ * Split the elements in groups of the specified size
+ */
+ public void setGroupSize(int groupSize)
+ {
+ this.groupSize = groupSize;
+ }
+
+ public void setCounterVariableName(String counterVariableName)
+ {
+ this.counterVariableName = counterVariableName;
+ }
}
\ No newline at end of file
Added: branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java (0 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,84 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.routing.outbound;
+
+import org.mule.api.MuleRuntimeException;
+import org.mule.config.i18n.MessageFactory;
+import org.mule.routing.MessageSequence;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.NoSuchElementException;
+
+/**
+ * collections of the specified size.
+ */
+public class PartitionedMessageSequence<Q> implements MessageSequence<Collection<Q>>
+{
+ private MessageSequence<Q> delegate;
+ private int groupSize;
+
+ public PartitionedMessageSequence(MessageSequence<Q> seq, int groupSize)
+ {
+ if (groupSize <= 1)
+ {
+ throw new MuleRuntimeException(MessageFactory.createStaticMessage("group size must be greater than 1"));
+ }
+ this.delegate = seq;
+ this.groupSize = groupSize;
+ }
+
+ public int size()
+ {
+ return (delegate.size() / groupSize) + ((delegate.size() % groupSize) > 0 ? 1 : 0);
+ }
+
+ public boolean hasNext()
+ {
+ return delegate.hasNext();
+ }
+
+ public Collection<Q> next()
+ {
+ if (!delegate.hasNext())
+ {
+ throw new NoSuchElementException();
+ }
+ Collection<Q> batch = new ArrayList<Q>();
+ int i = groupSize;
+ while (i > 0 && delegate.hasNext())
+ {
+ batch.add(delegate.next());
+ i--;
+ }
+ return batch;
+ }
+
+ public boolean isEmpty()
+ {
+ return !hasNext();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+}
+
+
Property changes on: branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java (23793 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -252,6 +252,11 @@
for (Object name : names)
{
Object value = m.getProperty(name.toString(), scope);
+ // avoid calling toString recursively on MuleMessages
+ if (value instanceof MuleMessage)
+ {
+ value = "<<<MuleMessage>>>";
+ }
if (name.equals("password") || name.toString().contains("secret") || name.equals("pass"))
{
value = "****";
Added: branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java (0 => 23794)
--- branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,63 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.routing.outbound;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.mule.tck.size.SmallTest;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.junit.Test;
+
+public class PartitionedMessageSequenceTestCase
+{
+
+ public void wrapCollectionMessageSequence()
+ {
+ Collection<String> group1 = new ArrayList<String>();
+ group1.add("one");
+ group1.add("two");
+ group1.add("three");
+ group1.add("four");
+
+ Collection<String> group2 = new ArrayList<String>();
+ group2.add("five");
+ group2.add("six");
+ group2.add("seven");
+
+ Collection<String> base = new ArrayList<String>();
+ base.addAll(group1);
+ base.addAll(group2);
+
+ CollectionMessageSequence<String> cms = new CollectionMessageSequence<String>(base);
+ int groupSize = group1.size();
+ PartitionedMessageSequence<String> pms = new PartitionedMessageSequence<String>(cms, groupSize);
+ assertEquals(2, pms.size());
+
+ Collection<String> batchItem = (Collection<String>) pms.next();
+ assertEquals(groupSize, batchItem.size());
+ assertTrue(batchItem.containsAll(group1));
+
+ batchItem = (Collection<String>) pms.next();
+ assertEquals(group2.size(), batchItem.size());
+ assertTrue(batchItem.containsAll(group2));
+
+ assertFalse(pms.hasNext());
+ }
+}
+
+
Property changes on: branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java (23793 => 23794)
--- branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -132,6 +132,7 @@
import org.mule.model.seda.SedaService;
import org.mule.object.PrototypeObjectFactory;
import org.mule.object.SingletonObjectFactory;
+import org.mule.processor.ForeachMessageProcessor;
import org.mule.processor.IdempotentRedeliveryPolicy;
import org.mule.processor.InvokerMessageProcessor;
import org.mule.processor.NullMessageProcessor;
@@ -219,6 +220,7 @@
import org.mule.util.store.InMemoryObjectStore;
import org.mule.util.store.ManagedObjectStore;
import org.mule.util.store.TextFileObjectStore;
+
import org.springframework.beans.factory.xml.BeanDefinitionParser;
/**
@@ -511,6 +513,8 @@
registerBeanDefinitionParser("map-splitter", new SplitterDefinitionParser(MapSplitter.class));
registerBeanDefinitionParser("message-chunk-splitter", new SplitterDefinitionParser(MessageChunkSplitter.class));
registerBeanDefinitionParser("custom-splitter", new SplitterDefinitionParser());
+ registerMuleBeanDefinitionParser("foreach", new ChildDefinitionParser("messageProcessor", ForeachMessageProcessor.class)).addAlias("rootMessage",
+ "rootMessageVariableName").addAlias("counter", "counterVariableName");
// Routing: Routing Message Processors
Modified: branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd (23793 => 23794)
--- branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd 2012-02-01 23:52:38 UTC (rev 23794)
@@ -5641,6 +5641,41 @@
</xsd:complexContent>
</xsd:complexType>
+ <xsd:element name="foreach" type="foreachProcessorType"
+ substitutionGroup="abstract-intercepting-message-processor"/>
+ <xsd:complexType name="foreachProcessorType">
+ <xsd:complexContent>
+ <xsd:extension base="abstractInterceptingMessageProcessorType">
+ <xsd:sequence>
+ <xsd:group ref="messageProcessorOrOutboundEndpoint" minOccurs="1" maxOccurs="unbounded"/>
+ </xsd:sequence>
+ <xsd:attributeGroup ref="optionalExpressionAttributes"/>
+ <xsd:attribute name="groupSize" use="optional" type="xsd:integer">
+ <xsd:annotation>
+ <xsd:documentation>
+ Partitions the collection in subcollections of the specified group size.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="rootMessage" use="optional" default="rootMessage" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation>
+ Property name where the parent message is stored.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="counter" use="optional" default="counter" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation>
+ Property name used to store the number of message being iterated.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+
+
<!--==================================-->
<!-- Routing (1-n) Message Processors -->
<!--==================================-->
Added: branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java (0 => 23794)
--- branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java (rev 0)
+++ branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,179 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.test.routing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.mule.DefaultMuleMessage;
+import org.mule.api.MuleMessage;
+import org.mule.module.client.MuleClient;
+import org.mule.tck.junit4.FunctionalTestCase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class ForeachTestCase extends FunctionalTestCase
+{
+
+ private MuleClient client;
+
+ protected void doSetUp() throws Exception
+ {
+ super.doSetUp();
+ client = new MuleClient(muleContext);
+ }
+
+ protected String getConfigResources()
+ {
+ return "foreach-test.xml";
+ }
+
+ public void testDefaultConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("julio");
+ payload.add("sosa");
+
+ MuleMessage result = client.send("vm://input-1", payload, null);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(2, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("julio", out.getPayload());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("sosa", out.getPayload());
+ }
+
+ public void testDefaultConfigurationPlusMP() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("syd");
+ payload.add("barrett");
+
+ MuleMessage result = client.send("vm://input-2", payload, null);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(3, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("syd", out.getPayload());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("barrett", out.getPayload());
+
+ }
+
+ public void testDefaultConfigurationExpression() throws Exception
+ {
+ final Collection<String> names = new ArrayList<String>();
+ names.add("residente");
+ names.add("visitante");
+ Map<String, Object> props = new HashMap<String, Object>();
+ props.put("names", names);
+ MuleMessage message = new DefaultMuleMessage("message payload", props, muleContext);
+
+ MuleMessage result = client.send("vm://input-3", message);
+ assertTrue(result.getPayload() instanceof String);
+ assertEquals(names.size(), ((Collection<?>) message.getOutboundProperty("names")).size());
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("residente", out.getPayload());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("visitante", out.getPayload());
+ }
+
+ public void testPartitionedConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("gulp");
+ payload.add("oktubre");
+ payload.add("un baion");
+ payload.add("bang bang");
+ payload.add("la mosca");
+
+ MuleMessage result = client.send("vm://input-4", payload, null);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(5, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof Collection);
+ Collection<?> outPayload = (Collection<?>) out.getPayload();
+ assertEquals(3, outPayload.size());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof Collection);
+ outPayload = (Collection<?>) out.getPayload();
+ assertEquals(2, outPayload.size());
+ }
+
+ public void testRootMessageConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("pyotr");
+ payload.add("ilych");
+ MuleMessage parent = new DefaultMuleMessage(payload, muleContext);
+
+ MuleMessage result = client.send("vm://input-5", parent);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(2, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ assertSame(parent.getPayload(), ((MuleMessage) result.getInboundProperty("parent")).getPayload());
+ }
+
+ public void testCounterConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("wolfgang");
+ payload.add("amadeus");
+ payload.add("mozart");
+ MuleMessage parent = new DefaultMuleMessage(payload, muleContext);
+
+ MuleMessage result = client.send("vm://input-6", parent);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(3, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ assertEquals(result.getInboundProperty("msg-last-index"), 3);
+ }
+
+}
+
+
Property changes on: branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml (0 => 23794)
--- branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml (rev 0)
+++ branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<mule xmlns="http://www.mulesoft.org/schema/mule/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:spring="http://www.springframework.org/schema/beans"
+ xmlns:test="http://www.mulesoft.org/schema/mule/test"
+ xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
+ xmlns:script="http://www.mulesoft.org/schema/mule/scripting"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
+ http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
+ http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/3.2/mule-vm.xsd
+ http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/3.2/mule-test.xsd
+ http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd">
+
+ <flow name="minimal-config">
+ <vm:inbound-endpoint path="input-1" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach>
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ </flow>
+
+ <flow name="minimal-config-plus-mp">
+ <vm:inbound-endpoint path="input-2" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach>
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ <script:transformer>
+ <script:script engine="groovy">
+ payload.add("added after foreach")
+ result = payload
+ </script:script>
+ </script:transformer>
+ </flow>
+
+ <flow name="minimal-config-expression">
+ <vm:inbound-endpoint path="input-3" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach expression="#[header:INBOUND:names]">
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ </flow>
+
+ <flow name="partitioned-config">
+ <vm:inbound-endpoint path="input-4" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach groupSize="3">
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ </flow>
+
+ <flow name="parent-message-config">
+ <vm:inbound-endpoint path="input-5" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach rootMessage="parent">
+ <test:component/>
+ </foreach>
+ <message-properties-transformer scope="outbound">
+ <add-message-property key="parent" value="#[variable:parent]"/>
+ </message-properties-transformer>
+ </flow>
+
+ <flow name="counter-config">
+ <vm:inbound-endpoint path="input-6" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach counter="index">
+ <test:component/>
+ <message-properties-transformer scope="session">
+ <add-message-property key="msg-last-index" value="#[variable:index]"/>
+ </message-properties-transformer>
+ </foreach>
+ <message-properties-transformer scope="outbound">
+ <add-message-property key="msg-last-index" value="#[header:SESSION:msg-last-index]"/>
+ </message-properties-transformer>
+ </flow>
+
+</mule>
Property changes on: branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
http://xircles.codehaus.org/manage_email
23794
Author
svacas
Date
2012-02-01 17:52:38 -0600 (Wed, 01 Feb 2012)
Log Message
MULE-5914: Implement <foreach> message processor
Modified Paths
branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java
branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java
branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java
branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd
Added Paths
branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java
branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java
branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java
branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java
branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml
Diff
Added: branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java (0 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,153 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.processor;
+
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.lifecycle.Initialisable;
+import org.mule.api.lifecycle.InitialisationException;
+import org.mule.api.processor.InterceptingMessageProcessor;
+import org.mule.api.processor.MessageProcessor;
+import org.mule.expression.ExpressionConfig;
+import org.mule.processor.chain.DefaultMessageProcessorChainBuilder;
+import org.mule.routing.CollectionSplitter;
+import org.mule.routing.ExpressionSplitter;
+import org.mule.routing.outbound.AbstractMessageSequenceSplitter;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The <code>Foreach</code> MessageProcessor allows iterating over a collection
+ * payload, or any collection obtained by an expression, generating a message for
+ * each element.
+ * <p>
+ * The number of the message being processed is stored in
+ * <code>#[variable:counter]</code> and the root message is store in
+ * <code>#[variable:rootMessage]</code>. Both variables may be renamed by means of
+ * <p>
+ * Defining a groupSize greater than one, allows iterating over collections of
+ * elements of the specified size.
+ * <p>
+ * to foreach.
+ */
+public class ForeachMessageProcessor extends AbstractMessageProcessorOwner implements Initialisable, InterceptingMessageProcessor
+{
+
+ public static final String ROOT_MESSAGE_PROPERTY = "rootMessage";
+ public static final String COUNTER_PROPERTY = "counter";
+
+ protected Log logger = LogFactory.getLog(getClass());
+
+ private List<MessageProcessor> messageProcessors;
+ private MessageProcessor ownedMessageProcessor;
+ private AbstractMessageSequenceSplitter splitter;
+ private MessageProcessor next;
+ private String expression;
+ private int groupSize;
+ private String rootMessageVariableName;
+ private String counterVariableName;
+
+ public MuleEvent process(MuleEvent event) throws MuleException
+ {
+ String parentMessageProp = rootMessageVariableName != null ? rootMessageVariableName : ROOT_MESSAGE_PROPERTY;
+ event.getMessage().setInvocationProperty(parentMessageProp, event.getMessage());
+ ownedMessageProcessor.process(event);
+ return processNext(event);
+ }
+
+ protected MuleEvent processNext(MuleEvent event) throws MuleException
+ {
+ if (next == null)
+ {
+ return event;
+ }
+ else
+ {
+ return next.process(event);
+ }
+ }
+
+ protected List<MessageProcessor> getOwnedMessageProcessors()
+ {
+ return messageProcessors;
+ }
+
+ public void setListener(MessageProcessor listener)
+ {
+ next = listener;
+ }
+
+ public void setMessageProcessors(List<MessageProcessor> messageProcessors) throws MuleException
+ {
+ this.messageProcessors = messageProcessors;
+ }
+
+ public void initialise() throws InitialisationException
+ {
+ if (expression != null)
+ {
+ ExpressionConfig config = new ExpressionConfig();
+ config.setExpression(expression);
+ splitter = new ExpressionSplitter(config);
+ }
+ else
+ {
+ splitter = new CollectionSplitter();
+ }
+ splitter.setGroupSize(groupSize);
+ splitter.setCounterVariableName(counterVariableName);
+ splitter.setMuleContext(muleContext);
+ messageProcessors.add(0, splitter);
+
+ try
+ {
+ this.ownedMessageProcessor = new DefaultMessageProcessorChainBuilder().chain(messageProcessors).build();
+ }
+ catch (MuleException e)
+ {
+ throw new InitialisationException(e, this);
+ }
+ super.initialise();
+ }
+
+ public void setExpression(String expression)
+ {
+ this.expression = expression;
+ }
+
+ public void setGroupSize(int groupSize)
+ {
+ this.groupSize = groupSize;
+ }
+
+ public void setRootMessageVariableName(String rootMessageVariableName)
+ {
+ this.rootMessageVariableName = rootMessageVariableName;
+ }
+
+ public void setCounterVariableName(String counterVariableName)
+ {
+ this.counterVariableName = counterVariableName;
+ }
+
+}
+
+
Property changes on: branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java (23793 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -10,9 +10,6 @@
package org.mule.routing.outbound;
-import java.util.ArrayList;
-import java.util.List;
-
import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.RequestContext;
@@ -29,6 +26,9 @@
import org.mule.routing.DefaultRouterResultsHandler;
import org.mule.routing.MessageSequence;
+import java.util.ArrayList;
+import java.util.List;
+
/**
@@ -49,6 +49,8 @@
protected RouterResultsHandler resultsHandler = new DefaultRouterResultsHandler();
protected CorrelationMode enableCorrelation = CorrelationMode.IF_NOT_SET;
protected MessageInfoMapping messageInfoMapping;
+ protected int groupSize;
+ protected String counterVariableName;
public final MuleEvent process(MuleEvent event) throws MuleException
{
@@ -95,14 +97,22 @@
String correlationId = messageInfoMapping.getCorrelationId(originalEvent.getMessage());
List<MuleEvent> resultEvents = new ArrayList<MuleEvent>();
int correlationSequence = 0;
- int count = seq.size();
+ MessageSequence<?> messageSequence = seq;
+ if (groupSize > 1)
+ {
+ messageSequence = new PartitionedMessageSequence(seq, groupSize);
+ }
+ int count = messageSequence.size();
MuleEvent currentEvent = originalEvent;
- for (; seq.hasNext();)
+ for (; messageSequence.hasNext();)
{
- Object payload = seq.next();
+ Object payload = messageSequence.next();
MuleMessage message = createMessage(payload, originalEvent.getMessage());
correlationSequence++;
-
+ if (counterVariableName != null)
+ {
+ message.setInvocationProperty(counterVariableName, correlationSequence);
+ }
if (enableCorrelation != CorrelationMode.NEVER)
{
boolean correlationSet = message.getCorrelationId() != null;
@@ -159,4 +169,17 @@
{
this.messageInfoMapping = messageInfoMapping;
}
+
+ /**
+ * Split the elements in groups of the specified size
+ */
+ public void setGroupSize(int groupSize)
+ {
+ this.groupSize = groupSize;
+ }
+
+ public void setCounterVariableName(String counterVariableName)
+ {
+ this.counterVariableName = counterVariableName;
+ }
}
\ No newline at end of file
Added: branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java (0 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,84 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.routing.outbound;
+
+import org.mule.api.MuleRuntimeException;
+import org.mule.config.i18n.MessageFactory;
+import org.mule.routing.MessageSequence;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.NoSuchElementException;
+
+/**
+ * collections of the specified size.
+ */
+public class PartitionedMessageSequence<Q> implements MessageSequence<Collection<Q>>
+{
+ private MessageSequence<Q> delegate;
+ private int groupSize;
+
+ public PartitionedMessageSequence(MessageSequence<Q> seq, int groupSize)
+ {
+ if (groupSize <= 1)
+ {
+ throw new MuleRuntimeException(MessageFactory.createStaticMessage("group size must be greater than 1"));
+ }
+ this.delegate = seq;
+ this.groupSize = groupSize;
+ }
+
+ public int size()
+ {
+ return (delegate.size() / groupSize) + ((delegate.size() % groupSize) > 0 ? 1 : 0);
+ }
+
+ public boolean hasNext()
+ {
+ return delegate.hasNext();
+ }
+
+ public Collection<Q> next()
+ {
+ if (!delegate.hasNext())
+ {
+ throw new NoSuchElementException();
+ }
+ Collection<Q> batch = new ArrayList<Q>();
+ int i = groupSize;
+ while (i > 0 && delegate.hasNext())
+ {
+ batch.add(delegate.next());
+ i--;
+ }
+ return batch;
+ }
+
+ public boolean isEmpty()
+ {
+ return !hasNext();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+}
+
+
Property changes on: branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java (23793 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -252,6 +252,11 @@
for (Object name : names)
{
Object value = m.getProperty(name.toString(), scope);
+ // avoid calling toString recursively on MuleMessages
+ if (value instanceof MuleMessage)
+ {
+ value = "<<<MuleMessage>>>";
+ }
if (name.equals("password") || name.toString().contains("secret") || name.equals("pass"))
{
value = "****";
Added: branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java (0 => 23794)
--- branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,63 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.routing.outbound;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.mule.tck.size.SmallTest;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.junit.Test;
+
+public class PartitionedMessageSequenceTestCase
+{
+
+ public void wrapCollectionMessageSequence()
+ {
+ Collection<String> group1 = new ArrayList<String>();
+ group1.add("one");
+ group1.add("two");
+ group1.add("three");
+ group1.add("four");
+
+ Collection<String> group2 = new ArrayList<String>();
+ group2.add("five");
+ group2.add("six");
+ group2.add("seven");
+
+ Collection<String> base = new ArrayList<String>();
+ base.addAll(group1);
+ base.addAll(group2);
+
+ CollectionMessageSequence<String> cms = new CollectionMessageSequence<String>(base);
+ int groupSize = group1.size();
+ PartitionedMessageSequence<String> pms = new PartitionedMessageSequence<String>(cms, groupSize);
+ assertEquals(2, pms.size());
+
+ Collection<String> batchItem = (Collection<String>) pms.next();
+ assertEquals(groupSize, batchItem.size());
+ assertTrue(batchItem.containsAll(group1));
+
+ batchItem = (Collection<String>) pms.next();
+ assertEquals(group2.size(), batchItem.size());
+ assertTrue(batchItem.containsAll(group2));
+
+ assertFalse(pms.hasNext());
+ }
+}
+
+
Property changes on: branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java (23793 => 23794)
--- branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -132,6 +132,7 @@
import org.mule.model.seda.SedaService;
import org.mule.object.PrototypeObjectFactory;
import org.mule.object.SingletonObjectFactory;
+import org.mule.processor.ForeachMessageProcessor;
import org.mule.processor.IdempotentRedeliveryPolicy;
import org.mule.processor.InvokerMessageProcessor;
import org.mule.processor.NullMessageProcessor;
@@ -219,6 +220,7 @@
import org.mule.util.store.InMemoryObjectStore;
import org.mule.util.store.ManagedObjectStore;
import org.mule.util.store.TextFileObjectStore;
+
import org.springframework.beans.factory.xml.BeanDefinitionParser;
/**
@@ -511,6 +513,8 @@
registerBeanDefinitionParser("map-splitter", new SplitterDefinitionParser(MapSplitter.class));
registerBeanDefinitionParser("message-chunk-splitter", new SplitterDefinitionParser(MessageChunkSplitter.class));
registerBeanDefinitionParser("custom-splitter", new SplitterDefinitionParser());
+ registerMuleBeanDefinitionParser("foreach", new ChildDefinitionParser("messageProcessor", ForeachMessageProcessor.class)).addAlias("rootMessage",
+ "rootMessageVariableName").addAlias("counter", "counterVariableName");
// Routing: Routing Message Processors
Modified: branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd (23793 => 23794)
--- branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd 2012-02-01 23:52:38 UTC (rev 23794)
@@ -5641,6 +5641,41 @@
</xsd:complexContent>
</xsd:complexType>
+ <xsd:element name="foreach" type="foreachProcessorType"
+ substitutionGroup="abstract-intercepting-message-processor"/>
+ <xsd:complexType name="foreachProcessorType">
+ <xsd:complexContent>
+ <xsd:extension base="abstractInterceptingMessageProcessorType">
+ <xsd:sequence>
+ <xsd:group ref="messageProcessorOrOutboundEndpoint" minOccurs="1" maxOccurs="unbounded"/>
+ </xsd:sequence>
+ <xsd:attributeGroup ref="optionalExpressionAttributes"/>
+ <xsd:attribute name="groupSize" use="optional" type="xsd:integer">
+ <xsd:annotation>
+ <xsd:documentation>
+ Partitions the collection in subcollections of the specified group size.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="rootMessage" use="optional" default="rootMessage" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation>
+ Property name where the parent message is stored.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="counter" use="optional" default="counter" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation>
+ Property name used to store the number of message being iterated.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+
+
<!--==================================-->
<!-- Routing (1-n) Message Processors -->
<!--==================================-->
Added: branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java (0 => 23794)
--- branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java (rev 0)
+++ branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,179 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.test.routing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.mule.DefaultMuleMessage;
+import org.mule.api.MuleMessage;
+import org.mule.module.client.MuleClient;
+import org.mule.tck.junit4.FunctionalTestCase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class ForeachTestCase extends FunctionalTestCase
+{
+
+ private MuleClient client;
+
+ protected void doSetUp() throws Exception
+ {
+ super.doSetUp();
+ client = new MuleClient(muleContext);
+ }
+
+ protected String getConfigResources()
+ {
+ return "foreach-test.xml";
+ }
+
+ public void testDefaultConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("julio");
+ payload.add("sosa");
+
+ MuleMessage result = client.send("vm://input-1", payload, null);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(2, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("julio", out.getPayload());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("sosa", out.getPayload());
+ }
+
+ public void testDefaultConfigurationPlusMP() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("syd");
+ payload.add("barrett");
+
+ MuleMessage result = client.send("vm://input-2", payload, null);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(3, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("syd", out.getPayload());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("barrett", out.getPayload());
+
+ }
+
+ public void testDefaultConfigurationExpression() throws Exception
+ {
+ final Collection<String> names = new ArrayList<String>();
+ names.add("residente");
+ names.add("visitante");
+ Map<String, Object> props = new HashMap<String, Object>();
+ props.put("names", names);
+ MuleMessage message = new DefaultMuleMessage("message payload", props, muleContext);
+
+ MuleMessage result = client.send("vm://input-3", message);
+ assertTrue(result.getPayload() instanceof String);
+ assertEquals(names.size(), ((Collection<?>) message.getOutboundProperty("names")).size());
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("residente", out.getPayload());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("visitante", out.getPayload());
+ }
+
+ public void testPartitionedConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("gulp");
+ payload.add("oktubre");
+ payload.add("un baion");
+ payload.add("bang bang");
+ payload.add("la mosca");
+
+ MuleMessage result = client.send("vm://input-4", payload, null);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(5, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof Collection);
+ Collection<?> outPayload = (Collection<?>) out.getPayload();
+ assertEquals(3, outPayload.size());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof Collection);
+ outPayload = (Collection<?>) out.getPayload();
+ assertEquals(2, outPayload.size());
+ }
+
+ public void testRootMessageConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("pyotr");
+ payload.add("ilych");
+ MuleMessage parent = new DefaultMuleMessage(payload, muleContext);
+
+ MuleMessage result = client.send("vm://input-5", parent);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(2, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ assertSame(parent.getPayload(), ((MuleMessage) result.getInboundProperty("parent")).getPayload());
+ }
+
+ public void testCounterConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("wolfgang");
+ payload.add("amadeus");
+ payload.add("mozart");
+ MuleMessage parent = new DefaultMuleMessage(payload, muleContext);
+
+ MuleMessage result = client.send("vm://input-6", parent);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(3, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ assertEquals(result.getInboundProperty("msg-last-index"), 3);
+ }
+
+}
+
+
Property changes on: branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml (0 => 23794)
--- branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml (rev 0)
+++ branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<mule xmlns="http://www.mulesoft.org/schema/mule/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:spring="http://www.springframework.org/schema/beans"
+ xmlns:test="http://www.mulesoft.org/schema/mule/test"
+ xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
+ xmlns:script="http://www.mulesoft.org/schema/mule/scripting"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
+ http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
+ http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/3.2/mule-vm.xsd
+ http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/3.2/mule-test.xsd
+ http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd">
+
+ <flow name="minimal-config">
+ <vm:inbound-endpoint path="input-1" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach>
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ </flow>
+
+ <flow name="minimal-config-plus-mp">
+ <vm:inbound-endpoint path="input-2" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach>
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ <script:transformer>
+ <script:script engine="groovy">
+ payload.add("added after foreach")
+ result = payload
+ </script:script>
+ </script:transformer>
+ </flow>
+
+ <flow name="minimal-config-expression">
+ <vm:inbound-endpoint path="input-3" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach expression="#[header:INBOUND:names]">
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ </flow>
+
+ <flow name="partitioned-config">
+ <vm:inbound-endpoint path="input-4" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach groupSize="3">
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ </flow>
+
+ <flow name="parent-message-config">
+ <vm:inbound-endpoint path="input-5" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach rootMessage="parent">
+ <test:component/>
+ </foreach>
+ <message-properties-transformer scope="outbound">
+ <add-message-property key="parent" value="#[variable:parent]"/>
+ </message-properties-transformer>
+ </flow>
+
+ <flow name="counter-config">
+ <vm:inbound-endpoint path="input-6" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach counter="index">
+ <test:component/>
+ <message-properties-transformer scope="session">
+ <add-message-property key="msg-last-index" value="#[variable:index]"/>
+ </message-properties-transformer>
+ </foreach>
+ <message-properties-transformer scope="outbound">
+ <add-message-property key="msg-last-index" value="#[header:SESSION:msg-last-index]"/>
+ </message-properties-transformer>
+ </flow>
+
+</mule>
Property changes on: branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
http://xircles.codehaus.org/manage_email