Operator KafkaConsumer

Kafka Toolkit > com.ibm.streamsx.kafka 1.3.0 > com.ibm.streamsx.kafka > KafkaConsumer

The KafkaConsumer operator is used to consume messages from Kafka topics. The operator can be configured to consume messages from one or more topics, as well as consume messages from specific partitions within topics.

Supported Kafka Version

This version of the toolkit supports Apache Kafka v0.10.2, v0.11.x, and v1.0.x.

Kafka Properties

The operator implements Kafka's KafkaConsumer API. As a result, it supports all Kafka properties that are supported by the underlying API. Properties can be specified in a file or in an application configuration. If specifying properties via a file, the propertiesFile parameter can be used. If specifying properties in an application configuration, the name of the application configuration can be specified using the appConfigName parameter.
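A minimal invocation using a properties file might look like the following sketch; the file path, topic name, and output schema are illustrative, not mandated by the operator:

```
// Minimal sketch: the file etc/consumer.properties must contain at
// least the required property, e.g.
//   bootstrap.servers=broker1:9092,broker2:9092
stream<rstring message, rstring key> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "t1";
}
```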

The only property that the user is required to set is the bootstrap.servers property, which points to the Kafka brokers. All other properties are optional. The operator sets some properties by default to enable users to quickly get started with the operator. The following lists which properties the operator sets by default:

Property Name        Default Value
-------------        -------------
client.id            Randomly generated ID in the form: client-<random_string>
group.id             Randomly generated ID in the form: group-<random_string>
key.deserializer     See Automatic Deserialization section below
value.deserializer   See Automatic Deserialization section below
enable.auto.commit   false

NOTE: Users can override any of the above properties by explicitly setting the property value in either a properties file or in an application configuration.

Automatic Deserialization

The operator will automatically select the appropriate deserializers for the key and message based on their types. The following table outlines which deserializer will be used given a particular type:

Deserializer                                                  SPL Types
------------                                                  ---------
org.apache.kafka.common.serialization.StringDeserializer      rstring
org.apache.kafka.common.serialization.IntegerDeserializer     int32, uint32
org.apache.kafka.common.serialization.LongDeserializer        int64, uint64
org.apache.kafka.common.serialization.FloatDeserializer       float32
org.apache.kafka.common.serialization.DoubleDeserializer      float64
org.apache.kafka.common.serialization.ByteArrayDeserializer   blob

These deserializers are wrapped by extensions that catch exceptions of type org.apache.kafka.common.errors.SerializationException, allowing the operator to skip over malformed messages. These extensions do not modify the deserialization behavior of the base deserializers listed in the table above.

Users can override this behaviour and specify which deserializer to use by setting the key.deserializer and value.deserializer properties.
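For example, to force string keys and raw byte-array values regardless of the output attribute types, lines like the following could be added to the properties file or application configuration (a sketch; choose the deserializer classes that match your data):

```
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
```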

Committing received Kafka messages

By default, the operator sets the consumer property enable.auto.commit to false and commits every received batch of messages after appending the messages to an internal queue. When users specify the value true for the enable.auto.commit property, the operator uses the auto-commit function of the Kafka client.

Kafka's Group Management

The operator is capable of taking advantage of Kafka's group management functionality. In order for the operator to use this functionality, the following requirements must be met:

  • The operator cannot be in a consistent region
  • The startPosition parameter value cannot be Beginning (it must be End or left unspecified)
  • None of the topics specified by the topic parameter can specify which partition to be assigned to

In addition to the above, the application needs to set the group.id Kafka property or the groupId parameter in order to assign the KafkaConsumer to a specific group.
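Under these conditions, a consumer group might be sketched as follows: three parallel channels share one group ID, and Kafka distributes the topic's partitions among them. The group name, topic name, and parallel width are illustrative:

```
// Sketch: three consumers in group "myConsumerGroup" share the
// partitions of topic "t1" via Kafka's group management.
@parallel(width = 3)
stream<rstring message> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "t1";
        groupId: "myConsumerGroup";
}
```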

Consistent Region Support

The KafkaConsumer operator can participate in a consistent region. The operator can be the start of a consistent region. Both operator driven and periodic checkpointing are supported. If operator driven checkpointing is used, the triggerCount parameter must be set to indicate how often the operator should initiate a consistent region. On checkpoint, the operator will save the last offset for each topic-partition that it is assigned to. In the event of a reset, the operator will seek to the saved offset for each topic-partition and begin consuming messages from that point.
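For example, an operator driven consistent region that drains after every 1000 submitted tuples might be sketched as follows (topic name and trigger count are illustrative):

```
// Sketch: the consumer starts an operator-driven consistent region
// and triggers a drain after every 1000 submitted tuples.
@consistent(trigger = operatorDriven)
stream<rstring message> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "t1";
        triggerCount: 1000;
}
```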

Error Handling

Many exceptions thrown by the underlying Kafka API are considered fatal. In the event that Kafka throws an exception, the operator will restart.

Summary

Ports
This operator has 1 input port and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 17 parameters.

Optional: appConfigName, clientId, groupId, outputKeyAttributeName, outputMessageAttributeName, outputOffsetAttributeName, outputPartitionAttributeName, outputTimestampAttributeName, outputTopicAttributeName, partition, propertiesFile, startOffset, startPosition, startTime, topic, triggerCount, userLib

Metrics
This operator reports 1 metric.

Implementation
Java

Input Ports

Ports (0)
This port is used to specify the topic-partition offsets that the consumer should begin reading messages from. When this port is specified, the operator will ignore the topic, partition and startPosition parameters. The operator will only begin consuming messages once a tuple is received on this port. Each tuple received on this port will cause the operator to seek to the offsets for the specified topic-partitions. This works as follows:
  • To seek to the beginning of a topic-partition, set the value of the offset to -1.
  • To seek to the end of a topic-partition, set the value of the offset attribute to -2.
  • Any other value will cause the operator to seek to that offset value. If that value does not exist, then the operator will use the auto.offset.reset policy to determine where to begin reading messages from.

This input port must contain a single rstring attribute. In order to add or remove a topic partition, the attribute must contain a JSON string in the following format:

{
  "action" : "ADD" or "REMOVE"
  "topicPartitionOffsets" : [
    {
      "topic" : "topic-name",
      "partition" : <partition_number>,
      "offset" : <offset_number>
    },
    ...
  ]
}

The following convenience functions are available to aid in creating the messages:

  • rstring addTopicPartitionMessage(rstring topic, int32 partition, int64 offset);
  • rstring addTopicPartitionMessage(list<tuple<rstring topic, int32 partition, int64 offset>> topicPartitionsToAdd);

  • rstring removeTopicPartitionMessage(rstring topic, int32 partition);

  • rstring removeTopicPartitionMessage(list<tuple<rstring topic, int32 partition>> topicPartitionsToRemove);
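For example, a Custom source could use one of these functions to build the control message. The sketch below assigns the consumer to partition 0 of topic "t1" starting at the beginning; the topic name and stream names are illustrative:

```
// Sketch: build an ADD control message for topic "t1", partition 0.
// The offset -1l means "seek to the beginning of the partition".
(stream<rstring jsonString> Control) = Custom() {
    logic
        onProcess: {
            submit({jsonString = addTopicPartitionMessage("t1", 0, -1l)}, Control);
        }
}
```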

Output Ports

Assignments
Java operators do not support output assignments.
Ports (0)

This port produces tuples based on records read from the Kafka topic(s). A tuple will be output for each record read from the Kafka topic(s).

Parameters

This operator supports 17 parameters.

Optional: appConfigName, clientId, groupId, outputKeyAttributeName, outputMessageAttributeName, outputOffsetAttributeName, outputPartitionAttributeName, outputTimestampAttributeName, outputTopicAttributeName, partition, propertiesFile, startOffset, startPosition, startTime, topic, triggerCount, userLib

appConfigName

Specifies the name of the application configuration containing Kafka properties.

clientId

Specifies the client ID that should be used when connecting to the Kafka cluster. The value specified by this parameter will override the client.id Kafka property if specified. If this parameter is not specified and the client.id Kafka property is not specified, the operator will use a random client ID.

groupId

Specifies the group ID that should be used when connecting to the Kafka cluster. The value specified by this parameter will override the group.id Kafka property if specified. If this parameter is not specified and the group.id Kafka property is not specified, the operator will use a random group ID.

outputKeyAttributeName

Specifies the output attribute name that should contain the key. If not specified, the operator will attempt to store the key in an attribute named 'key'.

outputMessageAttributeName

Specifies the output attribute name that will contain the message. If not specified, the operator will attempt to store the message in an attribute named 'message'.

outputOffsetAttributeName

Specifies the output attribute name that should contain the offset. If not specified, the operator will attempt to store the offset in an attribute named 'offset'. The attribute must have the SPL type 'int64' or 'uint64'.

outputPartitionAttributeName

Specifies the output attribute name that should contain the partition number. If not specified, the operator will attempt to store the partition number in an attribute named 'partition'. The attribute must have the SPL type 'int32' or 'uint32'.

outputTimestampAttributeName

Specifies the output attribute name that should contain the record's timestamp, presented in milliseconds since Unix epoch. If not specified, the operator will attempt to store the timestamp in an attribute named 'messageTimestamp'. The attribute must have the SPL type 'int64' or 'uint64'.

outputTopicAttributeName

Specifies the output attribute name that should contain the topic. If not specified, the operator will attempt to store the topic name in an attribute named 'topic'.

partition

Specifies the partitions that the consumer should be assigned to for each of the topics specified. It should be noted that using this parameter will assign the consumer to the specified partitions, rather than subscribe to the topics. This implies that the consumer will not use Kafka's group management feature.

propertiesFile

Specifies the name of the properties file containing Kafka properties.

startOffset

This parameter indicates the start offset that the operator should begin consuming messages from. In order for this parameter's values to take effect, the startPosition parameter must be set to Offset. Furthermore, the specific partition(s) that the operator should consume from must be specified via the partition parameter.

If multiple partitions are specified via the partition parameter, then the same number of offset values must be specified. There is a one-to-one mapping between the position of the partition from the partition parameter and the position of the offset from the startOffset parameter. For example, if the partition parameter has the values '0, 1', and the startOffset parameter has the values '100, 200', then the operator will begin consuming messages from partition 0 at offset 100 and will consume messages from partition 1 at offset 200.

A limitation with using this parameter is that only a single topic can be specified.
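Putting the above together, the earlier example of partitions 0 and 1 starting at offsets 100 and 200 might be sketched as follows (topic name and offsets are illustrative):

```
// Sketch: consume partition 0 from offset 100 and partition 1 from
// offset 200 of the single topic "t1".
stream<rstring message> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "t1";
        partition: 0, 1;
        startPosition: Offset;
        startOffset: 100l, 200l;
}
```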

startPosition

Specifies whether the operator should start reading from the end of the topic, the beginning of the topic, a specific offset, or from a specific timestamp. Valid options include: Beginning, End, Time, and Offset. If reading from a specific timestamp (i.e. setting the parameter value to Time), the startTime parameter must also be defined. If reading from specific offsets (i.e. setting the parameter value to Offset), the startOffset parameter must also be defined. If this parameter is not specified, the default value is End.

startTime

This parameter is only used when the startPosition parameter is set to Time. When the startPosition parameter is set to Time, the operator will begin reading records from the earliest offset whose timestamp is greater than or equal to the timestamp specified by this parameter. If no offsets are found, then the operator will begin reading messages from the end of the topic(s). The timestamp must be given as an 'int64' type in milliseconds since Unix epoch.
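For example, to begin reading at the earliest record timestamped at or after 2018-01-01 00:00:00 UTC (1514764800000 milliseconds since Unix epoch), a sketch might look like this (topic name is illustrative):

```
// Sketch: start at the earliest offsets whose record timestamps are
// at or after 2018-01-01 00:00:00 UTC.
stream<rstring message> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "t1";
        startPosition: Time;
        startTime: 1514764800000l;
}
```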

topic

Specifies the topic or topics that the consumer should subscribe to. To assign the consumer to specific partitions, use the partition parameter.

triggerCount

This parameter specifies the number of tuples that will be submitted to the output port before triggering a consistent region. This parameter is only used if the operator is the start of an operator driven consistent region and is ignored otherwise.

userLib

Allows the user to specify paths to JAR files that should be loaded into the operator's classpath. This is useful if the user wants to specify their own partitioners. The value of this parameter can either point to a specific JAR file or to a directory. In the case of a directory, the operator will load all files ending in .jar onto the classpath. By default, this parameter loads all JAR files found in <application_dir>/etc/libs.

Metrics

nDroppedMalformedMessages - Counter

Number of dropped malformed messages

Libraries

Operator class library
Library Path: ../../impl/java/bin, ../../opt/downloaded/*, ../../impl/lib/*