Use case: Kafka Consumer Group
Overview
Multiple KafkaConsumer operators consume from the same topic(s), where the topic partitions are automatically distributed over the consumer operators.
- Continued processing of messages from all partitions in the event of a failure
- No assumption about which partition is consumed by which consumer operator; thus there is no guarantee that a message with key 'K' will always be processed by the same operator.
- When partitions are added to the subscribed topic, these new partitions will be automatically assigned to one of the consumers in the group.
Details
N consumer operators within a single Streams graph (using UDP or manually added to the graph) have the same consumer group ID (Kafka property group.id) and access M partitions, where typically N <= M.
Kafka will:
- automatically assign partitions to operators
- reassign partitions during a failure
If an operator or resource Cx fails, the partition previously assigned to Cx is reassigned to an existing operator Cy, so that its messages continue to be consumed while Cx restarts. When Cx recovers, it re-joins the group, and the partitions are rebalanced.
More operators than partitions can be used (N > M), with the N - M surplus operators being idle until a failure occurs and the broker (potentially) reassigns partitions to them.
Examples with six partitions:
- Three operators
- each will pick two partitions (1/3 of the messages assuming even load distribution over partitions) during normal operation
- when one operator is stopped or down for an extended period of time, the six partitions are distributed across the two remaining operators, three each
- Once the failed operator restarts, the partitions will again be redistributed across the three operators
- Six operators
- each operator will pick one partition during normal operation, processing 1/6 of message volume
- on failure of one operator, one of the remaining five takes over its partition: one operator then consumes two partitions, the other four consume one each.
- Seven operators
- six operators will pick up a partition each, one operator will be idle.
- on failure of one operator, the idle operator takes the partition of the failed one.
Partition de-assignment and re-assignment can happen when
- Group-management related timers expire, for example the heartbeat timeout session.timeout.ms or the poll timeout max.poll.interval.ms
- The broker node acting as the group coordinator goes down
- Metadata of the subscribed topic changes, for example the number of partitions
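Both timeouts are ordinary consumer properties and can be tuned, for example in etc/consumer.properties; the values below are illustrative, not recommendations:

# consumer is removed from the group and its partitions are reassigned
# when the broker receives no heartbeat within this interval
session.timeout.ms=30000
# maximum allowed time between two poll() calls before the consumer is
# considered failed and its partitions are reassigned
max.poll.interval.ms=300000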
Partition re-assignment makes the consumer replay Kafka messages beginning with the last committed offsets.
Pros and Cons
- Pro: High volume by having multiple operators reading messages in parallel from partitions
- Pro: Takeover of partitions from failed or stopped consumers by other members of the consumer group.
- Pro: No manual assignment of partitions; any number of operators will always correctly read all messages.
- Con: Keyed messages may be handled by a different operator after failure and reassignment. As a workaround, the messages can be re-partitioned by the message key within the Streams application using an adjacent parallel region (see the examples below).
Guaranteed processing
- Consistent region: Supported (periodic only)
- Checkpointing via config checkpoint: Supported, but ignored. The operator does not save any data.
When the operator is used in a consistent region, at-least-once processing through the Streams application is guaranteed. Without a consistent region, tuples can get lost within the Streams application when a PE restarts.
Operator configuration
Parameters / consumer properties
- Partition assignment must not be configured through the partition operator parameter.
- A group identifier must be specified, either by the consumer property group.id or by using the groupId parameter, which takes precedence over the bare property. All operators of the consumer group must share this group identifier.
- When not in a consistent region, the startPosition parameter must not be specified or must have the value Default (for toolkit versions < 2.0). Toolkit versions >= 2.0 also support Beginning, End, and Time.
- The startPosition parameter must not be Offset.
Operator placement
Invocations of consumer operators should be ex-located from each other (placed into separate PEs) so that a single failure cannot take out multiple consumers.
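In SPL, this is expressed with a placement config on each consumer invocation, as done in the examples below:

// operators that share the exlocation token "A" are placed into different PEs
config placement: partitionExlocation ("A");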
Consistent region
The consumer group must not have consumers outside of the consistent region.
Multiple copies
- Create a composite containing the KafkaConsumer invocation if you want to add more operators to the parallel channel besides the KafkaConsumer (see the sketch below).
- Annotate the KafkaConsumer or the composite invocation with @parallel with width N (e.g. width = 3 to handle 6 partitions).

or

- Invoke N copies of the KafkaConsumer operator.
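A minimal sketch of the composite approach; the composite name ConsumerChannel and the Filter placeholder are hypothetical:

composite ConsumerChannel (output Out) {
    graph
        stream <rstring json, rstring messageKey> Raw = KafkaConsumer() {
            param
                propertiesFile: "etc/consumer.properties";
                topic: "myTopic";
                groupId: "myConsumerGroup";
                outputMessageAttributeName: "json";
                outputKeyAttributeName: "messageKey";
        }
        // additional per-channel operators go here
        stream <rstring json, rstring messageKey> Out = Filter(Raw) {}
}

// each parallel channel gets its own KafkaConsumer plus the added operators
@parallel (width = 3)
stream <rstring json, rstring messageKey> Messages = ConsumerChannel() {}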
Examples
Without consistent region
public composite ConsumerGroup {
param
expression <int32> $N: (int32) getSubmissionTimeValue ("consumerGroupSize", "3");
graph
@parallel (width = $N)
stream <rstring json, rstring messageKey> Messages = KafkaConsumer() {
param
propertiesFile: "etc/consumer.properties";
topic: "myTopic";
groupId: "myConsumerGroup";
outputMessageAttributeName: "json";
outputKeyAttributeName: "messageKey";
commitPeriod: 10.0; // commit offsets every 10 seconds
config placement: partitionExlocation ("A");
}
// do partitioned processing in Streams
// messages with same key go always into the same parallel channel
@parallel (width = 4, partitionBy = [{port = Messages, attributes = [messageKey]}])
stream <rstring json> Processed = Processing (Messages) {
}
...
}
public composite Processing (input In; output Out) {
graph
...
}
When using the startPosition parameter with Beginning, End, or Time, the application graph must contain a JobControlPlane operator, and the group must not have consumers outside of the Streams application.
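For illustration, the example above could be extended like this when reading from the beginning of the partitions (a sketch; toolkit >= 2.0 assumed):

() as JCP = JobControlPlane() {}

@parallel (width = $N)
stream <rstring json, rstring messageKey> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "myTopic";
        groupId: "myConsumerGroup";
        startPosition: Beginning; // requires the JobControlPlane in the graph
        outputMessageAttributeName: "json";
        outputKeyAttributeName: "messageKey";
}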
Consumer group in a consistent region, group-ID specified in property file
public composite ConsumerGroupCR {
param
expression <int32> $N: (int32) getSubmissionTimeValue ("consumerGroupSize", "3");
graph
() as JCP = JobControlPlane() {}
@consistent (trigger = periodic, period = 60.0, drainTimeout = 300.0, maxConsecutiveResetAttempts = 10)
@parallel (width = $N)
stream <rstring json, rstring messageKey> Messages = KafkaConsumer() {
param
propertiesFile: "etc/consumer.properties";
topic: "myTopic";
outputMessageAttributeName: "json";
outputKeyAttributeName: "messageKey";
config placement: partitionExlocation ("A");
}
// do partitioned processing in Streams
// messages with same key go always into the same parallel channel
@parallel (width = 4, partitionBy = [{port = Messages, attributes = [messageKey]}])
stream <rstring json> Processed = Processing (Messages) {
}
...
}
public composite Processing (input In; output Out) {
graph
...
}
As the groupId parameter is not used, the etc/consumer.properties file must contain a line with group.id=myConsumerGroup besides the other properties, like bootstrap.servers.
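Such a properties file could look like this (broker addresses are placeholders):

# etc/consumer.properties
bootstrap.servers=broker-1.example.com:9092,broker-2.example.com:9092
group.id=myConsumerGroup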