Usecase: Kafka Consumer Group

Overview

Multiple MessageHubConsumer operators consume from the same topic(s) where the topic partitions are automatically distributed over the consumer operators.

  • Continued processing of messages from all partitions in the event of a failure
  • No assumption about which partition is consumed by which consumer operator, and thus no guarantee that messages with key ‘K’ are always processed by the same operator.
  • When partitions are added to the subscribed topic, these new partitions will be automatically assigned to one of the consumers in the group.

Details

N Consumer operators within a single streams graph (using UDP or manually added to graph) have the same consumer group identifier (Kafka consumer property group.id) accessing M partitions where (typically) N <= M.

Event Streams will:

  • automatically assign partitions to operators
  • re-assign partitions during a failure

If an operator or resource Cx fails, the partitions previously assigned to Cx are reassigned to an existing operator Cy, so that their messages continue to be consumed while Cx restarts. When Cx recovers, it re-joins the group, and the partitions are re-balanced.

More operators than partitions can be used (N > M); the N-M surplus operators remain idle until a failure occurs and the broker (potentially) reassigns partitions to them.

Examples with six partitions:

  • Three operators
    • each will pick two partitions (1/3 of the messages assuming even load distribution over partitions) during normal operation
    • when one operator is stopped or down for a longer period of time, the six partitions are distributed across the two remaining operators, each with three
    • Once the failed operator restarts, the partitions will again be redistributed across the three operators
  • Six operators
    • each operator will pick one partition during normal operation, processing 1/6 of message volume
    • on failure of one operator, one of the remaining five takes over its partition; that operator then consumes two partitions, while the other four consume one each.
  • Seven operators
    • six operators will pick up a partition each, one operator will be idle.
    • on failure of one operator, the idle operator takes the partition of the failed one.

Partition de-assignment and re-assignment can happen when

  • Group-management related timers expire, for example the heartbeat/session timeout session.timeout.ms or the poll timeout max.poll.interval.ms.
  • The broker node being the group coordinator goes down
  • Meta data of the subscribed topic changes, for example the number of partitions

Partition re-assignment makes the consumers replay Kafka messages beginning at the last committed offsets.
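The timeouts mentioned above are standard Kafka consumer properties and can be tuned, for example via the application configuration or a properties file. A sketch with illustrative values (not recommendations):

```
# Illustrative Kafka consumer properties that influence group management.
# session.timeout.ms: the group coordinator evicts a consumer that misses
#   heartbeats for this long, triggering a partition re-assignment.
session.timeout.ms=20000
# max.poll.interval.ms: maximum allowed time between two poll() calls
#   before the consumer is considered failed and leaves the group.
max.poll.interval.ms=300000
```

Larger values tolerate longer processing stalls before a re-assignment is triggered, at the cost of slower failure detection.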

Pros and Cons

  • Pro: High volume by having multiple operators reading messages in parallel from partitions
  • Pro: Takeover of partitions from failed or stopped consumers by other members of the consumer group.
  • Pro: No manual assignment of partitions, any number of operators will always correctly read all messages.
  • Con: After a failure and reassignment, keyed messages may be handled by any operator. As a workaround, the messages can be repartitioned by the message key within the Streams application using an adjacent parallel region.

Guaranteed processing

  • Consistent region: Supported (periodic only)
  • Checkpointing via config checkpoint: Supported, but ignored. The operator does not save any data.

When the MessageHubConsumer operator is used in a consistent region, at least once processing through the Streams application is guaranteed. Without a consistent region, tuples can get lost within the Streams application when a PE restarts.

Operator configuration

Parameters / consumer properties

  • Partitions must not be assigned manually via the partition operator parameter; partition assignment is left to the group coordinator.
  • A group identifier must be specified, either via the consumer property group.id or via the groupId parameter; the parameter takes precedence over the property.
  • When not in a consistent region, the startPosition parameter must be omitted or have the value Default for toolkit versions < 2.0. Toolkit versions >= 2.0 also support Beginning, End, and Time.
  • The startPosition parameter must not be Offset.
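A minimal sketch of a conforming invocation outside a consistent region (toolkit versions >= 2.0 assumed; names like myTopic and myConsumerGroup are placeholders):

```
// Group management is enabled by specifying a group ID
// and omitting the 'partition' parameter.
stream<rstring json> Msgs = MessageHubConsumer() {
    param
        appConfigName: "ConsumeEventStreams";
        topic: "myTopic";
        groupId: "myConsumerGroup"; // takes precedence over consumer property group.id
        startPosition: Beginning;   // allowed outside a consistent region with toolkit >= 2.0
}
```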

Operator placement

Invocations of MessageHubConsumer operators should be ex-located from each other (placed into separate PEs) so that the failure of a single PE does not take out multiple consumers.

Consistent region

The consumer group must not have consumers outside of the consistent region.

Multiple copies

  • Create a composite containing the MessageHubConsumer invocation if you want to add more operators to the parallel channel besides the MessageHubConsumer
  • Annotate MessageHubConsumer or composite invocation with @parallel with width N (e.g. width = 3 to handle 6 partitions).

or

  • Invoke N copies of the MessageHubConsumer operator.
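For the second variant, a sketch with two explicit invocations sharing the same group identifier (placeholder names, credentials assumed in the application configuration):

```
// Two explicit consumer invocations forming one consumer group.
stream<rstring json> Messages1 = MessageHubConsumer() {
    param
        appConfigName: "ConsumeEventStreams";
        topic: "myTopic";
        groupId: "myConsumerGroup";
    config placement: partitionExlocation ("A"); // keep consumers in separate PEs
}

stream<rstring json> Messages2 = MessageHubConsumer() {
    param
        appConfigName: "ConsumeEventStreams";
        topic: "myTopic";
        groupId: "myConsumerGroup";
    config placement: partitionExlocation ("A");
}
```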

Examples

Without consistent region

public composite ConsumerGroup {
param
    expression <int32> $N: (int32) getSubmissionTimeValue ("consumerGroupSize", "3");
graph
    @parallel (width = $N)
    stream <rstring json, rstring messageKey> Messages = MessageHubConsumer() {
        param
            appConfigName: "ConsumeEventStreams";
            topic: "myTopic";
            groupId: "myConsumerGroup";
            outputMessageAttributeName: "json";
            outputKeyAttributeName: "messageKey";
            commitPeriod: 10.0;      // commit offsets every 10 seconds
        config placement: partitionExlocation ("A");
    }

    // do partitioned processing in Streams
    // messages with same key go always into the same parallel channel
    @parallel (width = 4, partitionBy = [{port = Messages, attributes = [messageKey]}])
    stream <rstring json> Processed = ProcessingLogic (Messages) {
    }
}

public composite ProcessingLogic (input In; output Out) {
graph
    ...
}

Consumer group in a consistent region, group-ID specified in the application configuration

public composite ConsumerGroupCR {
param
    expression <int32> $N: (int32) getSubmissionTimeValue ("consumerGroupSize", "3");
graph
    () as JCP = JobControlPlane() {}

    @consistent (trigger = periodic, period = 60.0, drainTimeout = 300.0, maxConsecutiveResetAttempts = 10)
    @parallel (width = $N)
    stream <rstring json, rstring messageKey> Messages = MessageHubConsumer() {
        param
            // always specify the app config name when it contains Kafka properties like 'group.id',
            // also when the default name 'eventstreams' is used.
            appConfigName: "ConsumeEventStreams";
            topic: "myTopic";
            outputMessageAttributeName: "json";
            outputKeyAttributeName: "messageKey";
        config placement: partitionExlocation ("A");
    }

    // do partitioned processing in Streams
    // messages with same key go always into the same parallel channel
    @parallel (width = 4, partitionBy = [{port = Messages, attributes = [messageKey]}])
    stream <rstring json> Processed = Processing (Messages) {
    }
}

public composite Processing (input In; output Out) {
graph
    ...
}

In this example, the groupId parameter is not used. The application configuration ConsumeEventStreams must therefore contain a property named group.id whose value is the group identifier, in addition to the eventstreams.creds property, for example:

property name       property value
eventstreams.creds  { "api_key": "Tv39…eT", …, …, "user": "token" }
group.id            myConsumerGroup

Note: For toolkit versions < 2.0, add the property messagehub.creds to the application configuration to specify the Event Streams service credentials.
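On an on-prem Streams installation, such an application configuration could be created with streamtool (a sketch; the credentials value is abbreviated and must be replaced with the actual Event Streams service credentials):

```
# Create the application configuration with both properties.
streamtool mkappconfig \
    --property group.id=myConsumerGroup \
    --property 'eventstreams.creds={"api_key": "...", "user": "token"}' \
    ConsumeEventStreams
```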
