Pattern: Aggregator


For example, we may want to select the best bid from a number of vendor responses, or bill the client for an order only after all items have been pulled from the warehouse.

How do we combine the results of individual but related messages so that they can be processed as a whole?

  • Messages out of order
  • Message delayed
  • Which messages are related?
  • Avoid separate channel for each system

Use a stateful filter, an Aggregator, to collect and store individual messages until a complete set of related messages has been received. Then, the Aggregator publishes a single message distilled from the individual messages.

The Aggregator is a special Filter that receives a stream of messages and identifies messages that are correlated. Once a complete set of messages has been received, the Aggregator collects information from each correlated message and publishes a single, aggregated message to the output channel for further processing.
When designing an Aggregator, we need to specify the following items:
  • Correlation – which incoming messages belong together?
  • Completeness Condition – when are we ready to publish the result message?
  • Aggregation Algorithm – how do we combine the received messages into a single result message?
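The three design decisions above can be sketched independently of any messaging API. The following is a minimal illustration, not the article's implementation: the names `SimpleAggregator`, `PricedMessage`, and `SimpleAggregatorDemo` are hypothetical, and the three decisions are passed in as plain functions. Like the JMS example later in this article, it ignores messages that arrive after a set has been completed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical demo: lowest price wins once three messages share an auction ID.
final class SimpleAggregatorDemo {
    public static void main(String[] args) {
        SimpleAggregator<PricedMessage, String, Double> aggregator = new SimpleAggregator<>(
            m -> m.auctionId,                        // correlation
            group -> group.size() >= 3,              // completeness condition
            group -> group.stream()                  // aggregation algorithm
                          .mapToDouble(m -> m.price).min().getAsDouble());

        aggregator.accept(new PricedMessage("A1", 12.0));
        aggregator.accept(new PricedMessage("A2", 7.0));   // different auction, kept apart
        aggregator.accept(new PricedMessage("A1", 9.5));
        Optional<Double> result = aggregator.accept(new PricedMessage("A1", 11.0));
        System.out.println("Lowest bid for A1: " + result);
    }
}

// Hypothetical, transport-independent aggregator (M = message, K = key, R = result).
final class SimpleAggregator<M, K, R> {
    private final Function<M, K> correlation;
    private final Predicate<List<M>> completeness;
    private final Function<List<M>, R> aggregation;
    private final Map<K, List<M>> groups = new HashMap<>();

    SimpleAggregator(Function<M, K> correlation,
                     Predicate<List<M>> completeness,
                     Function<List<M>, R> aggregation) {
        this.correlation = correlation;
        this.completeness = completeness;
        this.aggregation = aggregation;
    }

    // Returns the aggregated result when this message completes its set, otherwise empty.
    Optional<R> accept(M message) {
        K key = correlation.apply(message);
        List<M> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        if (completeness.test(group)) {
            return Optional.empty();                 // set already published; ignore late message
        }
        group.add(message);
        return completeness.test(group)
            ? Optional.of(aggregation.apply(group))
            : Optional.empty();
    }
}

// Hypothetical message type: a correlation key plus a price.
final class PricedMessage {
    final String auctionId;
    final double price;

    PricedMessage(String auctionId, double price) {
        this.auctionId = auctionId;
        this.price = price;
    }
}
```

Note that this sketch never discards completed groups, so its state grows with the number of distinct correlation keys; a long-running Aggregator would need some purge policy, which is left out here.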

Example: Aggregator in JMS

The Aggregator receives bid messages on one channel, aggregates all related bids and publishes a message with the lowest bid to another channel. Bids are correlated through an Auction ID property that acts as a Correlation Identifier for the messages. An aggregate is considered complete as soon as at least 3 bids have been received. The Aggregator is self-starting and does not require external initialization.
The solution consists of the following main classes:
  • Aggregator – contains logic to receive messages, aggregate them and send result messages. Interfaces with aggregates via the Aggregate interface.
  • AuctionAggregate – implements the Aggregate interface. This class acts as an Adapter (see [GoF]) between the Aggregate interface and the Auction class. This setup allows the Auction class to be free of references to the JMS API.
  • Auction – a collection of related bids that have been received. The Auction class implements the aggregation strategy, e.g. finding the lowest bid and determining when the aggregate is complete.
  • Bid – a convenience class that holds the data items associated with a bid. We convert incoming message data into a Bid object so that we can access the bid data through a strongly-typed interface, making the Auction logic completely independent from the JMS API.
import java.util.HashMap;
import java.util.Map;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

public class Aggregator implements MessageListener
{
    static final String PROP_CORRID = "AuctionID";

    // All aggregates seen so far, keyed by correlation ID (completed ones are kept)
    Map<String, Aggregate> activeAggregates = new HashMap<String, Aggregate>();
    Destination inputDest = null;
    Destination outputDest = null;
    Session session = null;
    MessageConsumer in = null;
    MessageProducer out = null;

    public Aggregator (Destination inputDest, Destination outputDest, Session session)
    {
        this.inputDest = inputDest;
        this.outputDest = outputDest;
        this.session = session;
    }

    // Creates the consumer and producer and registers this object as the listener
    public void run()
    {
        try {
            in = session.createConsumer(inputDest);
            out = session.createProducer(outputDest);
            in.setMessageListener(this);
        } catch (Exception e) {
            System.out.println("Exception occurred: " + e.toString());
        }
    }

    public void onMessage(Message msg)
    {
        try {
            String correlationID = msg.getStringProperty(PROP_CORRID);
            Aggregate aggregate = activeAggregates.get(correlationID);
            if (aggregate == null) {
                //--- first message for this correlation ID: start a new aggregate
                aggregate = new AuctionAggregate(session);
                activeAggregates.put(correlationID, aggregate);
            }
            //--- ignore message if aggregate is already closed
            if (!aggregate.isComplete()) {
                aggregate.addMessage(msg);
                if (aggregate.isComplete()) {
                    Message result = aggregate.getResultMessage();
                    //--- getResultMessage returns null if the message could not be created
                    if (result != null) {
                        out.send(result);
                    }
                }
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
        }
    }
}

public interface Aggregate {
    public void addMessage(Message message);
    public boolean isComplete();
    public Message getResultMessage();
}

import java.util.ArrayList;
import java.util.List;

public class Auction
{
    List<Bid> bids = new ArrayList<Bid>();

    public void addBid(Bid bid)
    {
        bids.add(bid);
        System.out.println(bids.size() + " Bids in auction.");
    }

    // Completeness condition: at least 3 bids have been received
    public boolean isComplete()
    {
        return (bids.size() >= 3);
    }

    // Aggregation algorithm: the bid with the lowest price wins (null if there are no bids)
    public Bid getBestBid()
    {
        Bid bestBid = null;
        for (Bid b : bids) {
            if (bestBid == null || b.getPrice() < bestBid.getPrice()) {
                bestBid = b;
            }
        }
        return bestBid;
    }
}

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;

public class AuctionAggregate implements Aggregate {
    static final String PROP_AUCTIONID = "AuctionID";
    static final String ITEMID = "ItemID";
    static final String VENDOR = "Vendor";
    static final String PRICE = "Price";

    private Session session;
    private Auction auction;

    public AuctionAggregate(Session session)
    {
        this.session = session;
        auction = new Auction();
    }

    // Converts an incoming MapMessage into a Bid; other message types are ignored
    public void addMessage(Message message) {
        Bid bid = null;
        if (message instanceof MapMessage) {
            try {
                MapMessage mapmsg = (MapMessage)message;
                String auctionID = mapmsg.getStringProperty(PROP_AUCTIONID);
                String itemID = mapmsg.getString(ITEMID);
                String vendor = mapmsg.getString(VENDOR);
                double price = mapmsg.getDouble(PRICE);
                bid = new Bid(auctionID, itemID, vendor, price);
                auction.addBid(bid);
            } catch (JMSException e) {
                System.out.println(e.getMessage());
            }
        }
    }

    public boolean isComplete()
    {
        return auction.isComplete();
    }

    // Converts the best bid back into a MapMessage; returns null if creation fails
    public Message getResultMessage() {
        Bid bid = auction.getBestBid();
        try {
            MapMessage msg = session.createMapMessage();
            msg.setStringProperty(PROP_AUCTIONID, bid.getCorrelationID());
            msg.setString(ITEMID, bid.getItemID());
            msg.setString(VENDOR, bid.getVendorName());
            msg.setDouble(PRICE, bid.getPrice());
            return msg;
        } catch (JMSException e) {
            System.out.println("Could not create message: " + e.getMessage());
            return null;
        }
    }
}
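
The Bid class itself is not listed here. A minimal sketch that is consistent with how the code above uses it might look as follows. The constructor argument order and the getter names are taken from the calls in AuctionAggregate and Auction; the field names, the immutability, and the `toString` and `main` methods are assumptions made for illustration.

```java
// Hypothetical reconstruction of the Bid convenience class (no JMS dependencies).
final class Bid {
    private final String correlationID;   // the Auction ID the bid belongs to
    private final String itemID;
    private final String vendorName;
    private final double price;

    Bid(String correlationID, String itemID, String vendorName, double price) {
        this.correlationID = correlationID;
        this.itemID = itemID;
        this.vendorName = vendorName;
        this.price = price;
    }

    String getCorrelationID() { return correlationID; }
    String getItemID()        { return itemID; }
    String getVendorName()    { return vendorName; }
    double getPrice()         { return price; }

    @Override
    public String toString() {
        return vendorName + " bids " + price + " on " + itemID + " (auction " + correlationID + ")";
    }

    // Small demo with made-up values, only to show the class in use.
    public static void main(String[] args) {
        System.out.println(new Bid("A1", "item-7", "Acme", 9.5));
    }
}
```

Because the class holds plain Java values only, the Auction logic can be exercised in a unit test without a JMS provider.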

The following sequence diagram summarizes the interaction between the classes:
