Use case: Consume All Partitions
Overview
A single MessageHubConsumer operator consumes all messages from a topic regardless of the number of partitions.
Details
Without a partition specification, the operator consumes from all partitions of the topic. Kafka assigns the partitions of the subscribed topic, and the operator forms a consumer group with a single member. When no group identifier is specified, the operator generates one. With group management enabled, the operator is automatically assigned new partitions when partitions are added to the topic. On the other hand, partition de-assignment and re-assignment can happen when
- Group management related timers expire
- The broker node being the group coordinator goes down
- Metadata of the subscribed topic changes, for example, the number of partitions
Partition re-assignment makes the consumer replay Kafka messages beginning with the last committed offsets.
Note: MessageHub toolkit versions below 3.0 behave differently when no group identifier is specified:
With toolkit versions below 3.0, the consumer operator self-assigns all partitions of the topic that are visible at startup. When new partitions are added to the topic, the PE that contains the operator must be restarted to also read the added partitions.
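When a stable group identifier is desired, for example so that committed offsets survive a PE restart, it can be set explicitly instead of relying on a generated one. A minimal sketch, assuming the groupId operator parameter; the group name "myConsumerGroup" and the app config name are placeholders:

public composite ConsumeWithExplicitGroup {
    graph
        stream<rstring json> Messages = MessageHubConsumer() {
            param
                appConfigName: "ConsumeEventStreams";
                topic: "myTopic";
                groupId: "myConsumerGroup"; // stable group identifier instead of a generated one
                outputMessageAttributeName: "json";
        }
        ...
}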
Pros and Cons
- Pro: Very simple
- Con: Throughput is limited to what a single operator can read from all partitions
Guaranteed processing
- Consistent region: Supported (periodic and operator driven)
- Checkpointing via config checkpoint: Supported, but ignored; the operator does not save any data.
When the operator is used in a consistent region, at-least-once processing through the Streams application is guaranteed. Without a consistent region, tuples can be lost within the Streams application when a PE restarts.
Operator configuration
No partitions are assigned through the partition operator parameter (the partition parameter is not used).
Examples
Consume messages without a key
public composite ConsumeAllPartitions {
    graph
        stream<rstring json> Messages = MessageHubConsumer() {
            param
                // always specify the app config name when the app config
                // contains Kafka properties besides the credentials
                appConfigName: "ConsumeEventStreams";
                topic: "myTopic";
                outputMessageAttributeName: "json";
        }
        ...
}
Consume keyed messages within an operator driven consistent region
public composite ConsumeAllPartitionsCrOperatorDriven {
    graph
        () as JCP = JobControlPlane() {}

        @consistent (trigger = operatorDriven)
        stream<rstring json, rstring messageKey> Messages = MessageHubConsumer() {
            param
                appConfigName: "ConsumeEventStreams";
                topic: "myTopic";
                outputMessageAttributeName: "json";
                outputKeyAttributeName: "messageKey";
                triggerCount: 10000; // make the region consistent every 10000 tuples
        }
        ...
}
Consume keyed messages within a periodic consistent region
public composite ConsumeAllPartitionsCrPeriodic {
    graph
        () as JCP = JobControlPlane() {}

        @consistent (trigger = periodic, period = 60.0)
        stream<MessageType.StringMessage> Messages = MessageHubConsumer() {
            // MessageType.StringMessage is tuple<rstring message, rstring key>
            param
                appConfigName: "ConsumeEventStreams";
                topic: "myTopic";
        }
        ...
}
message and key are the default attribute names for the Kafka message and the key; they need not be specified as output attribute names.