Discussion:
[mule-scm] [mule][23794] branches/mule-3.x: MULE-5914: Implement <foreach> message processor
Daniel Feist
2012-02-03 02:57:40 UTC
Permalink
This doesn't need to be an InterceptingMessageProcessor; it can just implement MessageProcessor, and should if possible. Also, is there a more specific package this could go in rather than just org/mule/processor? There should be. You might consider simply calling it Foreach if you end up putting it alongside other processors that don't have this suffix.

Are Maps supported?

Dan
Revision
23794
Author
svacas
Date
2012-02-01 17:52:38 -0600 (Wed, 01 Feb 2012)
Log Message
MULE-5914: Implement <foreach> message processor
Modified Paths
branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java
branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java
branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java
branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd
Added Paths
branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java
branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java
branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java
branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java
branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml
Diff
Added: branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java (0 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,153 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.processor;
+
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.lifecycle.Initialisable;
+import org.mule.api.lifecycle.InitialisationException;
+import org.mule.api.processor.InterceptingMessageProcessor;
+import org.mule.api.processor.MessageProcessor;
+import org.mule.expression.ExpressionConfig;
+import org.mule.processor.chain.DefaultMessageProcessorChainBuilder;
+import org.mule.routing.CollectionSplitter;
+import org.mule.routing.ExpressionSplitter;
+import org.mule.routing.outbound.AbstractMessageSequenceSplitter;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The <code>Foreach</code> MessageProcessor allows iterating over a collection
+ * payload, or any collection obtained by an expression, generating a message for
+ * each element.
+ * <p>
+ * The number of the message being processed is stored in
+ * <code>#[variable:counter]</code> and the root message is store in
+ * <code>#[variable:rootMessage]</code>. Both variables may be renamed by means of
+ * {@link #setCounterVariableName(String)} and {@link #setRootMessageVariableName(String)}.
+ * <p>
+ * Defining a groupSize greater than one, allows iterating over collections of
+ * elements of the specified size.
+ * <p>
+ * The {@link MuleEvent} sent to the next message processor is the same that arrived
+ * to foreach.
+ */
+public class ForeachMessageProcessor extends AbstractMessageProcessorOwner implements Initialisable, InterceptingMessageProcessor
+{
+
+ public static final String ROOT_MESSAGE_PROPERTY = "rootMessage";
+ public static final String COUNTER_PROPERTY = "counter";
+
+ protected Log logger = LogFactory.getLog(getClass());
+
+ private List<MessageProcessor> messageProcessors;
+ private MessageProcessor ownedMessageProcessor;
+ private AbstractMessageSequenceSplitter splitter;
+ private MessageProcessor next;
+ private String expression;
+ private int groupSize;
+ private String rootMessageVariableName;
+ private String counterVariableName;
+
+ public MuleEvent process(MuleEvent event) throws MuleException
+ {
+ String parentMessageProp = rootMessageVariableName != null ? rootMessageVariableName : ROOT_MESSAGE_PROPERTY;
+ event.getMessage().setInvocationProperty(parentMessageProp, event.getMessage());
+ ownedMessageProcessor.process(event);
+ return processNext(event);
+ }
+
+ protected MuleEvent processNext(MuleEvent event) throws MuleException
+ {
+ if (next == null)
+ {
+ return event;
+ }
+ else
+ {
+ return next.process(event);
+ }
+ }
+
+ protected List<MessageProcessor> getOwnedMessageProcessors()
+ {
+ return messageProcessors;
+ }
+
+ public void setListener(MessageProcessor listener)
+ {
+ next = listener;
+ }
+
+ public void setMessageProcessors(List<MessageProcessor> messageProcessors) throws MuleException
+ {
+ this.messageProcessors = messageProcessors;
+ }
+
+ public void initialise() throws InitialisationException
+ {
+ if (expression != null)
+ {
+ ExpressionConfig config = new ExpressionConfig();
+ config.setExpression(expression);
+ splitter = new ExpressionSplitter(config);
+ }
+ else
+ {
+ splitter = new CollectionSplitter();
+ }
+ splitter.setGroupSize(groupSize);
+ splitter.setCounterVariableName(counterVariableName);
+ splitter.setMuleContext(muleContext);
+ messageProcessors.add(0, splitter);
+
+ try
+ {
+ this.ownedMessageProcessor = new DefaultMessageProcessorChainBuilder().chain(messageProcessors).build();
+ }
+ catch (MuleException e)
+ {
+ throw new InitialisationException(e, this);
+ }
+ super.initialise();
+ }
+
+ public void setExpression(String expression)
+ {
+ this.expression = expression;
+ }
+
+ public void setGroupSize(int groupSize)
+ {
+ this.groupSize = groupSize;
+ }
+
+ public void setRootMessageVariableName(String rootMessageVariableName)
+ {
+ this.rootMessageVariableName = rootMessageVariableName;
+ }
+
+ public void setCounterVariableName(String counterVariableName)
+ {
+ this.counterVariableName = counterVariableName;
+ }
+
+}
+
+
Property changes on: branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java (23793 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/AbstractMessageSequenceSplitter.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -10,9 +10,6 @@
package org.mule.routing.outbound;
-import java.util.ArrayList;
-import java.util.List;
-
import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.RequestContext;
@@ -29,6 +26,9 @@
import org.mule.routing.DefaultRouterResultsHandler;
import org.mule.routing.MessageSequence;
+import java.util.ArrayList;
+import java.util.List;
+
/**
@@ -49,6 +49,8 @@
protected RouterResultsHandler resultsHandler = new DefaultRouterResultsHandler();
protected CorrelationMode enableCorrelation = CorrelationMode.IF_NOT_SET;
protected MessageInfoMapping messageInfoMapping;
+ protected int groupSize;
+ protected String counterVariableName;
public final MuleEvent process(MuleEvent event) throws MuleException
{
@@ -95,14 +97,22 @@
String correlationId = messageInfoMapping.getCorrelationId(originalEvent.getMessage());
List<MuleEvent> resultEvents = new ArrayList<MuleEvent>();
int correlationSequence = 0;
- int count = seq.size();
+ MessageSequence<?> messageSequence = seq;
+ if (groupSize > 1)
+ {
+ messageSequence = new PartitionedMessageSequence(seq, groupSize);
+ }
+ int count = messageSequence.size();
MuleEvent currentEvent = originalEvent;
- for (; seq.hasNext();)
+ for (; messageSequence.hasNext();)
{
- Object payload = seq.next();
+ Object payload = messageSequence.next();
MuleMessage message = createMessage(payload, originalEvent.getMessage());
correlationSequence++;
-
+ if (counterVariableName != null)
+ {
+ message.setInvocationProperty(counterVariableName, correlationSequence);
+ }
if (enableCorrelation != CorrelationMode.NEVER)
{
boolean correlationSet = message.getCorrelationId() != null;
@@ -159,4 +169,17 @@
{
this.messageInfoMapping = messageInfoMapping;
}
+
+ /**
+ * Split the elements in groups of the specified size
+ */
+ public void setGroupSize(int groupSize)
+ {
+ this.groupSize = groupSize;
+ }
+
+ public void setCounterVariableName(String counterVariableName)
+ {
+ this.counterVariableName = counterVariableName;
+ }
}
\ No newline at end of file
Added: branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java (0 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,84 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.routing.outbound;
+
+import org.mule.api.MuleRuntimeException;
+import org.mule.config.i18n.MessageFactory;
+import org.mule.routing.MessageSequence;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.NoSuchElementException;
+
+/**
+ * A {@link MessageSequence} wrapper that partitions the wrapped sequence in
+ * collections of the specified size.
+ */
+public class PartitionedMessageSequence<Q> implements MessageSequence<Collection<Q>>
+{
+ private MessageSequence<Q> delegate;
+ private int groupSize;
+
+ public PartitionedMessageSequence(MessageSequence<Q> seq, int groupSize)
+ {
+ if (groupSize <= 1)
+ {
+ throw new MuleRuntimeException(MessageFactory.createStaticMessage("group size must be greater than 1"));
+ }
+ this.delegate = seq;
+ this.groupSize = groupSize;
+ }
+
+ public int size()
+ {
+ return (delegate.size() / groupSize) + ((delegate.size() % groupSize) > 0 ? 1 : 0);
+ }
+
+ public boolean hasNext()
+ {
+ return delegate.hasNext();
+ }
+
+ public Collection<Q> next()
+ {
+ if (!delegate.hasNext())
+ {
+ throw new NoSuchElementException();
+ }
+ Collection<Q> batch = new ArrayList<Q>();
+ int i = groupSize;
+ while (i > 0 && delegate.hasNext())
+ {
+ batch.add(delegate.next());
+ i--;
+ }
+ return batch;
+ }
+
+ public boolean isEmpty()
+ {
+ return !hasNext();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+}
+
+
Property changes on: branches/mule-3.x/core/src/main/java/org/mule/routing/outbound/PartitionedMessageSequence.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java (23793 => 23794)
--- branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/core/src/main/java/org/mule/util/StringMessageUtils.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -252,6 +252,11 @@
for (Object name : names)
{
Object value = m.getProperty(name.toString(), scope);
+ // avoid calling toString recursively on MuleMessages
+ if (value instanceof MuleMessage)
+ {
+ value = "<<<MuleMessage>>>";
+ }
if (name.equals("password") || name.toString().contains("secret") || name.equals("pass"))
{
value = "****";
Added: branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java (0 => 23794)
--- branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,63 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.routing.outbound;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.mule.tck.size.SmallTest;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.junit.Test;
+
+public class PartitionedMessageSequenceTestCase
+{
+
+ public void wrapCollectionMessageSequence()
+ {
+ Collection<String> group1 = new ArrayList<String>();
+ group1.add("one");
+ group1.add("two");
+ group1.add("three");
+ group1.add("four");
+
+ Collection<String> group2 = new ArrayList<String>();
+ group2.add("five");
+ group2.add("six");
+ group2.add("seven");
+
+ Collection<String> base = new ArrayList<String>();
+ base.addAll(group1);
+ base.addAll(group2);
+
+ CollectionMessageSequence<String> cms = new CollectionMessageSequence<String>(base);
+ int groupSize = group1.size();
+ PartitionedMessageSequence<String> pms = new PartitionedMessageSequence<String>(cms, groupSize);
+ assertEquals(2, pms.size());
+
+ Collection<String> batchItem = (Collection<String>) pms.next();
+ assertEquals(groupSize, batchItem.size());
+ assertTrue(batchItem.containsAll(group1));
+
+ batchItem = (Collection<String>) pms.next();
+ assertEquals(group2.size(), batchItem.size());
+ assertTrue(batchItem.containsAll(group2));
+
+ assertFalse(pms.hasNext());
+ }
+}
+
+
Property changes on: branches/mule-3.x/core/src/test/java/org/mule/routing/outbound/PartitionedMessageSequenceTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java (23793 => 23794)
--- branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -132,6 +132,7 @@
import org.mule.model.seda.SedaService;
import org.mule.object.PrototypeObjectFactory;
import org.mule.object.SingletonObjectFactory;
+import org.mule.processor.ForeachMessageProcessor;
import org.mule.processor.IdempotentRedeliveryPolicy;
import org.mule.processor.InvokerMessageProcessor;
import org.mule.processor.NullMessageProcessor;
@@ -219,6 +220,7 @@
import org.mule.util.store.InMemoryObjectStore;
import org.mule.util.store.ManagedObjectStore;
import org.mule.util.store.TextFileObjectStore;
+
import org.springframework.beans.factory.xml.BeanDefinitionParser;
/**
@@ -511,6 +513,8 @@
registerBeanDefinitionParser("map-splitter", new SplitterDefinitionParser(MapSplitter.class));
registerBeanDefinitionParser("message-chunk-splitter", new SplitterDefinitionParser(MessageChunkSplitter.class));
registerBeanDefinitionParser("custom-splitter", new SplitterDefinitionParser());
+ registerMuleBeanDefinitionParser("foreach", new ChildDefinitionParser("messageProcessor", ForeachMessageProcessor.class)).addAlias("rootMessage",
+ "rootMessageVariableName").addAlias("counter", "counterVariableName");
// Routing: Routing Message Processors
Modified: branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd (23793 => 23794)
--- branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd 2012-02-01 21:09:30 UTC (rev 23793)
+++ branches/mule-3.x/modules/spring-config/src/main/resources/META-INF/mule.xsd 2012-02-01 23:52:38 UTC (rev 23794)
@@ -5641,6 +5641,41 @@
</xsd:complexContent>
</xsd:complexType>
+ <xsd:element name="foreach" type="foreachProcessorType"
+ substitutionGroup="abstract-intercepting-message-processor"/>
+ <xsd:complexType name="foreachProcessorType">
+ <xsd:complexContent>
+ <xsd:extension base="abstractInterceptingMessageProcessorType">
+ <xsd:sequence>
+ <xsd:group ref="messageProcessorOrOutboundEndpoint" minOccurs="1" maxOccurs="unbounded"/>
+ </xsd:sequence>
+ <xsd:attributeGroup ref="optionalExpressionAttributes"/>
+ <xsd:attribute name="groupSize" use="optional" type="xsd:integer">
+ <xsd:annotation>
+ <xsd:documentation>
+ Partitions the collection in subcollections of the specified group size.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="rootMessage" use="optional" default="rootMessage" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation>
+ Property name where the parent message is stored.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="counter" use="optional" default="counter" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation>
+ Property name used to store the number of message being iterated.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+
+
<!--==================================-->
<!-- Routing (1-n) Message Processors -->
<!--==================================-->
Added: branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java (0 => 23794)
--- branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java (rev 0)
+++ branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,179 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.test.routing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.mule.DefaultMuleMessage;
+import org.mule.api.MuleMessage;
+import org.mule.module.client.MuleClient;
+import org.mule.tck.junit4.FunctionalTestCase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class ForeachTestCase extends FunctionalTestCase
+{
+
+ private MuleClient client;
+
+ protected void doSetUp() throws Exception
+ {
+ super.doSetUp();
+ client = new MuleClient(muleContext);
+ }
+
+ protected String getConfigResources()
+ {
+ return "foreach-test.xml";
+ }
+
+ public void testDefaultConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("julio");
+ payload.add("sosa");
+
+ MuleMessage result = client.send("vm://input-1", payload, null);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(2, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("julio", out.getPayload());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("sosa", out.getPayload());
+ }
+
+ public void testDefaultConfigurationPlusMP() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("syd");
+ payload.add("barrett");
+
+ MuleMessage result = client.send("vm://input-2", payload, null);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(3, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("syd", out.getPayload());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("barrett", out.getPayload());
+
+ }
+
+ public void testDefaultConfigurationExpression() throws Exception
+ {
+ final Collection<String> names = new ArrayList<String>();
+ names.add("residente");
+ names.add("visitante");
+ Map<String, Object> props = new HashMap<String, Object>();
+ props.put("names", names);
+ MuleMessage message = new DefaultMuleMessage("message payload", props, muleContext);
+
+ MuleMessage result = client.send("vm://input-3", message);
+ assertTrue(result.getPayload() instanceof String);
+ assertEquals(names.size(), ((Collection<?>) message.getOutboundProperty("names")).size());
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("residente", out.getPayload());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof String);
+ assertEquals("visitante", out.getPayload());
+ }
+
+ public void testPartitionedConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("gulp");
+ payload.add("oktubre");
+ payload.add("un baion");
+ payload.add("bang bang");
+ payload.add("la mosca");
+
+ MuleMessage result = client.send("vm://input-4", payload, null);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(5, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ MuleMessage out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof Collection);
+ Collection<?> outPayload = (Collection<?>) out.getPayload();
+ assertEquals(3, outPayload.size());
+
+ out = client.request("vm://out", getTestTimeoutSecs());
+ assertTrue(out.getPayload() instanceof Collection);
+ outPayload = (Collection<?>) out.getPayload();
+ assertEquals(2, outPayload.size());
+ }
+
+ public void testRootMessageConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("pyotr");
+ payload.add("ilych");
+ MuleMessage parent = new DefaultMuleMessage(payload, muleContext);
+
+ MuleMessage result = client.send("vm://input-5", parent);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(2, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ assertSame(parent.getPayload(), ((MuleMessage) result.getInboundProperty("parent")).getPayload());
+ }
+
+ public void testCounterConfiguration() throws Exception
+ {
+ final Collection<String> payload = new ArrayList<String>();
+ payload.add("wolfgang");
+ payload.add("amadeus");
+ payload.add("mozart");
+ MuleMessage parent = new DefaultMuleMessage(payload, muleContext);
+
+ MuleMessage result = client.send("vm://input-6", parent);
+ assertTrue(result.getPayload() instanceof Collection);
+ Collection<?> resultPayload = (Collection<?>) result.getPayload();
+ assertEquals(3, resultPayload.size());
+ assertSame(payload, resultPayload);
+
+ assertEquals(result.getInboundProperty("msg-last-index"), 3);
+ }
+
+}
+
+
Property changes on: branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Added: branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml (0 => 23794)
--- branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml (rev 0)
+++ branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml 2012-02-01 23:52:38 UTC (rev 23794)
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<mule xmlns="http://www.mulesoft.org/schema/mule/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:spring="http://www.springframework.org/schema/beans"
+ xmlns:test="http://www.mulesoft.org/schema/mule/test"
+ xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
+ xmlns:script="http://www.mulesoft.org/schema/mule/scripting"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
+ http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
+ http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/3.2/mule-vm.xsd
+ http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/3.2/mule-test.xsd
+ http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd">
+
+ <flow name="minimal-config">
+ <vm:inbound-endpoint path="input-1" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach>
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ </flow>
+
+ <flow name="minimal-config-plus-mp">
+ <vm:inbound-endpoint path="input-2" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach>
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ <script:transformer>
+ <script:script engine="groovy">
+ payload.add("added after foreach")
+ result = payload
+ </script:script>
+ </script:transformer>
+ </flow>
+
+ <flow name="minimal-config-expression">
+ <vm:inbound-endpoint path="input-3" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach expression="#[header:INBOUND:names]">
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ </flow>
+
+ <flow name="partitioned-config">
+ <vm:inbound-endpoint path="input-4" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach groupSize="3">
+ <test:component/>
+ <vm:outbound-endpoint path="out"/>
+ </foreach>
+ </flow>
+
+ <flow name="parent-message-config">
+ <vm:inbound-endpoint path="input-5" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach rootMessage="parent">
+ <test:component/>
+ </foreach>
+ <message-properties-transformer scope="outbound">
+ <add-message-property key="parent" value="#[variable:parent]"/>
+ </message-properties-transformer>
+ </flow>
+
+ <flow name="counter-config">
+ <vm:inbound-endpoint path="input-6" exchange-pattern="request-response"/>
+ <!-- with this configuration, the response from target MP won't be considered an error (no failureExpression) -->
+ <foreach counter="index">
+ <test:component/>
+ <message-properties-transformer scope="session">
+ <add-message-property key="msg-last-index" value="#[variable:index]"/>
+ </message-properties-transformer>
+ </foreach>
+ <message-properties-transformer scope="outbound">
+ <add-message-property key="msg-last-index" value="#[header:SESSION:msg-last-index]"/>
+ </message-properties-transformer>
+ </flow>
+
+</mule>
Property changes on: branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
http://xircles.codehaus.org/manage_email
Daniel Feist
2012-02-07 01:35:23 UTC
Permalink
Weren't you going to drop the "MessageProcessor" too and just leave "foreach"?

Dan
Revision
23822
Author
svacas
Date
2012-02-06 08:47:37 -0600 (Mon, 06 Feb 2012)
Log Message
MULE-5914: Implement <foreach> message processor
-add map iteration support
-package refactoring
Modified Paths
branches/mule-3.x/core/src/main/java/org/mule/routing/ExpressionSplitter.java
branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java
branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java
branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml
Added Paths
branches/mule-3.x/core/src/main/java/org/mule/routing/ForeachMessageProcessor.java
Removed Paths
branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java
Diff
Deleted: branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java (23821 => 23822)
--- branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java 2012-02-06 13:47:50 UTC (rev 23821)
+++ branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java 2012-02-06 14:47:37 UTC (rev 23822)
@@ -1,153 +0,0 @@
-/*
- * $Id$
- * --------------------------------------------------------------------------------------
- * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
- *
- * The software in this package is published under the terms of the CPAL v1.0
- * license, a copy of which has been included with this distribution in the
- * LICENSE.txt file.
- */
-
-package org.mule.processor;
-
-import org.mule.api.MuleEvent;
-import org.mule.api.MuleException;
-import org.mule.api.lifecycle.Initialisable;
-import org.mule.api.lifecycle.InitialisationException;
-import org.mule.api.processor.InterceptingMessageProcessor;
-import org.mule.api.processor.MessageProcessor;
-import org.mule.expression.ExpressionConfig;
-import org.mule.processor.chain.DefaultMessageProcessorChainBuilder;
-import org.mule.routing.CollectionSplitter;
-import org.mule.routing.ExpressionSplitter;
-import org.mule.routing.outbound.AbstractMessageSequenceSplitter;
-
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * The <code>Foreach</code> MessageProcessor allows iterating over a collection
- * payload, or any collection obtained by an expression, generating a message for
- * each element.
- * <p>
- * The number of the message being processed is stored in
- * <code>#[variable:counter]</code> and the root message is store in
- * <code>#[variable:rootMessage]</code>. Both variables may be renamed by means of
- * {@link #setCounterVariableName(String)} and {@link #setRootMessageVariableName(String)}.
- * <p>
- * Defining a groupSize greater than one, allows iterating over collections of
- * elements of the specified size.
- * <p>
- * The {@link MuleEvent} sent to the next message processor is the same that arrived
- * to foreach.
- */
-public class ForeachMessageProcessor extends AbstractMessageProcessorOwner implements Initialisable, InterceptingMessageProcessor
-{
-
- public static final String ROOT_MESSAGE_PROPERTY = "rootMessage";
- public static final String COUNTER_PROPERTY = "counter";
-
- protected Log logger = LogFactory.getLog(getClass());
-
- private List<MessageProcessor> messageProcessors;
- private MessageProcessor ownedMessageProcessor;
- private AbstractMessageSequenceSplitter splitter;
- private MessageProcessor next;
- private String expression;
- private int groupSize;
- private String rootMessageVariableName;
- private String counterVariableName;
-
- public MuleEvent process(MuleEvent event) throws MuleException
- {
- String parentMessageProp = rootMessageVariableName != null ? rootMessageVariableName : ROOT_MESSAGE_PROPERTY;
- event.getMessage().setInvocationProperty(parentMessageProp, event.getMessage());
- ownedMessageProcessor.process(event);
- return processNext(event);
- }
-
- protected MuleEvent processNext(MuleEvent event) throws MuleException
- {
- if (next == null)
- {
- return event;
- }
- else
- {
- return next.process(event);
- }
- }
-
- protected List<MessageProcessor> getOwnedMessageProcessors()
- {
- return messageProcessors;
- }
-
- public void setListener(MessageProcessor listener)
- {
- next = listener;
- }
-
- public void setMessageProcessors(List<MessageProcessor> messageProcessors) throws MuleException
- {
- this.messageProcessors = messageProcessors;
- }
-
- public void initialise() throws InitialisationException
- {
- if (expression != null)
- {
- ExpressionConfig config = new ExpressionConfig();
- config.setExpression(expression);
- splitter = new ExpressionSplitter(config);
- }
- else
- {
- splitter = new CollectionSplitter();
- }
- splitter.setGroupSize(groupSize);
- splitter.setCounterVariableName(counterVariableName);
- splitter.setMuleContext(muleContext);
- messageProcessors.add(0, splitter);
-
- try
- {
- this.ownedMessageProcessor = new DefaultMessageProcessorChainBuilder().chain(messageProcessors).build();
- }
- catch (MuleException e)
- {
- throw new InitialisationException(e, this);
- }
- super.initialise();
- }
-
- public void setExpression(String expression)
- {
- this.expression = expression;
- }
-
- public void setGroupSize(int groupSize)
- {
- this.groupSize = groupSize;
- }
-
- public void setRootMessageVariableName(String rootMessageVariableName)
- {
- this.rootMessageVariableName = rootMessageVariableName;
- }
-
- public void setCounterVariableName(String counterVariableName)
- {
- this.counterVariableName = counterVariableName;
- }
-
-}
-
-
Modified: branches/mule-3.x/core/src/main/java/org/mule/routing/ExpressionSplitter.java (23821 => 23822)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/ExpressionSplitter.java 2012-02-06 13:47:50 UTC (rev 23821)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/ExpressionSplitter.java 2012-02-06 14:47:37 UTC (rev 23822)
@@ -20,7 +20,11 @@
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
/**
* Splits a message using the expression provided invoking the next message processor
@@ -65,6 +69,18 @@
}
return messages;
}
+ else if (result instanceof Map<?, ?>)
+ {
+ List<MuleMessage> list = new LinkedList<MuleMessage>();
+ Set<Map.Entry<?, ?>> set = ((Map) result).entrySet();
+ for (Entry<?, ?> entry : set)
+ {
+ MuleMessage message = new DefaultMuleMessage(entry.getValue(), muleContext);
+ message.setInvocationProperty(MapSplitter.MAP_ENTRY_KEY, entry.getKey());
+ list.add(message);
+ }
+ return list;
+ }
else if (result instanceof MuleMessage)
{
return Collections.singletonList((MuleMessage) result);
Copied: branches/mule-3.x/core/src/main/java/org/mule/routing/ForeachMessageProcessor.java (from rev 23808, branches/mule-3.x/core/src/main/java/org/mule/processor/ForeachMessageProcessor.java) (0 => 23822)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/ForeachMessageProcessor.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/ForeachMessageProcessor.java 2012-02-06 14:47:37 UTC (rev 23822)
@@ -0,0 +1,182 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.routing;
+
+import org.mule.DefaultMuleMessage;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
+import org.mule.api.lifecycle.Initialisable;
+import org.mule.api.lifecycle.InitialisationException;
+import org.mule.api.processor.InterceptingMessageProcessor;
+import org.mule.api.processor.MessageProcessor;
+import org.mule.expression.ExpressionConfig;
+import org.mule.processor.AbstractMessageProcessorOwner;
+import org.mule.processor.chain.DefaultMessageProcessorChainBuilder;
+import org.mule.routing.outbound.AbstractMessageSequenceSplitter;
+import org.mule.routing.outbound.CollectionMessageSequence;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The <code>Foreach</code> MessageProcessor allows iterating over a collection
+ * payload, or any collection obtained by an expression, generating a message for
+ * each element.
+ * <p>
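+ * The number of the message being processed is stored in
+ * <code>#[variable:counter]</code> and the root message is stored in
+ * <code>#[variable:rootMessage]</code>. Both variables may be renamed by means of
+ * {@link #setCounterVariableName(String)} and
+ * {@link #setRootMessageVariableName(String)}.
+ * <p>
+ * Defining a groupSize greater than one allows iterating over collections of
+ * elements of the specified size.
+ * <p>
+ * The {@link MuleEvent} sent to the next message processor is the same that arrived
+ * to foreach.
+ */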
+public class ForeachMessageProcessor extends AbstractMessageProcessorOwner implements Initialisable, InterceptingMessageProcessor
+{
+
+ public static final String ROOT_MESSAGE_PROPERTY = "rootMessage";
+ public static final String COUNTER_PROPERTY = "counter";
+
+ protected Log logger = LogFactory.getLog(getClass());
+
+ private List<MessageProcessor> messageProcessors;
+ private MessageProcessor ownedMessageProcessor;
+ private AbstractMessageSequenceSplitter splitter;
+ private MessageProcessor next;
+ private String expression;
+ private int groupSize;
+ private String rootMessageVariableName;
+ private String counterVariableName;
+
+ public MuleEvent process(MuleEvent event) throws MuleException
+ {
+ String parentMessageProp = rootMessageVariableName != null ? rootMessageVariableName : ROOT_MESSAGE_PROPERTY;
+ event.getMessage().setInvocationProperty(parentMessageProp, event.getMessage());
+ ownedMessageProcessor.process(event);
+ return processNext(event);
+ }
+
+ protected MuleEvent processNext(MuleEvent event) throws MuleException
+ {
+ if (next == null)
+ {
+ return event;
+ }
+ else
+ {
+ return next.process(event);
+ }
+ }
+
+ protected List<MessageProcessor> getOwnedMessageProcessors()
+ {
+ return messageProcessors;
+ }
+
+ public void setListener(MessageProcessor listener)
+ {
+ next = listener;
+ }
+
+ public void setMessageProcessors(List<MessageProcessor> messageProcessors) throws MuleException
+ {
+ this.messageProcessors = messageProcessors;
+ }
+
+ public void initialise() throws InitialisationException
+ {
+ if (expression != null)
+ {
+ ExpressionConfig config = new ExpressionConfig();
+ config.setExpression(expression);
+ splitter = new ExpressionSplitter(config);
+ }
+ else
+ {
+ splitter = new CollectionMapSplitter();
+ }
+ splitter.setGroupSize(groupSize);
+ splitter.setCounterVariableName(counterVariableName);
+ splitter.setMuleContext(muleContext);
+ messageProcessors.add(0, splitter);
+
+ try
+ {
+ this.ownedMessageProcessor = new DefaultMessageProcessorChainBuilder().chain(messageProcessors).build();
+ }
+ catch (MuleException e)
+ {
+ throw new InitialisationException(e, this);
+ }
+ super.initialise();
+ }
+
+ public void setExpression(String expression)
+ {
+ this.expression = expression;
+ }
+
+ public void setGroupSize(int groupSize)
+ {
+ this.groupSize = groupSize;
+ }
+
+ public void setRootMessageVariableName(String rootMessageVariableName)
+ {
+ this.rootMessageVariableName = rootMessageVariableName;
+ }
+
+ public void setCounterVariableName(String counterVariableName)
+ {
+ this.counterVariableName = counterVariableName;
+ }
+
+ private static class CollectionMapSplitter extends CollectionSplitter
+ {
+
+ protected MessageSequence<?> splitMessageIntoSequence(MuleEvent event)
+ {
+ Object payload = event.getMessage().getPayload();
+ if (payload instanceof Map<?, ?>)
+ {
+ List<MuleMessage> list = new LinkedList<MuleMessage>();
+ Set<Map.Entry<?, ?>> set = ((Map) payload).entrySet();
+ for (Entry<?, ?> entry : set)
+ {
+ MuleMessage splitMessage = new DefaultMuleMessage(entry.getValue(), muleContext);
+ splitMessage.setInvocationProperty(MapSplitter.MAP_ENTRY_KEY, entry.getKey());
+ list.add(splitMessage);
+ }
+ return new CollectionMessageSequence(list);
+ }
+ return super.splitMessageIntoSequence(event);
+ }
+
+ }
+}
+
+
Modified: branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java (23821 => 23822)
--- branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java 2012-02-06 13:47:50 UTC (rev 23821)
+++ branches/mule-3.x/modules/spring-config/src/main/java/org/mule/config/spring/handlers/MuleNamespaceHandler.java 2012-02-06 14:47:37 UTC (rev 23822)
@@ -132,7 +132,6 @@
import org.mule.model.seda.SedaService;
import org.mule.object.PrototypeObjectFactory;
import org.mule.object.SingletonObjectFactory;
-import org.mule.processor.ForeachMessageProcessor;
import org.mule.processor.IdempotentRedeliveryPolicy;
import org.mule.processor.InvokerMessageProcessor;
import org.mule.processor.NullMessageProcessor;
@@ -147,6 +146,7 @@
import org.mule.routing.ExpressionMessageInfoMapping;
import org.mule.routing.ExpressionSplitter;
import org.mule.routing.FirstSuccessful;
+import org.mule.routing.ForeachMessageProcessor;
import org.mule.routing.ForwardingCatchAllStrategy;
import org.mule.routing.IdempotentMessageFilter;
import org.mule.routing.IdempotentSecureHashMessageFilter;
Modified: branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java (23821 => 23822)
--- branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java 2012-02-06 13:47:50 UTC (rev 23821)
+++ branches/mule-3.x/tests/integration/src/test/java/org/mule/test/routing/ForeachTestCase.java 2012-02-06 14:47:37 UTC (rev 23822)
@@ -191,6 +191,40 @@
assertEquals(10, result.getInboundProperty("totalMessages"));
assertEquals(msgCollection.getPayload(), result.getPayload());
}
+
+ public void testMapPayload() throws Exception
+ {
+ final Map<String, String> payload = new HashMap<String, String>();
+ payload.put("name", "david");
+ payload.put("surname", "bowie");
+ MuleMessage parent = new DefaultMuleMessage(payload, muleContext);
+
+ MuleMessage result = client.send("vm://input-8", parent);
+ assertTrue(result.getPayload() instanceof Map);
+ Map<?, ?> resultPayload = (Map<?, ?>) result.getPayload();
+ assertEquals(payload.size(), resultPayload.size());
+ assertEquals(payload.size(), result.getInboundProperty("totalMessages"));
+ assertSame(payload, resultPayload);
+ }
+
+ public void testMapExpression() throws Exception
+ {
+ final Collection<String> names = new ArrayList<String>();
+ names.add("Sergei");
+ names.add("Vasilievich");
+ names.add("Rachmaninoff");
+ Map<String, Object> props = new HashMap<String, Object>();
+ props.put("names", names);
+ MuleMessage message = new DefaultMuleMessage("message payload", props, muleContext);
+
+ MuleMessage result = client.send("vm://input-9", message);
+ assertTrue(result.getPayload() instanceof String);
+ assertEquals(names.size(), ((Collection<?>) message.getOutboundProperty("names")).size());
+ assertEquals(names.size(), result.getInboundProperty("totalMessages"));
+ }
+
}
Modified: branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml (23821 => 23822)
--- branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml 2012-02-06 13:47:50 UTC (rev 23821)
+++ branches/mule-3.x/tests/integration/src/test/resources/foreach-test.xml 2012-02-06 14:47:37 UTC (rev 23822)
@@ -91,5 +91,33 @@
-->
</flow>
+ <flow name="map-config">
+ <vm:inbound-endpoint path="input-8" exchange-pattern="request-response"/>
+ <foreach>
+ <test:component/>
+ <message-properties-transformer scope="invocation">
+ <add-message-property key="totalMessages" value="#[variable:counter]"/>
+ </message-properties-transformer>
+ </foreach>
+ <message-properties-transformer scope="outbound">
+ <add-message-property key="totalMessages" value="#[variable:totalMessages]"/>
+ </message-properties-transformer>
+ <!--
+ <test:assert expression="#[groovy:message.getInvocationProperty('totalMessages')==2]" />
+ -->
+ </flow>
+ <flow name="map-expression-config">
+ <vm:inbound-endpoint path="input-9" exchange-pattern="request-response"/>
+ <foreach expression="#[header:INBOUND:names]">
+ <test:component/>
+ <message-properties-transformer scope="invocation">
+ <add-message-property key="totalMessages" value="#[variable:counter]"/>
+ </message-properties-transformer>
+ </foreach>
+ <message-properties-transformer scope="outbound">
+ <add-message-property key="totalMessages" value="#[variable:totalMessages]"/>
+ </message-properties-transformer>
+ </flow>
+
</mule>
http://xircles.codehaus.org/manage_email
Loading...