This component collects and aggregates messages received on its IN_PORT based on a specified Completeness Condition. The collected messages are then forwarded as an aggregated bundle of messages to a component connected to its OUT_PORT.

The Aggregator is a special message filter that receives a stream of messages and identifies the messages that are correlated. Once a complete set of messages has been received, the Aggregator collects information from each of these messages and publishes a single, aggregated message to the output channel for further processing.

The Aggregator is a stateful component, unlike simple routing components such as the Content Based Router (CBR), which are generally stateless. Stateless components process incoming messages one by one and are not required to maintain any information between messages.

The component also has an option to persist messages into an RDBMS.

The component ignores non-XML messages (such as text messages) sent to its input port. The input is expected to be a valid XML message.

Configuration and Testing

The configuration of Aggregator is defined as shown in Figure 1.


Figure 1: Default Configuration of Aggregator

Attributes

Validate Input

If this attribute is set to 'yes', the service tries to validate the input received.

If this is set to 'no', the service does not validate the input, which improves performance.

Setting it to 'no' may cause undesired results if the input XML is not valid.

Error handling configuration

Please refer to the Error Handling section in the Common Configurations page for documentation.

Completeness Condition

Aggregator starts aggregating the received messages when the completeness condition is satisfied and sends the aggregated message on to the output port. Completeness condition can be configured to one of the following values:

  • Timeout with Override
    Messages are aggregated after the specified Time out (ms), or after the number of messages specified by Completeness Message Count has been received, whichever happens first.

    Example 1
    If Time out (ms) is 5000 ms and Completeness Message Count is 3, the Aggregator waits for 5000 milliseconds before aggregating. However, if 3 messages are received before 5000 milliseconds have elapsed, it aggregates them immediately and sends them out. Similarly, if only 2 messages are received in 5000 ms, it aggregates those 2 messages and sends them out.

    Example 2
    When messages with different correlation IDs are received on the input port of the component and Correlation ID is the condition specified for the property Group messages based on, the following procedure is followed to aggregate those messages.
    Let Mn(Cm)@Ns denote "the nth message, with correlation ID m, arriving at the Nth second", and let the Aggregator be configured to aggregate after a timeout of 25 seconds, grouped by correlation ID. Messages M1(C1)@0s, M2(C2)@10s, M3(C1)@20s, M4(C2)@30s and M5(C1)@40s are received on the input port of the Aggregator.
    @0 seconds, M1 is received and timer T1 is set to 25s for correlation ID 1.
    @10 seconds, M2 is received with correlation ID 2, and a new timer T2 is started for 25s for correlation ID 2.
    @20 seconds, M3 is received. Since 25s have not passed since M1 (which has the same correlation ID as M3) was received, M3 is stored.
    @25 seconds, the timer for correlation ID 1 (T1) goes off, and messages M1 and M3 are aggregated and sent out without waiting for M5.
    @30 seconds, M4 is received and stored along with M2, since they have the same correlation ID.
    @35 seconds, T2 times out (25 seconds after M2 arrived), and M2 and M4 are aggregated and sent out.
  • Timeout
    Messages belonging to the same group are aggregated after the time specified by the property Time out (ms). The grouping is done based on the property Group messages based on.

    If Completeness Condition is set to this value, another attribute, "Time out (ms)", appears for specifying the timeout in milliseconds.

  • Wait for 'N' Messages
    Messages belonging to the same group are aggregated after the number of messages specified by the property Completeness Message Count has been received. The grouping is done based on the property Group messages based on.
  • Dynamic Number of Messages
    The number of messages to be aggregated is dynamic (that is, it changes over the lifespan of the business application). This number depends on the properties Aggregation Condition and Completeness Message Xpath/Property Name containing completeness condition.
  • Timeout with Dynamic number of messages
    Messages are aggregated after the time specified by the property Time out (ms), or after a dynamically decided number of messages has been received, whichever happens first. This number depends on the properties Aggregation Condition and Completeness Message Xpath/Property Name containing completeness condition.

    If Completeness Condition is set to this value, another attribute, "Time out (ms)", appears for specifying the timeout in milliseconds.
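
The timer behaviour described for Timeout and Timeout with Override can be sketched as a small simulation. This is an illustrative sketch only, not the component's implementation: the function name `simulate_timeout`, the tuple layout, and the choice to start a fresh group when a message arrives for a correlation ID that was already flushed (as M5 does in Example 2) are assumptions made for the example.

```python
def simulate_timeout(arrivals, timeout, count=0):
    """Replay (time, message, correlation_id) arrivals, sorted by time.

    Returns a list of (flush_time, correlation_id, messages) tuples.
    count=0 means there is no count override (plain Timeout behaviour).
    """
    groups = {}  # correlation_id -> (deadline, [messages])
    out = []

    def flush_expired(now):
        # Flush every group whose timer has gone off by `now`, earliest first.
        for cid, (deadline, msgs) in sorted(groups.items(), key=lambda kv: kv[1][0]):
            if deadline <= now:
                out.append((deadline, cid, msgs))
                del groups[cid]

    for t, msg, cid in arrivals:
        flush_expired(t)
        if cid not in groups:
            # The first message of a group starts that group's timer.
            groups[cid] = (t + timeout, [])
        groups[cid][1].append(msg)
        if count and len(groups[cid][1]) >= count:
            # Count override: aggregate immediately, before the timer expires.
            out.append((t, cid, groups.pop(cid)[1]))

    flush_expired(float("inf"))  # let the remaining timers run out
    return out


example2 = [(0, "M1", "C1"), (10, "M2", "C2"), (20, "M3", "C1"),
            (30, "M4", "C2"), (40, "M5", "C1")]
for flush_time, cid, msgs in simulate_timeout(example2, timeout=25):
    print(flush_time, cid, msgs)
```

The first two flushes reproduce Example 2 (M1 and M3 at 25 seconds, M2 and M4 at 35 seconds). Calling the function with `timeout=5000, count=3` reproduces Example 1: three messages are flushed as soon as the third arrives, and two messages are flushed when the timer expires.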

Apply completeness on all messages

This property determines whether messages for all correlation IDs are aggregated when a completeness condition is satisfied, that is, whether the completeness condition is applied to all messages in the cache.

  • no
    If a message with a particular correlation ID satisfies the completeness condition, then the cached messages having the same correlation ID will be aggregated and sent out.
  • yes
    If a message with a particular correlation ID satisfies the completeness condition, then the cached messages, irrespective of the correlation ID, will be aggregated and grouped based on correlation ID and sent out. Separate output will be sent for each correlation ID.

    Example
    Let Ma(Cn) denote a message with correlation ID n and message XPath value a, and let the Aggregator be configured with the completeness XPath value X.
    Ma(C1), Mb(C2), Mb(C1), Mc(C2), Mc(C3), MX(C1), MX(C2), MX(C3) are sent to the input port of the Aggregator.
    • Value set to 'no'
      After MX(C1) is received (which satisfies the completeness condition), the messages Ma(C1), Mb(C1) and MX(C1), which have the same correlation ID (1), are aggregated and sent out. After MX(C2) is received, the messages Mb(C2), Mc(C2) and MX(C2), which have the same correlation ID (2), are aggregated and sent out. After MX(C3) is received, the messages Mc(C3) and MX(C3) are aggregated and sent out.
    • Value set to 'yes'
      After MX(C1) is received, which has the completeness XPath value X, the messages Ma(C1), Mb(C1) and MX(C1), which have the same correlation ID (1), are aggregated and sent out. The cached messages Mb(C2), Mc(C2) and Mc(C3) are then also aggregated based on correlation ID and sent out: Mb(C2) and Mc(C2) are aggregated and sent out in one output, and Mc(C3) is sent in a separate output. When MX(C2) is received, it satisfies the completeness condition and is sent out without caching. Likewise, when MX(C3) is received, it satisfies the completeness condition and is sent out without caching.
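
The difference between the two settings in the example above can be sketched as follows. This is a minimal illustration under stated assumptions, not the component's code: the function name `aggregate_on_match`, the `(xpath_value, correlation_id)` pair layout, and the order in which the remaining groups are emitted are all hypothetical.

```python
def aggregate_on_match(messages, matching, apply_to_all):
    """messages: (xpath_value, correlation_id) pairs in arrival order.

    Returns the list of outputs; each output is a list of pairs.
    """
    cache = {}  # correlation_id -> cached messages
    outputs = []
    for value, cid in messages:
        cache.setdefault(cid, []).append((value, cid))
        if value != matching:
            continue  # completeness condition not satisfied yet
        # The group of the triggering message goes out first.
        outputs.append(cache.pop(cid))
        if apply_to_all:
            # Every other cached group is sent as a separate output.
            for other in list(cache):
                outputs.append(cache.pop(other))
    return outputs


stream = [("a", 1), ("b", 2), ("b", 1), ("c", 2), ("c", 3),
          ("X", 1), ("X", 2), ("X", 3)]
print(len(aggregate_on_match(stream, "X", apply_to_all=False)))  # 3 outputs
print(len(aggregate_on_match(stream, "X", apply_to_all=True)))   # 5 outputs
```

With 'no', each correlation ID produces one output when its own MX message arrives. With 'yes', the first MX message flushes every cached group, so the later MX(C2) and MX(C3) find an empty cache and go out on their own.
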
Timeout (ms)

Time (in milliseconds) for which the Aggregator waits, before it aggregates the messages in Cache.

  • This property is visible when Completeness Condition is set to "Timeout" or "Timeout with Dynamic number of messages".
  • The value should not be set to 0 (zero).
Completeness Message Count

This property specifies the number of messages satisfying the grouping condition after which the messages should be aggregated. The default value, 0 (zero), indicates an infinite number of messages.

This property is visible only when Completeness Condition is set to "Timeout with Override" or "Wait for 'N' Messages".

Completeness criteria source

This property determines criteria on which the completeness condition has to be applied.

This property will be visible and considered when the Completeness Condition is set to either "Dynamic Number of Messages" or "Timeout with Dynamic number of messages".

  • Input XML Message
    Aggregator evaluates the Xpath on the incoming XML message to determine the dynamic number of messages or to check the completeness condition. The Xpath can be provided in the property Completeness Message Xpath.
  • Event Process Context
    Aggregator evaluates the Xpath on the Application Context of an incoming XML message to determine the dynamic number of messages or to check the completeness condition. The Xpath can be provided in the property Completeness Message Xpath.
  • Document Header Property
    Aggregator checks the value of the header of the input message to determine the dynamic number of messages or to check the completeness condition. The Header name can be provided in the property Property Name containing Completeness Condition.
Aggregation Condition

Along with Completeness criteria source, this property is used to compute the dynamic number of messages to be aggregated.

This property is visible when the Completeness Condition is set to "Dynamic Number of Messages" or "Timeout with Dynamic number of messages".

  • Aggregate till count of messages
    Messages are aggregated up to a dynamically decided number of messages satisfying the grouping condition. The Aggregator determines this number by evaluating the XPath on the completeness criteria source (Input XML Message/Event Process Context/Document Header Property) and checks whether the number of cached messages equals it. If so, the cached messages are grouped based on the grouping condition and sent out.
    Example:
    Let Completeness criteria source be set to Document Header Property, Property Name containing completeness condition to No_of_Messages, and Apply completeness on all messages to 'no'.
    Let MHN(Cn) denote a message with correlation ID n whose header No_of_Messages has the value N.
    MH3(C1), MH1(C1), MH3(C2), MH3(C1), MH2(C2) are sent to the input port of the Aggregator.
    When the first MH3(C1) is received, the component caches the message and determines the completeness message count from the header as 3 and the cached message count as 1.
    Since the message count is not equal to 3, the Aggregator waits for another message.
    After the second MH3(C1) is received, the number of cached messages with correlation ID 1 becomes 3, which equals the header value of the message, so the messages MH3(C1), MH1(C1) and MH3(C1) are aggregated and sent out.
    After MH2(C2) is received, the number of cached messages with correlation ID 2 becomes 2, which equals the header value of the message, so the messages MH3(C2) and MH2(C2) are aggregated and sent out.
  • Aggregate till matching condition
    When this option is selected, then the completeness condition will be evaluated as follows:
    The XPath which is provided in Completeness Message Xpath/Property Name containing completeness condition will be evaluated on the Completeness Criteria Source (Input XML Message/Event Process Context/ Document Header Property) and this value is compared to the value provided in the property Matching String. If both are equal, then the Aggregator will aggregate the cached messages based on the grouping condition.
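
The "Aggregate till count of messages" example above can be traced with a short sketch. It assumes that Apply completeness on all messages is 'no' and that the count carried by the most recently received message is the one compared against the cache size, as in the walkthrough. The function name `aggregate_till_count` and the `(header_count, correlation_id)` pair layout are hypothetical.

```python
def aggregate_till_count(messages):
    """messages: (header_count, correlation_id) pairs in arrival order.

    header_count stands for the No_of_Messages header of each message.
    Returns the list of aggregated outputs.
    """
    cache = {}  # correlation_id -> cached messages
    outputs = []
    for expected, cid in messages:
        group = cache.setdefault(cid, [])
        group.append((expected, cid))
        # Compare the cached count for this correlation ID with the
        # count carried by the message that has just arrived.
        if len(group) == expected:
            outputs.append(cache.pop(cid))
    return outputs


# MH3(C1), MH1(C1), MH3(C2), MH3(C1), MH2(C2)
stream = [(3, 1), (1, 1), (3, 2), (3, 1), (2, 2)]
for output in aggregate_till_count(stream):
    print(output)
```

The first output holds the three messages with correlation ID 1 and the second holds the two messages with correlation ID 2, matching the example.
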
Completeness Message Xpath

This launches an Xpath Editor which can be used to configure the Xpath. If the completeness criteria source is Input XML Message, then before configuring this property, input port XSD must be specified against property Input Ports XSD to show the input XML structure in the XPath editor.

This property is visible when Completeness Condition is set to "Dynamic Number of Messages" or "Timeout with Dynamic number of messages".

Property Name containing completeness condition

This property specifies the name of Header in the input message based on which Aggregator will determine the dynamic number of messages or the completeness condition.

This property is visible when

  • Completeness Condition is set to "Dynamic Number of Messages" or "Timeout with Dynamic number of messages"
    and
  • Completeness criteria source is set to "Document Header Property".
Ignore Duplicate Messages
  • yes
    The Aggregator checks for duplicate messages and ignores them.
  • no
    The Aggregator caches all messages without checking whether a message is a duplicate.

The procedure to determine whether a message is a duplicate is as follows:

  • The Aggregator checks whether the message has the same correlation ID as a message already received. If so, it computes the value of the property specified by Duplication Identifier.
  • It then checks whether a message with the same value for the Duplication Identifier already exists in the cache. If so, the Aggregator treats the incoming message as a duplicate.
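
The two-step duplicate check can be sketched as follows. This is an illustration under assumptions, not the component's implementation: the class `DedupCache`, the dictionary-based message shape, and the `dup_id_of` callback (standing in for whichever Duplication Identifier is configured) are hypothetical.

```python
class DedupCache:
    """Caches messages per correlation ID and optionally drops duplicates."""

    def __init__(self, ignore_duplicates=True):
        self.ignore_duplicates = ignore_duplicates
        self.groups = {}  # correlation_id -> list of cached messages

    def add(self, correlation_id, message, dup_id_of):
        """Cache the message; return False if it was dropped as a duplicate."""
        group = self.groups.get(correlation_id)
        # Step 1: only a correlation ID that is already cached can be a duplicate.
        if self.ignore_duplicates and group:
            dup_id = dup_id_of(message)
            # Step 2: look for a cached message with the same identifier value.
            if any(dup_id_of(cached) == dup_id for cached in group):
                return False
        self.groups.setdefault(correlation_id, []).append(message)
        return True


def by_request_id(message):
    # Hypothetical identifier: the REQUEST_ID of the Carry Forward Context.
    return message["REQUEST_ID"]


cache = DedupCache()
print(cache.add("C1", {"REQUEST_ID": "r1"}, by_request_id))  # True
print(cache.add("C1", {"REQUEST_ID": "r1"}, by_request_id))  # False
print(cache.add("C2", {"REQUEST_ID": "r1"}, by_request_id))  # True
```

The third call is accepted because the identifier is only compared among messages that share a correlation ID.
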
Matching String

This property specifies the string to be matched against the XPath/document header value. All messages satisfying the grouping condition are aggregated until a message arrives whose Completeness Message Xpath or Property Name containing completeness condition value matches the Matching String property value.

This property is visible only when the property Aggregation Condition is set to "Aggregate till matching condition".

Duplication Identifier

This property determines a condition to identify duplicate documents that the Aggregator should use while ignoring duplicate documents.

This property is visible only when the Ignore Duplicate Messages is set to 'yes'.


Duplication Identifier can be one of the following:

  • Document Header Property
    Messages having same value for the property (javax.jms.Message.getStringProperty()) are treated as duplicates. The header name can be provided in the property Header Property Name.
  • Application Context
    Messages containing same Application Context satisfying XPath defined against XPath property are treated as duplicate messages. If the XPath is empty or null then messages having same Application Context are treated as duplicates.
  • Text Body
    Messages containing same text satisfying XPath defined against XPath property are treated as duplicates. If the XPath is empty or null then messages having same text are treated as duplicates.
  • Carry Forward Context
    Messages having same value for REQUEST_ID Property of the Carry Forward Context property are treated as duplicates.
  • Senders Identification
    Messages having same value for property ESBX_SYSTEM_SOURCE_SERVICE_INSTANCE of the Carry Forward Context property are treated as duplicates.
Duplicate XPath

If the Duplication Identifier is either Text Body or Application Context, the Text Body or the Application Context is evaluated using this XPath.

This property is visible only when the Duplication Identifier is set to either "Text Body" or "Application Context".

Header Property Name

In case the Duplication Identifier is set to Document Header Property, this value has to be set. It could be one of the names of the message properties. The Duplication Identification is done based on the value of this property.

This property is visible only when the Duplication Identifier is set to "Document Header Property".

Name Space

This property specifies the namespace to be used for the root element that encapsulates all aggregated messages. If Output Ports XSD is specified, this should be same as the target namespace of the output port XSD. This value will be set automatically when output port XSD is specified.

Group messages based on

This property determines a condition to identify similar documents that the Aggregator should use while aggregating. Documents which have same correlation ID are aggregated when the Completeness condition is satisfied.

Correlation ID can be one of the following:

  • Document ID
    Messages having same javax.jms.Message.getJMSMessageID() are aggregated together.
  • Document Correlation ID
    Messages having same javax.jms.Message.getJMSCorrelationID() are aggregated together.
  • Document Header Property
    Messages having same value for the property (javax.jms.Message.getStringProperty()) specified are aggregated. The header name can be provided in the property Header Property Name.
  • Application Context
    Messages containing Application Context satisfying XPath defined against XPath property are aggregated together. If the XPath is empty or null then messages having same Application Context are aggregated together.
  • Text Body
    Messages containing text satisfying XPath defined against XPath property are aggregated together. If the XPath is empty or null then messages having same text are aggregated together.
  • Carry Forward Context
    Messages having same value for REQUEST_ID Property of the Carry Forward Context property are aggregated together.
  • Workflow Instance ID
    Messages having same value for the property ESBX_SYSTEM_WORK_FLOW_INST_ID in the message are aggregated together. This can be used only when Document tracking is enabled.
  • None
    Messages are grouped as they are received.

    Null Correlation IDs are not supported.

XPath

This property specifies the XPath that is evaluated on the Application Context or Text Body, depending on the property Group messages based on, for grouping. It launches the XPath editor to configure the XPath. If Group messages based on is set to Text Body, the input port XSD must be specified against the property Input Ports XSD before configuring this property, so that the XPath editor can show the input XML structure.

This property is visible when the property Group messages based on is set to "Text Body". 

Header Property Name

The name of the property based on which grouping of messages is done.

This property is visible when the property Group messages based on is set to "Document Header Property".

Ignore messages after completion
  • yes
    Ignores messages containing correlation IDs which are already aggregated.
  • no
    Restarts aggregating messages containing correlation IDs which are already aggregated.

Example:
The Aggregator is configured with Completeness Condition set to Wait for 'N' Messages and Completeness Message Count set to 3.
Let Mt1(C1) denote a message with correlation ID 1 and text body t1.
Mt1(C1), Mt2(C1), Mt3(C1), Mt4(C1) are sent to the input port of the Aggregator. Messages Mt1(C1), Mt2(C1) and Mt3(C1) are aggregated after message Mt3(C1) is received.

If the property Ignore messages after completion is set to 'yes', the message Mt4(C1) is ignored, because aggregation has already been done for correlation ID 1.

If the property Ignore messages after completion is set to 'no', the message Mt4(C1) is cached, and the Aggregator waits for further messages.
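
The example can be traced with a short sketch. It is illustrative only: the function name `wait_for_n`, the `(text, correlation_id)` pair layout, and the use of a set to remember completed correlation IDs are assumptions, not a description of the component's internals.

```python
def wait_for_n(messages, n, ignore_after_completion):
    """messages: (text, correlation_id) pairs in arrival order.

    Returns (outputs, ignored, cache) after all messages are processed.
    """
    cache = {}         # correlation_id -> cached messages
    completed = set()  # correlation IDs that were already aggregated
    outputs, ignored = [], []
    for text, cid in messages:
        if ignore_after_completion and cid in completed:
            ignored.append((text, cid))
            continue
        group = cache.setdefault(cid, [])
        group.append((text, cid))
        if len(group) == n:
            outputs.append(cache.pop(cid))
            completed.add(cid)
    return outputs, ignored, cache


stream = [("t1", 1), ("t2", 1), ("t3", 1), ("t4", 1)]
print(wait_for_n(stream, 3, ignore_after_completion=True))
print(wait_for_n(stream, 3, ignore_after_completion=False))
```

In both runs the first three messages form one output. With 'yes', Mt4(C1) ends up in the ignored list; with 'no', it stays in the cache as the start of a new group.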

Message persistence
  • yes
    Persists messages which are not aggregated into a database. If the completeness condition involves only the count of the messages, the old messages are transmitted along with the new messages when the completeness condition is satisfied. In case of timeout, if the timeout occurs when the aggregator is down, the old messages are transmitted when the aggregator is restarted; else, they are transmitted after timeout. The timeout is inclusive of the time when the aggregator was down. In case of persisting messages, message properties are not stored.
    The Aggregator itself takes responsibility for starting the database and creating the tables. It internally uses the Mckoi database.
  • no
    Maintains all messages that are not aggregated in an in-memory data structure.
Table Name

The name of the table that stores messages received by the Aggregator. The value of this field can be left as null. In such an instance, a table with tif_ as prefix is used.

This property is visible and the table is used when Message persistence property is set to 'yes'.

Input Ports XSD

The XSD of the expected input messages.

Output Ports XSD

The XSD for the aggregated output message.

Root Element Name

The name of the root element that encapsulates all aggregated messages. If Output Ports XSD is specified, this should be the same as the root element set in the output port XSD. This value is set automatically when the output port XSD is specified.
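
As a rough illustration of how Root Element Name and Name Space shape the output, the sketch below wraps a list of XML bodies in a namespaced root element using Python's standard `xml.etree.ElementTree`. The exact layout of the component's aggregated message is not described here, so the structure, the function name `wrap_messages`, and the sample element name and namespace are assumptions. Skipping non-XML bodies at this stage is also a simplification of the rule that the component ignores non-XML input.

```python
import xml.etree.ElementTree as ET


def wrap_messages(bodies, root_name, namespace=""):
    """Wrap XML message bodies in a single root element.

    root_name and namespace stand for the Root Element Name and
    Name Space properties; the values used below are made up.
    """
    tag = f"{{{namespace}}}{root_name}" if namespace else root_name
    root = ET.Element(tag)
    for body in bodies:
        try:
            root.append(ET.fromstring(body))
        except ET.ParseError:
            continue  # non-XML body: skipped in this sketch
    return ET.tostring(root, encoding="unicode")


xml_text = wrap_messages(
    ["<order>1</order>", "plain text", "<order>2</order>"],
    root_name="Orders",
    namespace="http://example.com/aggregated",
)
print(xml_text)
```

The result has one root element in the given namespace with the two parsed `order` elements as children; the plain-text body is left out.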

Override Message Properties
  • yes
    While aggregating messages, if the messages have the same property with different values, the property value of the last message (having this property) in the aggregation is set as the property value for the aggregated message.
  • no
    While aggregating messages, if the messages have the same property with different values, the property value of the first message (having this property) in the aggregation is set as the property value for the aggregated message.
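
The two settings amount to "last value wins" versus "first value wins" when merging message properties. A minimal sketch, assuming each message's properties are available as a dictionary (the function name `merge_properties` and the sample property names are hypothetical):

```python
def merge_properties(property_sets, override):
    """Merge per-message property dictionaries, in aggregation order.

    override=True  -> the last message having a property decides its value.
    override=False -> the first message having a property decides its value.
    """
    merged = {}
    for props in property_sets:
        for name, value in props.items():
            if override or name not in merged:
                merged[name] = value
    return merged


props = [{"priority": "low", "source": "A"}, {"priority": "high"}, {"region": "EU"}]
print(merge_properties(props, override=True))   # priority is "high"
print(merge_properties(props, override=False))  # priority is "low"
```

Properties that appear in only one message are carried over unchanged in both cases.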

Functional Demonstration

Scenario 1

Aggregating messages based on the timeout specified.

Configure the Aggregator as described in section Configuration and Testing and use chat and display components to send sample input and to check the response, respectively.
In the example below, only Input Ports XSD is set to the chat schema (Fetch from Schema can be used for this) in the Aggregator Custom Property Sheet, and all remaining properties are left at their defaults.


Figure 2: Demonstrating Scenario 1

On timeout, the aggregated messages are sent from the output port to the Display component.

Scenario 2

This scenario explains the usage of Dynamic number of messages completeness condition.
Configure the Aggregator as described in section Configuration and Testing and use chat and display components to send sample input and to check the response, respectively.

In the example below, the Aggregator is configured so that when it receives the message Fiorano at the input port, it aggregates the messages and sends them to the output port. Figure 3 shows the configuration.


Figure 3: Sample Configuration used in Scenario 2


Figure 4: Scenario demonstration with sample input and output

Useful Tips

  • When using the persist option, the database is local to the machine on which the Peer Server is running. In case of a Peer Server failover, the component creates a new table and therefore messages cannot be recovered.
  • When using header property for grouping messages, properties with names like/starting with ESBX_SYSTEM_* cannot be used.

To understand the service better, refer to the Aggregation example, which demonstrates the Aggregator service features.
