KinesisConsumer microservice is used to consume data records from Amazon Kinesis Streams.

Configuration and Testing

Component Configurations

The attributes can be configured in the Configuration Properties Sheet (CPS) as below.


Figure 1: Component Configuration Properties

Post Processing XSL Configuration

Post Processing XSL configuration can be used to transform the response message before sending it to the output port.

Icon

As mentioned in the above section, refer to the Pre/Post Processing XSL Configuration section under the Common Configurations topic for details regarding XSL configuration.

Validate Input

If this attribute is enabled, the service tries to validate the input received. If disabled, service will not validate the input. For more details, refer Validate Input section under Interaction Configurations in Common Configurations page.

Icon

Performance increases Validate Input option is disabled, but it may cause undesired results in case the input XML is not valid.

Error handling configuration

The remedial actions to be taken when a particular error occurs can be configured using this attribute. 

Click the ellipsis button against this property to configure Error Handling properties for different types of Errors. By default, the options Log to error logs, Stop service and Send to error port are enabled.

AWS Connection Details

Click the AWS Connection Details ellipsis  button to configure the properties.


Figure 2: KinesisConsumer connection configuration properties

Access key 

Specify the Access Key id

Secret key

Specify the Secret Key

Region

The region of AWS Management console.

Kinesis Stream Name

Name of the Kinesis Stream from which the data records are read.

Application Name

Kinesis Consumer Library uses a unique Amazon Dynamo DB Table to keep track of the application state. It uses the application name to create the table; each Application Name must be unique.

Initial Position In Stream

Used to specify the position in the stream where a new application should start from.


Figure 3: Initial Position In Stream property options

  • LATEST: Fetches you always read the most recent data in the shard
  • TRIM_HORIZON: Gets the record of last untrimmed record in the shard in the system (the oldest data record in the shard)

Scenario: Send 10 data records to the stream when Consumer is inactive.

Now starting the consumer in LATEST mode will not fetch you any records.

If the consumer is in TRIM_HORIZON mode, all the unread messages(10 in this case) are retrieved from the stream.

Payload Data Type

Data blob is the data of interest your data producer adds to a stream. The maximum size of a data blob is 1 megabyte (MB) .The base64 encoded Data blob is called Payload Data Type

Specify the required output type. Data is interpreted as UTF-8 characters if string is selected.


Figure 4: Payload Data Type property options

  • String
  • Bytes

Check Point Interval (ms)

Check Point Interval in milliseconds is the interval at the state of the streams are internally stored in Dynamo Table

Number of Retries

Number of times the Service should try for checkpointing / Processing records in whichever case an exception occurs.

Back Off Time (ms)

The interval between the consecutive retries of checkpointing / Processing records in whichever case an exception occurs.

Threadpool Configuration

This property is used when there is a need to process messages in parallel within the component, still maintaining the sequence from the external perspective. 

Icon
  • This needs to be used only in such circumstances.
  • If sequential processing is not required, please use sessions on the input port.

Click the Threadpool Configuration ellipsis button to configure the Threadpool Configuration properties. 


Figure 5: KinesisConsumer Threadpool configuration properties

Enable Thread Pool

Enable this option to configure the properties that appear as below.

Pool Size

Number of requests to be processed in parallel within the component. Default value is '1'.

Batch Eviction Interval (in ms)

Time in milliseconds after which the threads are evicted in case of inactivity. New threads are created in place of evicted threads when new requests are received. Default value is '1000'.

Functional Demonstration

The following flow demonstrates a flow of Kinesis Consumer, which fetches the data from the provided StreamName.


Figure 6: KinesisConsumer sample flow

Output Message

The output message consists of following elements

Data

Data retrieved from the stream

Partition key

A partition key is used to group data by shard within a stream. The Streams service segregates the data records belonging to a stream into multiple shards, using the partition key associated with each data record to determine which shard a given data record belongs to.

Sequence Number

Sequence number defines the sequence of Records in a particular shard

Each data record has a unique sequence number.

ApproximateArrivalTimeStamp

Approximate time stamp of the record after retrieving from the kinesis stream.


Figure 7: Output displayed in the Display window

Adaptavist ThemeBuilder EngineAtlassian Confluence