This component collects and aggregates messages received on its IN_PORT based on a specified Completeness Condition. The collected messages are then forwarded as an aggregated bundle of messages to a component connected to its OUT_PORT.
The Aggregator is a special message filter that receives a stream of messages and identifies the messages that are correlated. Once a complete set of messages have been received, the Aggregator collects information from each of these message and publishes a single, aggregated message to the output channel for further processing.
The Aggregator is a stateful component unlike other simple routing components like Content Based Router (CBR) which are generally stateless. Stateless components process incoming messages one by one and are not required to maintain any information between messages.
The component also has an option to persist the message into a RDBMS.
Configuration and Testing
The configuration of Aggregator is defined as shown in Figure 1.
Figure 1: Default Configuration of Aggregator
Attributes
Validate Input
If this attribute is set to 'yes', the service tries to validate the input received.
Error handling configuration
Please refer the Error Handling section in Common Configurations page for documentation.
Completeness Condition
Aggregator starts aggregating the received messages when the completeness condition is satisfied and sends the aggregated message on to the output port. Completeness condition can be configured to one of the following values:
- Timeout with Override
Messages are aggregated after a specified Time out (ms) or after configured number of messages as specified by Completeness Message Count are received, which ever happens earlier.
Example 1
If Time out (ms) is 5000 ms and Completeness Message Count is 3, Aggregator waits for 5000 milliseconds before aggregating. However, if 3 messages are received before 5000 milliseconds, then it will immediately aggregate them and send out. Similarly, if only 2 messages are received in 5000 ms then it aggregates those 2 messages and send out.
Example 2
When messages with different correlation IDs are received on the input port of the component and Correlation id is the condition specified for the property Group Message based on, the following procedure is followed to aggregate those messages.
Let Mn(Cn)@Nsecs denote "nth Message with Correlation ID C has a value n arriving at Nth Second" and aggregator is configured as Aggregate after time out with 25 seconds based on correlation ID. M1(C1)
@0s, M2C2
@10s, M3C1
@20s, M4C2
@30s, M5C1
@40s are received on input port of Aggregator
@0 seconds, M1 is received, timer T1 is set for 25s for correlation ID 1
@10 seconds, M2 is received with correlation ID 2, a new timer T2 starts now for 25s for correlation ID 2.
@20 seconds, M3 is received, since 25s have not passed after M1 (which has same correlation ID as M3) is received, it is stored.
@25 seconds, timer for correlation ID 1(T1) goes off and messages M1 and M3 are aggregated and sent out without waiting for M5.
@30 seconds, M4 is received and stored along with M2 since, they have same correlation ID.
@35 seconds, T2 times out (25 seconds after M2 has arrived) M2 and M4 are aggregated and sent out.
Timeout
Messages belonging to the same group are aggregated after time specified by the property Time out (ms). The grouping is done based on the property Group Message based on.
- Wait for 'N' Messages
Messages belonging to the same group are aggregated after receiving number of messages as specified by property Completeness Message Count. The grouping is done based on the property Group Message based on.
- Dynamic Number of Messages
Number of messages to be aggregated is dynamic (that is, changes with time during the lifespan of the business application). This number depends on properties Aggregation Condition and Completeness Message Xpath/Property Name containing completeness condition.
Timeout with Dynamic number of messages
Messages are aggregated after a time specified by property Time out (ms) or after dynamically decided number of messages are received, which ever happens earlier. This number depends on properties Aggregation Condition and Completeness Message XPath/Property Name containing completeness condition.
Apply completeness on all messages
Messages corresponding to all correlation ids will be aggregated when a completeness condition is satisfied. Completeness condition is checked on all messages in cache.
- no
If a message with a particular correlation ID satisfies the completeness condition, then the cached messages having the same correlation ID will be aggregated and sent out.
- yes
If a message with a particular correlation ID satisfies the completeness condition, then the cached messages, irrespective of the correlation ID, will be aggregated and grouped based on correlation ID and sent out. Separate output will be sent for each correlation ID.
Example
Let Ma(Cn) denote message with Correlation ID(C) value n and Message XPath value as a and Aggregator is configured to the Completeness XPath value as X.
Ma(C1), Mb(C2), Mb(C1), Mc(C2), Mc(C3), MX(C1), MX(C2), MX(C3) are sent to the input port of the Aggregator.
- Value set to 'no'
After receiving MX(C1) message (which satisfies the completeness condition), the messages Ma(C1), Mb(C1) and MX(C1) which have same correlation ID(1) will be aggregated and sent out. After receiving MX(C2) the messages Mb(C2), Mc(C2) and MX(C2) which have same correlation ID(2) will be aggregated and sent out. After receiving MX(C3), the messages Mc(C3) and MX(C3) will be aggregated and sent out.
- Value set to 'no'
- Value set to 'yes'
After receiving MX(C1) message which has the completeness xpath value X, then the messages Ma(C1), Mb(C1) and MX(C1) which have same correlation ID(1) will be aggregated and sent out. Now the cached messages Mb(C2), Mc(C2), Mc(C3) will also be aggregated based on correlation ID and sent out. Mb(C2) and Mc(C2) will be aggregated and sent out in a separate output. Mc(C3) will be sent in a separate output. After receiving MX(C2) which satisfies the completeness condition will be send out without caching. After receiving MX(C3) which satisfies the completeness condition will be send out without caching.
- Value set to 'yes'
Timeout (ms)
Time (in milliseconds) for which the Aggregator waits, before it aggregates the messages in Cache.
Completeness Message Count
This property specifies the number of messages satisfying the grouping condition after which messages should be aggregated. Default value 0 (zero) indicates infinite number of messages.
Completeness criteria source
This property determines criteria on which the completeness condition has to be applied.
- Input XML Message
Aggregator evaluates the Xpath on the incoming XML message to determine the dynamic number of messages or to check the completeness condition. The Xpath can be provided in the property Completeness Message Xpath.
- Event Process Context
Aggregator evaluates the Xpath on the Application Context of an incoming XML message to determine the dynamic number of messages or to check the completeness condition. The Xpath can be provided in the property Completeness Message Xpath.
- Document Header Property
Aggregator checks the value of the header of the input message to determine the dynamic number of messages or to check the completeness condition. The Header name can be provided in the property Property Name containing Completeness Condition.
Aggregation Condition
Along with Completeness criteria source, this property is used to compute the dynamic number of messages to be aggregated.
- Aggregate till count of messages
Messages are aggregated upto dynamically decided number of messages satisfying grouping condition. The number of messages can be determined by evaluating the XPath on completeness criteria source (Input XML Message/Event Process Context/ Document Header Property) and checks if the cached messages are equal to the value determined, if equal then the cached messages are grouped based on the grouping condition and sent out.
Example:
Let Completeness criteria source is set as Document Header Property and Property Name containing Completeness Condition as No_of_Messages and Apply completeness on all messages to No.
Let MHN(Cn) - Message with Correlation ID C has a value n and the header No_of_Messages has value N.
MH3(C1), MH1(C1), MH3(C2), MH3(C1), MH2(C2) are sent to the input port of Aggregator.
When MH3(C1) is received the component caches the message and determines the completeness message count from the header as 3 and the cached message count as .
Since, message count is not equal to 3 the Aggregator waits for another message.
After receiving MH3(C1) the number of cached messages which are having correlation ID value 1 will become 3 which is equal to the header value of the message, so the messages MH3(C1), MH1(C1) and MH3(C1) will be aggregated and sent out.
After receiving MH2(C2) the number of cached messages which are having correlation id value 2 will become 2 which is equal to the header value of the message so the messages MH3(C2) and MH2(C2) will be aggregated and sent out.
- Aggregate till matching condition
When this option is selected, then the completeness condition will be evaluated as follows:
The XPath which is provided in Completeness Message Xpath/Property Name containing completeness condition will be evaluated on the Completeness Criteria Source (Input XML Message/Event Process Context/ Document Header Property) and this value is compared to the value provided in the property Matching String. If both are equal, then the Aggregator will aggregate the cached messages based on the grouping condition.
Completeness Message Xpath
This launches an Xpath Editor which can be used to configure the Xpath. If the completeness criteria source is Input XML Message, then before configuring this property, input port XSD must be specified against property Input Ports XSD to show the input XML structure in the XPath editor.
Property Name containing completeness condition
This property specifies the name of Header in the input message based on which Aggregator will determine the dynamic number of messages or the completeness condition.
Ignore Duplicate Messages
- yes
Aggregator checks for the duplicate messages and ignores the duplicate messages.
- no
Aggregator caches all the messages and doesn't check if the message is duplicate or not.
The procedure to determine the duplicate message is as follows
- Checks whether the message has a duplicate Correlation ID. If 'yes', then computes value for the property specified by Duplication Identifier.
- Now checks if a message already exists in the cache which have same value for Duplicate Identifier. If so, the Aggregator treats this message as duplicate.
Matching String
This property specifies the string that is to be matched with the xpath/document header value. All messages satisfying the grouping condition are aggregated till a message contains the Completeness Message Xpath or Property Name containing completeness condition value matching to the Matching string property value.
Duplication Identifier
This property determines a condition to identify duplicate documents that the Aggregator should use while ignoring duplicate documents.
Document Identifier can be one of the following:
- Document Header Property
Messages having same value for the property (javax.jms.Message.getStringProperty()) are treated as duplicates. The header name can be provided in the property Header Property Name.
- Application Context
Messages containing same Application Context satisfying XPath defined against XPath property are treated as duplicate messages. If the XPath is empty or null then messages having same Application Context are treated as duplicates.
- Text Body
Messages containing same text satisfying XPath defined against XPath property are treated as duplicates. If the XPath is empty or null then messages having same text are treated as duplicates.
- Carry Forward Context
Messages having same value for REQUEST_ID Property of the Carry Forward Context property are treated as duplicates.
- Senders Identification
Messages having same value for property ESBX_SYSTEM_ SOURCE_SERVICE_INSTANCE of the Carry Forward Context property are treated as duplicates.
Duplicate XPath
In case the Duplication Identifier is either Text Body/Application Context, the Text Body or the Application Context is evaluated by using an Xpath.
Header Property Name
In case the Duplication Identifier is set to Document Header Property, this value has to be set. It could be one of the names of the message properties. The Duplication Identification is done based on the value of this property.
Name Space
This property specifies the namespace to be used for the root element that encapsulates all aggregated messages. If Output Ports XSD is specified, this should be same as the target namespace of the output port XSD. This value will be set automatically when output port XSD is specified.
Group messages based on
This property determines a condition to identify similar documents that the Aggregator should use while aggregating. Documents which have same correlation ID are aggregated when the Completeness condition is satisfied.
Correlation ID can be one of the following:
- Document ID
Messages having same javax.jms.Message.getJMSMessageID() are aggregated together.
- Document Correlation ID
Messages having same javax.jms.Message.getJMSCorrelationID() are aggregated together.
- Document Header Property
Messages having same value for the property (javax.jms.Message.getStringProperty()) specified are aggregated. The header name can be provided in the property Header Property Name.
- Application Context
Messages containing Application Context satisfying XPath defined against XPath property are aggregated together. If the XPath is empty or null then messages having same Application Context are aggregated together.
- Text Body
Messages containing text satisfying XPath defined against XPath property are aggregated together. If the XPath is empty or null then messages having same text are aggregated together.
- Carry Forward Context
Messages having same value for REQUEST_ID Property of the Carry Forward Context property are aggregated together.
- Workflow Instance ID
Messages having same value for Property ESBX_SYSTEM_ WORK_FLOW_INST_ID in message are aggregated together. This can be used only when Document tracking is enabled.
None
Messages are grouped as they are received.
XPath
This property specifies the XPath which will be evaluated on Application Context or Text Body depends on the property Group messages based on for grouping. This will launch XPath editor to configure the XPath. If the Group messages based on is Text Body, then before configuring this property input port XSD has to be specified against the property Input Ports XSD to show the input XML structure in the XPath editor.
Header Property Name
The name of the property based on which grouping of messages is done.
Ignore messages after completion
- yes
Ignores messages containing correlation IDs which are already aggregated.
- no
Restarts aggregating messages containing correlation IDs which are already aggregated.
Example:
Aggregator is configured with completeness condition as Wait for 'N' messages and Completeness Message Count to 3.
Let Mt1(C1) denote message with correlation ID C has a value 1 and text body t1.
Mt1(C1), Mt2(C1), Mt3(C1), Mt4(C1) are sent to the input port of Aggregator. Messages Mt1(C1), Mt2(C1) and Mt3(C1) will be aggregated after receiving message Mt3(C1).
If the property Ignore messages after completion is set to 'yes', then the message Mt4(C1) will be ignored, since aggregation has been done on the correlation ID 1, and this message is also having same correlation ID so the message will be ignored.
If the property Ignore messages after completion is set to 'no', then the message Mt4(C1) will be cached, and the Aggregator waits for other messages.
Message persistence
- yes
Persists messages which are not aggregated into a database. If the completeness condition involves only the count of the messages, the old messages are transmitted along with the new messages when the completeness condition is satisfied. In case of timeout, if the timeout occurs when the aggregator is down, the old messages are transmitted when the aggregator is restarted; else, they are transmitted after timeout. The timeout is inclusive of the time when the aggregator was down. In case of persisting messages, message properties are not stored.
Aggregator itself takes the responsibility of starting database and creating tables. It internally uses Mckoi database.
- no
Maintains all messages that are not aggregated in an inMemory data structure.
Table Name
The name of the table that stores messages received by the Aggregator. The value of this field can be left as null. In such an instance, a table with tif_ as prefix is used.
Input Ports XSD
The XSD of the expected input messages.
Output Ports XSD
The XSD for the aggregated output message.
Root Element Name
Root element name that encapsulate all aggregated messages. If Output Ports XSD is specified, this should be same as the root element set in output port XSD. This value is set automatically when output port XSD is specified.
Override Message Properties
- yes
While aggregating messages, if the messages have a same property with different values, the property value of the last message (having this property) in the aggregation is set as the property value for the aggregated message.
- no
While aggregating messages, if the messages have a same property with different values, the property value of the first message (having this property) in the aggregation is set as the property value for the aggregated message.
Functional Demonstration
Scenario 1
Aggregating messages based on the timeout specified.
Configure the Aggregator as described in section Configuration and Testing component to send sample input and to check the response respectively.
In the example below, only Input ports XSD is set to chat schema (can use Fetch from Schema for this) in Aggregator Custom Property Sheet and all the remaining properties are left as default.
Figure 2: Demonstrating Scenario 1
On timeout, the aggregated messages are sent from the output port to the Display component.
Scenario 2
This scenario explains the usage of Dynamic number of messages completeness condition.
Configure the Aggregator as described in section Configuration and Testing and use chat and display component to send sample input and to check the response respectively.
In the example below, Aggregator is configured in such a way that if it receives a message Fiorano at the input port, it aggregates and sends the messages to the output port. The Figure 3 shows the configuration.
Figure 3: Sample Configuration used in Scenario 2
Figure 4: Scenario demonstration with sample input and output
Useful Tips
- When using the persist option, the database is local to the machine on which the Peer Server is running. In case of a Peer Server failover, the component creates a new table and therefore messages cannot be recovered.
- When using header property for grouping messages, properties with names like/starting with ESBX_SYSTEM_* cannot be used.