Operator KafkaProducer


The KafkaProducer operator is used to produce messages on Kafka topics. The operator can be configured to produce messages to one or more topics.

Supported Kafka Version

This version of the toolkit supports Apache Kafka v0.10.2, v0.11.x, and v1.0.x.

Kafka Properties

The operator implements Kafka's KafkaProducer API. As a result, it supports all Kafka properties that are supported by the underlying API. Properties can be specified in a file or in an application configuration. If specifying properties via a file, the propertiesFile parameter can be used. If specifying properties in an application configuration, the name of the application configuration can be specified using the appConfigName parameter.

The only property that the user is required to set is the bootstrap.servers property, which points to the Kafka brokers. All other properties are optional. The operator sets some properties by default to enable users to quickly get started. The following table lists the properties that the operator sets by default:

Property name      Default Value
client.id          Randomly generated ID in the form: producer-<random_string>
key.serializer     See Automatic Serialization section below
value.serializer   See Automatic Serialization section below

NOTE: Users can override any of the above properties by explicitly setting the property value in either a properties file or in an application configuration.
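
For illustration, a minimal invocation that reads the Kafka properties from a file might look like the following sketch. The topic name, the file path, and the generated sample stream are placeholders; the properties file is expected to contain at least a bootstrap.servers entry.

    use com.ibm.streamsx.kafka::KafkaProducer;
    use spl.utility::Beacon;

    composite ProducerFromPropertiesFile {
        graph
            // Generate a few sample tuples; the attribute names 'key' and 'message'
            // are the defaults that the KafkaProducer operator looks for.
            stream<rstring key, rstring message> Messages = Beacon() {
                param
                    iterations : 100u;
                output
                    Messages : key = "key-" + (rstring) IterationCount(),
                               message = "value-" + (rstring) IterationCount();
            }

            // etc/producer.properties is expected to contain at least:
            //   bootstrap.servers=<broker_host>:<broker_port>
            () as KafkaSink = KafkaProducer(Messages) {
                param
                    propertiesFile : "etc/producer.properties";
                    topic : "myTopic";
            }
    }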

Kafka Properties via Application Configuration

Users can specify Kafka properties using Streams' application configurations. Information on configuring application configurations can be found here: Creating application configuration objects to securely store data. Each property set in the application configuration will be loaded as a Kafka property. For example, to specify the bootstrap servers that the operator should connect to, an app config property named bootstrap.servers should be created.
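
As a sketch, the producer from the previous example could read its Kafka properties from an application configuration instead of a file. The application configuration name used here is hypothetical, and the configuration must contain at least a bootstrap.servers property.

    // 'kafkaProducerProps' is a hypothetical application configuration that
    // holds Kafka properties such as bootstrap.servers.
    () as KafkaSink = KafkaProducer(Messages) {
        param
            appConfigName : "kafkaProducerProps";
            topic : "myTopic";
    }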

Automatic Serialization

The operator will automatically select the appropriate serializers for the key and message based on their types. The following table outlines which serializer will be used for a given SPL type:

Serializer                                                   SPL Types
org.apache.kafka.common.serialization.StringSerializer      rstring
org.apache.kafka.common.serialization.IntegerSerializer     int32, uint32
org.apache.kafka.common.serialization.LongSerializer        int64, uint64
org.apache.kafka.common.serialization.FloatSerializer       float32
org.apache.kafka.common.serialization.DoubleSerializer      float64
org.apache.kafka.common.serialization.ByteArraySerializer   blob
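
For example, given the illustrative input schema in the comment below, the operator would select the LongSerializer for the key and the ByteArraySerializer for the message; no serializer properties need to be set explicitly.

    // Assumed upstream stream: stream<int64 key, blob message> BinaryMessages
    //   key.serializer   -> org.apache.kafka.common.serialization.LongSerializer
    //   value.serializer -> org.apache.kafka.common.serialization.ByteArraySerializer
    () as BinarySink = KafkaProducer(BinaryMessages) {
        param
            propertiesFile : "etc/producer.properties";
            topic : "binaryTopic";
    }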

Consistent Region Strategy

The KafkaProducer operator can participate in a consistent region. The operator cannot be the start of a consistent region. The operator supports 'at least once' (default behavior) and 'exactly once' delivery semantics. The delivery semantics can be controlled by the consistentRegionPolicy parameter.

'At least once' delivery

If the operator crashes or is reset while in a consistent region, the operator writes all replayed tuples to the topic(s). This ensures that every tuple sent to the operator is written to the topic(s). However, 'at least once' semantics imply that duplicate messages may be written to the topic(s).

'Exactly once' delivery

Messages are always inserted into a topic within the context of a transaction. Transactions are committed when the operator checkpoints. If the operator crashes or is reset while in a consistent region, the operator aborts the ongoing transaction and writes all replayed tuples within a new transaction. This ensures that every tuple sent to the operator will be written to the topic(s), and that clients configured with isolation.level = read_committed will not read the duplicates from the aborted transactions.

NOTE 1: Transactions in Kafka have an inactivity timeout with a default value of 60 seconds. If the consistent region is triggered less frequently than this, or you expect a low message rate, consider increasing the timeout by setting the client property transaction.timeout.ms to a higher value, for example 120000 (milliseconds). The maximum value of this property is limited by the server property transaction.max.timeout.ms, which has a default value of 900000.

NOTE 2: For 'exactly once' delivery semantics, the Kafka broker must have version 0.11 or higher because older brokers do not support transactions.
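
The following sketch shows a producer with 'exactly once' delivery semantics inside a periodically triggered consistent region. The operator names, trigger period, topic, and properties file are illustrative; note that a JobControlPlane operator is required in any application that contains a consistent region.

    use com.ibm.streamsx.kafka::KafkaProducer;
    use spl.utility::Beacon;
    use spl.control::JobControlPlane;

    composite TransactionalProducer {
        graph
            // Required once per application that contains a consistent region.
            () as JCP = JobControlPlane() {}

            // The region starts at the source operator; the KafkaProducer
            // cannot be the start of a consistent region.
            @consistent(trigger = periodic, period = 60.0)
            stream<rstring key, rstring message> Messages = Beacon() {
                param
                    period : 0.1;
                output
                    Messages : key = "key-" + (rstring) IterationCount(),
                               message = "value-" + (rstring) IterationCount();
            }

            // Tuples are written within a transaction that is committed
            // at each checkpoint of the consistent region.
            () as KafkaSink = KafkaProducer(Messages) {
                param
                    propertiesFile : "etc/producer.properties";
                    topic : "myTopic";
                    consistentRegionPolicy : Transactional;
            }
    }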

Error Handling

Many exceptions thrown by the underlying Kafka API are considered fatal. In the event that a fatal exception is thrown, the operator will restart. Some exceptions can be retried, such as those that occur due to network errors. Users are encouraged to set the KafkaProducer retries property to a value greater than 0 to enable the producer's retry mechanism.

Summary

Ports
This operator has 1 input port and 0 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 11 parameters.

Optional: appConfigName, clientId, consistentRegionPolicy, keyAttribute, messageAttribute, partitionAttribute, propertiesFile, timestampAttribute, topic, topicAttribute, userLib

Metrics
This operator does not report any metrics.

Properties

Implementation: Java

Input Ports

Ports (0)

This port consumes the tuples to be written to the Kafka topic(s); each tuple received on this port is written to the configured topic(s).


Parameters

This operator supports 11 parameters.

Optional: appConfigName, clientId, consistentRegionPolicy, keyAttribute, messageAttribute, partitionAttribute, propertiesFile, timestampAttribute, topic, topicAttribute, userLib

appConfigName

Specifies the name of the application configuration containing Kafka properties.

clientId

Specifies the client ID that should be used when connecting to the Kafka cluster. The value specified by this parameter will override the client.id Kafka property if specified. If this parameter is not specified and the client.id Kafka property is not specified, the operator will use a random client ID.

consistentRegionPolicy

Specifies the policy to use when in a consistent region. If 'AtLeastOnce' is specified, the operator guarantees that every tuple is written to the topic(s) at least once. If 'Transactional' is specified, the operator writes tuples to the topic(s) within the context of a transaction. Transactions are committed when the operator checkpoints. This implies that downstream consumers may not see the messages until the operator checkpoints, unless the consumer is configured to read uncommitted messages. To achieve 'exactly once' behaviour for a consumer, the consumer property isolation.level must be set to 'read_committed'. Otherwise, uncommitted messages are also read from the Kafka topic, which results in 'at least once' behaviour for that consumer. This parameter is ignored if the operator is not part of a consistent region. The default value is 'AtLeastOnce'. NOTE: Kafka brokers older than version v0.11 do not support transactions.

keyAttribute

Specifies the input attribute that contains the Kafka key value. If not specified, the operator will look for an input attribute named key.

messageAttribute

Specifies the attribute on the input port that contains the message payload. If not specified, the operator will look for an input attribute named message. If this parameter is not specified and there is no input attribute named message, the operator will throw an exception and terminate.

partitionAttribute

Specifies the input attribute that contains the partition number that the message should be written to. If this parameter is not specified, the operator will look for an input attribute named partition. If the user does not indicate which partition the message should be written to, then Kafka's default partitioning strategy will be used instead (partition based on the specified partitioner or in a round-robin fashion).
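
As a sketch, an input schema that does not use the default attribute names can map its attributes explicitly with these parameters. The schema, attribute names, and topic below are illustrative.

    // Assumed upstream stream: stream<rstring orderId, rstring payload, int32 targetPartition> Orders
    () as OrderSink = KafkaProducer(Orders) {
        param
            propertiesFile : "etc/producer.properties";
            topic : "orders";
            keyAttribute : orderId;               // key taken from 'orderId' instead of 'key'
            messageAttribute : payload;           // message taken from 'payload' instead of 'message'
            partitionAttribute : targetPartition; // partition taken from 'targetPartition' instead of 'partition'
    }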

propertiesFile

Specifies the name of the properties file containing Kafka properties.

timestampAttribute

Specifies the attribute on the input port that contains the timestamp for the message. If not specified, the operator will look for an input attribute named messageTimestamp. If this parameter is not specified and there is no input attribute named messageTimestamp, the operator will use the timestamp provided by the underlying Kafka API.

topic

Specifies the topic(s) that the producer should send messages to. The value of this parameter will take precedence over the topicAttribute parameter. This parameter will also take precedence if the input tuple schema contains an attribute named topic.

topicAttribute

Specifies the input attribute that contains the name of the topic that the message should be written to. If this parameter is not specified, the operator will look for an input attribute named topic. This parameter value is overridden if the topic parameter is specified.

userLib

Allows the user to specify paths to JAR files that should be loaded into the operator's classpath. This is useful if the user wants to be able to specify their own partitioners. The value of this parameter can either point to a specific JAR file, or to a directory. In the case of a directory, the operator will load all files ending in .jar onto the classpath. By default, this parameter loads all JAR files found in <application_dir>/etc/libs.
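
For example, a hypothetical custom partitioner packaged in its own JAR file could be made available to the operator as sketched below. The JAR path and partitioner class name are placeholders; the partitioner itself is selected via the partitioner.class Kafka property.

    // etc/producer.properties may additionally contain:
    //   partitioner.class=com.example.MyPartitioner   (hypothetical class)
    () as KafkaSink = KafkaProducer(Messages) {
        param
            propertiesFile : "etc/producer.properties";
            userLib : "etc/libs/custom-partitioner.jar";
            topic : "myTopic";
    }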


Libraries

Operator class library
Library Path: ../../impl/java/bin, ../../opt/downloaded/*, ../../impl/lib/*