Discussion:
[mule-scm] [mule][23722] branches/mule-3.2.x: MULE-5998
Daniel Feist
2012-01-25 23:08:23 UTC
Permalink
Once done, can you search the source code by JIRA ID for tests that may have been excluded because of this? (I excluded a couple.)

Dan
Revision
23722
Author
mike.schilling
Date
2012-01-25 16:14:48 -0600 (Wed, 25 Jan 2012)
Log Message
MULE-5998
By default, sort aggregated events by the order in which they arrived.
Modified Paths
branches/mule-3.2.x/core/src/main/java/org/mule/routing/EventGroup.java
branches/mule-3.2.x/core/src/main/java/org/mule/routing/MessageChunkAggregator.java
branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/EventCorrelator.java
branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/ResequenceCorrelatorCallback.java
branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/ResequenceMessagesCorrelatorCallback.java
branches/mule-3.2.x/core/src/test/java/org/mule/routing/AggregatorTestCase.java
branches/mule-3.2.x/core/src/test/java/org/mule/routing/EventGroupTestCase.java
branches/mule-3.2.x/examples/loanbroker-legacy/common/src/main/java/org/mule/example/loanbroker/routers/BankQuotesAggregationLogic.java
Diff
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/routing/EventGroup.java (23721 => 23722)
--- branches/mule-3.2.x/core/src/main/java/org/mule/routing/EventGroup.java 2012-01-25 21:11:01 UTC (rev 23721)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/routing/EventGroup.java 2012-01-25 22:14:48 UTC (rev 23722)
@@ -12,7 +12,6 @@
import org.mule.DefaultMessageCollection;
import org.mule.DefaultMuleEvent;
-import org.mule.DefaultMuleMessage;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
@@ -26,6 +25,10 @@
import org.mule.util.store.DeserializationPostInitialisable;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -44,6 +47,8 @@
private static final long serialVersionUID = 953739659615692697L;
public static final MuleEvent[] EMPTY_EVENTS_ARRAY = new MuleEvent[0];
+
+ public static final String MULE_ARRIVAL_ORDER_PROPERTY = MuleProperties.PROPERTY_PREFIX + "ARRIVAL_ORDER";
transient private ObjectStoreManager objectStoreManager = null;
@@ -55,6 +60,7 @@
private final String storePrefix;
private String commonRootId = null;
private static boolean hasNoCommonRootId = false;
+ private int arrivalOrderCounter = 0;
public static final String DEFAULT_STORE_PREFIX = "DEFAULT_STORE";
@@ -158,7 +164,8 @@
}
/**
- * Returns an iterator over a snapshot copy of this group's collected events. If
+ * Returns an iterator over a snapshot copy of this group's collected events
+ * sorted by their arrival time. If
* you need to iterate over the group and e.g. remove select events, do so via
* {@link #removeEvent(MuleEvent)}. If you need to do so atomically in order to
* prevent e.g. concurrent reception/aggregation of the group during iteration,
@@ -170,6 +177,23 @@
@SuppressWarnings("unchecked")
public Iterator<MuleEvent> iterator() throws ObjectStoreException
{
+ return iterator(true);
+ }
+
+ /**
+ * Returns an iterator over a snapshot copy of this group's collected events,
+ * optionally sorted by arrival order. If
+ * you need to iterate over the group and e.g. remove select events, do so via
+ * {@link #removeEvent(MuleEvent)}. If you need to do so atomically in order to
+ * prevent e.g. concurrent reception/aggregation of the group during iteration,
+ * wrap the iteration in a synchronized block on the group instance.
+ *
+ */
+ public Iterator<MuleEvent> iterator(boolean sortByArrival) throws ObjectStoreException
+ {
synchronized (events)
{
if (events.allKeys().isEmpty())
@@ -178,19 +202,31 @@
}
else
{
- return IteratorUtils.arrayIterator(this.toArray());
+ return IteratorUtils.arrayIterator(this.toArray(sortByArrival));
}
}
}
+
/**
- * Returns a snapshot of collected events in this group.
+ * Returns a snapshot of collected events in this group sorted by their arrival time.
*
*/
public MuleEvent[] toArray() throws ObjectStoreException
{
+ return toArray(true);
+ }
+
+ /**
+ * Returns a snapshot of collected events in this group, optionally sorted by their arrival time.
+ *
+ */
+ public MuleEvent[] toArray(boolean sortByArrival) throws ObjectStoreException
+ {
synchronized (events)
{
if (events.allKeys().isEmpty())
@@ -203,6 +239,10 @@
{
eventArray[i] = events.retrieve(keys.get(i));
}
+ if (sortByArrival)
+ {
+ Arrays.sort(eventArray, new ArrivalOrderEventComparator());
+ }
return eventArray;
}
}
@@ -219,7 +259,8 @@
{
//Using both event ID and CorrelationSequence since in certain instances
//when an event is split up, the same event IDs are used.
- Serializable key=event.getId()+event.getMessage().getCorrelationSequence();
+ Serializable key=event.getId()+event.getMessage().getCorrelationSequence();
+ event.getMessage().setInvocationProperty(MULE_ARRIVAL_ORDER_PROPERTY, ++arrivalOrderCounter);
events.store(key, event);
if (!hasNoCommonRootId)
@@ -353,22 +394,29 @@
public MuleMessageCollection toMessageCollection() throws ObjectStoreException
{
- MuleMessageCollection col;
+ return toMessageCollection(true);
+ }
+
+ public MuleMessageCollection toMessageCollection(boolean sortByArrival) throws ObjectStoreException
+ {
+ DefaultMessageCollection col = new DefaultMessageCollection(muleContext);
+ List<MuleMessage> messages = new ArrayList<MuleMessage>();
+
synchronized (events)
{
- if (events.allKeys().isEmpty())
- {
- col = new DefaultMessageCollection(null);
- }
- col = new DefaultMessageCollection(muleContext);
-
for (Serializable id : events.allKeys())
{
- MuleMessage message = events.retrieve(id).getMessage();
- col.addMessage(message);
- ((DefaultMuleMessage)col).copyInvocationProperties(message);
+ MuleMessage message = events.retrieve(id).getMessage();
+ messages.add(message);
+ col.copyInvocationProperties(message);
}
}
+
+ if (sortByArrival)
+ {
+ Collections.sort(messages, new ArrivalOrderMessageComparator());
+ }
+ col.addMessages(messages);
return col;
}
@@ -421,4 +469,27 @@
String storeKey = storePrefix + ".eventGroup." + groupId;
this.events = getObjectStoreManager().getObjectStore(storeKey, true);
}
+
+ public final class ArrivalOrderMessageComparator implements Comparator<MuleMessage>
+ {
+ public int compare(MuleMessage message1, MuleMessage message2)
+ {
+ int val1 = message1.getInvocationProperty(MULE_ARRIVAL_ORDER_PROPERTY, -1);
+ int val2 = message2.getInvocationProperty(MULE_ARRIVAL_ORDER_PROPERTY, -1);
+
+ return val1 - val2;
+ }
+ }
+
+ public final class ArrivalOrderEventComparator implements Comparator<MuleEvent>
+ {
+ public int compare(MuleEvent event1, MuleEvent event2)
+ {
+ int val1 = event1.getMessage().getInvocationProperty(MULE_ARRIVAL_ORDER_PROPERTY, -1);
+ int val2 = event2.getMessage().getInvocationProperty(MULE_ARRIVAL_ORDER_PROPERTY, -1);
+
+ return val1 - val2;
+ }
+ }
}
+
\ No newline at end of file
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/routing/MessageChunkAggregator.java (23721 => 23722)
--- branches/mule-3.2.x/core/src/main/java/org/mule/routing/MessageChunkAggregator.java 2012-01-25 21:11:01 UTC (rev 23721)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/routing/MessageChunkAggregator.java 2012-01-25 22:14:48 UTC (rev 23722)
@@ -62,7 +62,7 @@
MuleEvent[] collectedEvents;
try
{
- collectedEvents = events.toArray();
+ collectedEvents = events.toArray(false);
}
catch (ObjectStoreException e)
{
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/EventCorrelator.java (23721 => 23722)
--- branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/EventCorrelator.java 2012-01-25 21:11:01 UTC (rev 23721)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/EventCorrelator.java 2012-01-25 22:14:48 UTC (rev 23722)
@@ -437,7 +437,7 @@
}
else
{
- final FlowConstruct service = group.toArray()[0].getFlowConstruct();
+ final FlowConstruct service = group.toArray(false)[0].getFlowConstruct();
if (!(service instanceof Service))
{
throw new UnsupportedOperationException(
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/ResequenceCorrelatorCallback.java (23721 => 23722)
--- branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/ResequenceCorrelatorCallback.java 2012-01-25 21:11:01 UTC (rev 23721)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/ResequenceCorrelatorCallback.java 2012-01-25 22:14:48 UTC (rev 23722)
@@ -65,7 +65,7 @@
{
try
{
- results = events.toArray();
+ results = events.toArray(false);
}
catch (ObjectStoreException e)
{
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/ResequenceMessagesCorrelatorCallback.java (23721 => 23722)
--- branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/ResequenceMessagesCorrelatorCallback.java 2012-01-25 21:11:01 UTC (rev 23721)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/routing/correlation/ResequenceMessagesCorrelatorCallback.java 2012-01-25 22:14:48 UTC (rev 23722)
@@ -61,7 +61,7 @@
MuleEvent[] results;
try
{
- results = (events == null) ? new MuleEvent[0] : events.toArray();
+ results = (events == null) ? new MuleEvent[0] : events.toArray(false);
}
catch (ObjectStoreException e)
{
Modified: branches/mule-3.2.x/core/src/test/java/org/mule/routing/AggregatorTestCase.java (23721 => 23722)
--- branches/mule-3.2.x/core/src/test/java/org/mule/routing/AggregatorTestCase.java 2012-01-25 21:11:01 UTC (rev 23721)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/routing/AggregatorTestCase.java 2012-01-25 22:14:48 UTC (rev 23722)
@@ -117,7 +117,7 @@
try
{
- for (Iterator iterator = events.iterator(); iterator.hasNext();)
+ for (Iterator iterator = events.iterator(false); iterator.hasNext();)
{
MuleEvent event = (MuleEvent) iterator.next();
try
Modified: branches/mule-3.2.x/core/src/test/java/org/mule/routing/EventGroupTestCase.java (23721 => 23722)
--- branches/mule-3.2.x/core/src/test/java/org/mule/routing/EventGroupTestCase.java 2012-01-25 21:11:01 UTC (rev 23721)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/routing/EventGroupTestCase.java 2012-01-25 22:14:48 UTC (rev 23722)
@@ -160,8 +160,8 @@
eg.addEvent(getTestEvent("foo1"));
eg.addEvent(getTestEvent("foo2"));
- Object[] array1 = IteratorUtils.toArray(eg.iterator());
- MuleEvent[] array2 = eg.toArray();
+ Object[] array1 = IteratorUtils.toArray(eg.iterator(false));
+ MuleEvent[] array2 = eg.toArray(false);
assertTrue(Arrays.equals(array1, array2));
}
Modified: branches/mule-3.2.x/examples/loanbroker-legacy/common/src/main/java/org/mule/example/loanbroker/routers/BankQuotesAggregationLogic.java (23721 => 23722)
--- branches/mule-3.2.x/examples/loanbroker-legacy/common/src/main/java/org/mule/example/loanbroker/routers/BankQuotesAggregationLogic.java 2012-01-25 21:11:01 UTC (rev 23721)
+++ branches/mule-3.2.x/examples/loanbroker-legacy/common/src/main/java/org/mule/example/loanbroker/routers/BankQuotesAggregationLogic.java 2012-01-25 22:14:48 UTC (rev 23722)
@@ -35,7 +35,7 @@
LoanQuote quote = null;
MuleEvent event = null;
- for (Iterator iterator = events.iterator(); iterator.hasNext();)
+ for (Iterator iterator = events.iterator(false); iterator.hasNext();)
{
event = (MuleEvent)iterator.next();
Object o = event.getMessage().getPayload();
http://xircles.codehaus.org/manage_email
Loading...