Difference between Kafka toolkit v1.x and v2.x


Introduction

Version 2 of the toolkit contains two potentially breaking changes, which are tracked by the following GitHub issues:

  1. Group management with initial fetch position: issue 94
  2. Operator restart without seek: issue 107

These changes affect the KafkaConsumer operator only when it is used outside a consistent region, and only when its startPosition parameter has a value other than Default. Nothing changes when

  • the consumer operator is used in a consistent region, or
  • the consumer operator has an input port, or
  • the consumer operator is not configured with startPosition parameter, or
  • the startPosition parameter has the value Default.

(1) Enable group management also with startPosition when not in a consistent region

Let’s assume the following SPL code:

stream <MessageType.StringMessage> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";  // assume no group.id in the file
        topic: "test";
        groupId: "myConsumerGroup";
        startPosition: End;
}

Behaviour with Kafka-Toolkit version 1.x:

Specifying startPosition: End disables group management, i.e. this operator invocation consumes all partitions of the test topic. A second, identical operator invocation (same group ID) would also consume all partitions of the test topic; the two operators do not share the partitions.

Behaviour with Kafka-Toolkit version 2.x:

Because the user specifies a group ID, the operator enables group management, i.e. it shares the partitions of the test topic with a second, identical operator invocation (same group ID). To coordinate among the operators of the group when each topic partition must be seeked to its end, the application requires a JobControlPlane operator in its graph. SPL code that ran with toolkit version 1.x therefore does not run with toolkit version 2.x until a JobControlPlane operator is added to the graph.
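A minimal sketch of the example above adapted for toolkit v2.x. The composite name Main is hypothetical, and the sketch assumes that JobControlPlane is provided by the spl.control namespace of the SPL standard toolkit. The consumer invocation is unchanged; only the JobControlPlane operator is added to the graph.

```spl
use com.ibm.streamsx.kafka::*;        // KafkaConsumer, MessageType
use spl.control::JobControlPlane;     // assumed location in the SPL standard toolkit

composite Main {                      // hypothetical main composite name
    graph
        // Required by toolkit v2.x: coordinates the seek to End
        // among the consumers of the group
        () as JCP = JobControlPlane() {}

        stream <MessageType.StringMessage> Messages = KafkaConsumer() {
            param
                propertiesFile: "etc/consumer.properties";  // assume no group.id in the file
                topic: "test";
                groupId: "myConsumerGroup";   // group ID given: group management is enabled
                startPosition: End;
        }
}
```

A second invocation with the same groupId would now share the partitions of the test topic with this one.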

Getting the 1.x behavior with the 2.x toolkit

The v2 toolkit consumer operator enters group management mode (i.e. the consumer subscribes to the topic rather than assigning the topic partitions itself) when the user provides a group identifier, either as the groupId operator parameter or as the group.id consumer property in a properties file or application configuration. To get the previous behavior back, the user must not specify a group identifier. A JobControlPlane operator is nevertheless still required because of the changed PE re-launch behaviour.
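A sketch of the 1.x-style configuration on the 2.x toolkit, assuming that neither etc/consumer.properties nor any application configuration sets group.id. The groupId parameter is simply omitted, so group management stays disabled, while the JobControlPlane operator remains in the graph. The composite name Main and the spl.control namespace for JobControlPlane are assumptions, as in the sketch above.

```spl
use com.ibm.streamsx.kafka::*;        // KafkaConsumer, MessageType
use spl.control::JobControlPlane;     // assumed location in the SPL standard toolkit

composite Main {                      // hypothetical main composite name
    graph
        // Still needed with toolkit v2.x because of the changed PE re-launch behaviour
        () as JCP = JobControlPlane() {}

        stream <MessageType.StringMessage> Messages = KafkaConsumer() {
            param
                propertiesFile: "etc/consumer.properties";  // must not contain group.id
                topic: "test";
                // no groupId parameter: the consumer assigns all partitions
                // of the test topic to itself instead of subscribing
                startPosition: End;
        }
}
```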

(2) After PE re-launch, fetch from the default fetch position when not in a consistent region

Let’s assume the following SPL code:

@parallel (width = 10)     // 10 parallel channels for 10 partitions
stream <MessageType.StringMessage> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";  // assume no group.id in the file
        topic: "test";
        startPosition: Beginning;
        partition: getChannel();
        // Note: every consumer will have a unique group-ID,
        // generated by the operator itself
}

Behaviour with Kafka-Toolkit version 1.x:

The partition parameter always disables group management. Each consumer in the parallel region is pinned to one partition and initially consumes from the first available offset of that partition. When the region is extended (all PEs are re-created) or a consumer’s PE restarts, every restarted consumer fetches again from the beginning of its assigned partition. With startPosition: Beginning, a huge number of tuples can be replayed; with startPosition: End, messages produced while the PE is being re-launched are lost.

Behaviour with Kafka-Toolkit version 2.x:

The behavioural difference from toolkit version 1.x concerns restarts. At operator initialization, each consumer decides for every assigned topic partition whether or not to seek the fetch position to the position given by startPosition. If an offset has been committed for a topic partition during the lifetime of the application, the fetch position after a restart is the last committed offset; otherwise the fetch position is seeked to the startPosition, which is Beginning in the code example above. This decision requires a JobControlPlane operator in the application graph; without a JobControlPlane, the application no longer runs.
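For the parallel example above, a v2.x-compatible graph could look like the following sketch. The consumer invocation is unchanged; only a JobControlPlane operator is added so that the consumers can make the seek-or-resume decision at initialization. The composite name Main and the spl.control namespace for JobControlPlane are assumptions.

```spl
use com.ibm.streamsx.kafka::*;        // KafkaConsumer, MessageType
use spl.control::JobControlPlane;     // assumed location in the SPL standard toolkit

composite Main {                      // hypothetical main composite name
    graph
        // Required by toolkit v2.x: needed for the decision whether to seek
        // to startPosition or to resume from the last committed offset
        () as JCP = JobControlPlane() {}

        @parallel (width = 10)     // 10 parallel channels for 10 partitions
        stream <MessageType.StringMessage> Messages = KafkaConsumer() {
            param
                propertiesFile: "etc/consumer.properties";  // assume no group.id in the file
                topic: "test";
                // applies only to partitions without an offset committed
                // during the lifetime of the application
                startPosition: Beginning;
                partition: getChannel();
        }
}
```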

Getting the 1.x behavior with the 2.x toolkit

The 1.x behaviour is considered undesirable, so there is no option to restore it.
