Difference between toolkit v1.x and v2.x
Introduction
Version 2 of the toolkit has two potentially breaking changes, which are tracked by issues in GitHub for the underlying Kafka toolkit.
These changes affect usage of the MessageHubConsumer operator outside a consistent region, and only when the startPosition parameter is used with a value other than Default.
There are no changes when
- the consumer operator is used in a consistent region, or
- the consumer operator has an input port, or
- the consumer operator is not configured with startPosition parameter, or
- the startPosition parameter has the value Default.
(1) Enable group management also with start position when not in consistent region
Let’s assume the following SPL code:
stream <MessageType.StringMessage> Messages = MessageHubConsumer() {
    param
        credentials: $myServiceCredentials;
        topic: "test";
        groupId: "myConsumerGroup";
        startPosition: End;
}
Behaviour with MessageHub-Toolkit version 1.x:
The startPosition: End; setting would disable group management, i.e. all partitions of the test topic would be consumed by this operator invocation. A second identical operator invocation (same group-ID) would also consume all partitions of the test topic. The two operators would not share the partitions of the test topic.
Behaviour with MessageHub-Toolkit version 2.x:
Because a group-ID is given by the user, the operator enables group management, i.e. it shares the partitions with a second identical operator invocation (same group-ID). To coordinate among the operators of the group when to seek to the end, the application requires a JobControlPlane operator in its graph. SPL code that ran with Kafka toolkit version 1.x will not run with toolkit version 2.x as long as there is no JobControlPlane in the graph.
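As a minimal sketch, the example above could be made to run with version 2.x by adding a JobControlPlane to the graph. The composite name, the second consumer invocation, the use directive for the toolkit namespace, and the way $myServiceCredentials is supplied (here a submission-time value named credentials) are assumptions for illustration only:

```
use com.ibm.streamsx.messagehub::*;

// Hypothetical main composite; only the consumer parameters come from the example above
composite GroupManagedConsumers {
    param
        // Assumption: credentials are passed in as a submission-time value
        expression<rstring> $myServiceCredentials: getSubmissionTimeValue("credentials");
    graph
        // Required with toolkit 2.x when startPosition is not Default
        () as JCP = spl.control::JobControlPlane() {}

        stream <MessageType.StringMessage> Messages1 = MessageHubConsumer() {
            param
                credentials: $myServiceCredentials;
                topic: "test";
                groupId: "myConsumerGroup";
                startPosition: End;
        }

        // Second identical invocation with the same group-ID
        stream <MessageType.StringMessage> Messages2 = MessageHubConsumer() {
            param
                credentials: $myServiceCredentials;
                topic: "test";
                groupId: "myConsumerGroup";
                startPosition: End;
        }
}
```

With both invocations using the same groupId, version 2.x would share the partitions of the test topic between them rather than letting each consume all partitions.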
Getting the 1.x behavior with the 2.x toolkit
The v2 toolkit consumer operator goes into group management mode (i.e. the consumer subscribes rather than self-assigns) when a group identifier is given by the user, either as the groupId operator parameter or as the group.id consumer config in a file or application configuration. To get the previous behavior back, the user must not specify a group identifier. The JobControlPlane operator is still required in this case.
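A sketch of what this could look like for the example above: the groupId parameter is removed and a JobControlPlane is added. The composite name, the use directive, and the submission-time value named credentials are assumptions for illustration; the sketch also assumes that no group.id is set in a properties file or application configuration:

```
use com.ibm.streamsx.messagehub::*;

// Hypothetical main composite illustrating the 1.x-like setup
composite SelfAssignedConsumer {
    param
        // Assumption: credentials are passed in as a submission-time value
        expression<rstring> $myServiceCredentials: getSubmissionTimeValue("credentials");
    graph
        // Still needed, because startPosition is not Default
        () as JCP = spl.control::JobControlPlane() {}

        stream <MessageType.StringMessage> Messages = MessageHubConsumer() {
            param
                credentials: $myServiceCredentials;
                topic: "test";
                // No groupId here: the operator does not enter group management mode
                startPosition: End;
        }
}
```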
(2) After restart, fetch from default fetch position when not in a CR
Let’s assume the following SPL code:
@parallel (width = 10) // 10 parallel channels for 10 partitions
stream <MessageType.StringMessage> Messages = MessageHubConsumer() {
    param
        credentials: $myServiceCredentials;
        topic: "test";
        startPosition: Beginning;
        partition: getChannel();
        // Note: every consumer will have a unique group-ID,
        // generated by the operator itself
}
Behaviour with MessageHub-Toolkit version 1.x:
The partition parameter always disables group management. Each consumer in the parallel region is pinned to one partition, consuming initially from the first available offset in each partition. When the region is extended (all PEs are re-created) or a consumer’s PE restarts, every restarted consumer fetches again from the beginning of its assigned partition.
Behaviour with MessageHub-Toolkit version 2.x:
The behavioural difference from toolkit version 1.x affects the restart. At operator initialization, each consumer decides whether or not to seek the fetch position of every assigned topic partition to the position given by startPosition. When an offset has been committed for a topic partition during the lifetime of the application, the fetch position after restart is the last committed offset; otherwise the consumer seeks to the position given by startPosition, which is Beginning in the code example above.
This decision to seek or not to seek requires a JobControlPlane operator in the application graph. Without a JobControlPlane, the application will no longer run.
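A minimal sketch of the parallel example with the JobControlPlane added, so that it could run with version 2.x. The composite name, the use directive, and the submission-time value named credentials are assumptions for illustration; the consumer invocation itself is taken from the example above:

```
use com.ibm.streamsx.messagehub::*;

// Hypothetical main composite
composite PinnedPartitionConsumers {
    param
        // Assumption: credentials are passed in as a submission-time value
        expression<rstring> $myServiceCredentials: getSubmissionTimeValue("credentials");
    graph
        // Lets each consumer decide after a restart whether to seek to startPosition
        () as JCP = spl.control::JobControlPlane() {}

        @parallel (width = 10) // 10 parallel channels for 10 partitions
        stream <MessageType.StringMessage> Messages = MessageHubConsumer() {
            param
                credentials: $myServiceCredentials;
                topic: "test";
                startPosition: Beginning;
                partition: getChannel();
        }
}
```

With this graph, a restarted consumer would continue from the last committed offset of its partition if one exists, and would start from the beginning only if no offset has been committed.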
Getting the 1.x behavior with the 2.x toolkit
The 1.x behaviour is considered undesirable. There is no option to get it back.