Usecase: Consume All Partitions
Overview
A single KafkaConsumer operator consumes all messages from a topic regardless of the number of partitions.
Details
Without a partition specification, the operator consumes from all partitions of the topic. The partitions of the subscribed topic are assigned by Kafka, and the operator represents a consumer group with a single member. When no group identifier is specified, the consumer operator generates one; a sketch after the following list shows how to set it explicitly. When partitions are added to the topic, the operator is automatically assigned the new partitions. On the other hand, partitions can be revoked and re-assigned when
- Timers related to group management expire
- The broker node acting as the group coordinator goes down
- The metadata of the subscribed topic changes, for example the number of partitions
Partition re-assignment makes the consumer replay Kafka messages beginning with the last committed offsets.
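As noted above, the operator generates a group identifier when none is given. It can also be set explicitly, for example through the groupId operator parameter. A minimal sketch, assuming a topic myTopic, a properties file at etc/consumer.properties, and a hypothetical group name myConsumerGroup:

stream<rstring json> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "myTopic";
        groupId: "myConsumerGroup"; // stable identifier; offsets are committed for this group
        outputMessageAttributeName: "json";
}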
Pros and Cons
- Pro: Very simple
- Con: Throughput is limited because a single operator reads messages from all partitions
Guaranteed processing
- Consistent region: Supported (periodic and operator driven)
- Checkpointing via config checkpoint: Supported, but effectively ignored because the operator does not save any data.
When the operator is used in a consistent region, at-least-once processing through the Streams application is guaranteed. Without a consistent region, tuples can get lost within the Streams application when a PE restarts.
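For completeness, a minimal sketch of an invocation with config checkpoint, assuming the same topic and properties file as in the examples below; the clause is accepted, but the operator persists no state:

stream<rstring json> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "myTopic";
    config
        checkpoint: periodic(60.0); // checkpoint every 60 seconds; the operator has no data to save
}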
Operator configuration
No partition assignment is configured; the partition operator parameter is not used.
Examples
Consume messages without a key
stream<rstring json> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "myTopic";
        outputMessageAttributeName: "json";
}
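All examples reference the properties file etc/consumer.properties. A minimal sketch of its content, assuming a broker reachable at localhost:9092:

# Kafka broker(s) to connect to; adjust host and port to your environment
bootstrap.servers=localhost:9092
# optional: fixed consumer group identifier instead of a generated one
# group.id=myConsumerGroup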
Consume keyed messages within an operator driven consistent region
() as JCP = JobControlPlane() {}

@consistent(trigger = operatorDriven)
stream<rstring json, rstring messageKey> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "myTopic";
        outputMessageAttributeName: "json";
        outputKeyAttributeName: "messageKey";
        triggerCount: 10000; // make the region consistent every 10000 tuples
}
Consume keyed messages within a periodic consistent region
() as JCP = JobControlPlane() {}

@consistent(trigger = periodic, period = 60.0)
stream<rstring message, rstring key> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "myTopic";
}
message and key are the default attribute names for the Kafka message and its key; they need not be specified.