KinesisProducer microservice is used to consume data records to Amazon Kinesis Streams.
Configuration and Testing
Component Configurations
The following attributes can be configured in the Interaction Configuration panel as shown below.
Figure 1: Component Configuration Properties
Post Processing XSL Configuration
Post Processing XSL configuration can be used to transform response message before sending it to the output port.
Validate Input
If this attribute is enabled, the service tries to validate the input received. If disabled, service will not validate the input. For more details, refer Validate Input section under Interaction Configurations in Common Configurations page.
Error handling configuration
The remedial actions to be taken when a particular error occurs can be configured using this attribute.
Click the ellipsis button against this property to configure Error Handling properties for different types of Errors. By default, the options Log to error logs, Stop service and Send to error port are enabled.
AWS Connection Details
Click the AWS Connection Details ellipsis button to configure the properties.
Access key
Specify the Access Key id
Secret key
Specify the Secret Key
Region
The region of AWS Management console.
Kinesis Stream Name
Name of the Kinesis Stream from which the data records are read.
Application Name
Kinesis Consumer Library uses a unique Amazon Dynamo DB Table to keep track of the application state. It uses the application name to create the table; each Application Name must be unique.
Initial Position In Stream
Used to specify the position in the stream where a new application should start from.
- LATEST: Fetches you always read the most recent data in the shard
- TRIM_HORIZON: Gets the record of last untrimmed record in the shard in the system (the oldest data record in the shard)
Scenario: Send 10 data records to the stream when Consumer is inactive.
Now starting the consumer in LATEST mode will not fetch you any records.
If the consumer is in TRIM_HORIZON mode, all the unread messages(10 in this case) are retrieved from the stream.
Payload Data Type
Data blob is the data of interest your data producer adds to a stream. The maximum size of a data blob is 1 megabyte (MB) .The base64 encoded Data blob is called Payload Data Type
Specify the required output type. Data is interpreted as UTF-8 characters if string is selected.
- String
- Bytes
Check Point Interval (ms)
Check Point Interval in milliseconds is the interval at the state of the streams are internally stored in Dynamo Table
Number of Retries
Number of times the Service should try for checkpointing / Processing records in whichever case an exception occurs.
Back Off Time (ms)
The interval between the consecutive retries of checkpointing / Processing records in whichever case an exception occurs.
Threadpool Configuration
This property is used when there is a need to process messages in parallel within the component, still maintaining the sequence from the external perspective.
Click the Threadpool Configuration ellipsis button to configure the Threadpool Configuration properties.
Enable Thread Pool
Enable this option to configure the properties that appear as below.
Pool Size
Number of requests to be processed in parallel within the component. Default value is '1'.
Batch Eviction Interval (in ms)
Time in milliseconds after which the threads are evicted in case of inactivity. New threads are created in place of evicted threads when new requests are received. Default value is '1000'.
Functional Demonstration
The following flow demonstrates a flow of Kinesis Consumer ,which fetches the data from the provided StreamName
Output Message
The output message consists of following elements
Data
Data retrieved from the stream
Partition key
A partition key is used to group data by shard within a stream. The Streams service segregates the data records belonging to a stream into multiple shards, using the partition key associated with each data record to determine which shard a given data record belongs to.
Sequence Number
Sequence number defines the sequence of Records in a particular shard
Each data record has a unique sequence number.
ApproximateArrivalTimeStamp
Approximate time stamp of the record after retrieving from the kinesis stream.