Operator KafkaProducer

Toolkit: com.ibm.streamsx.messaging 5.4.3. Namespace: com.ibm.streamsx.messaging.kafka.

DEPRECATED: The com.ibm.streamsx.messaging.kafka.KafkaProducer operator is deprecated and is replaced by the com.ibm.streamsx.kafka.KafkaProducer operator in the com.ibm.streamsx.kafka toolkit. The deprecated operator might be removed in a future release.

This operator acts as a Kafka producer, sending tuples as messages to a Kafka broker. The broker is assumed to be already configured and running. The incoming stream can have three attributes: topic, key, and message. Only message is required. A topic can be specified either as an input stream attribute or as a parameter. Specify Kafka properties as described here: http://kafka.apache.org/documentation.html#configuration. If you use Java security modules for login or authentication, ensure that they are compatible with IBM Java, because IBM Streams runs only with IBM Java. The supported SPL attribute types are rstring, ustring, and blob. The topic must be of type rstring or ustring, and the key and message must share the same type (rstring, ustring, or blob).
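The attribute type rules above can be illustrated with a small check. This is a minimal sketch, not the operator's actual validation code: the class AttributeTypeCheck, the enum SplType, and the method validate are hypothetical names, and passing null is assumed here to mean "attribute not present on the input stream".

```java
// Illustrative sketch only; all names here are hypothetical.
class AttributeTypeCheck {

    // The three SPL attribute types the operator supports.
    enum SplType { RSTRING, USTRING, BLOB }

    // null means the attribute is absent from the input stream (an assumption of this sketch).
    static void validate(SplType topic, SplType key, SplType message) {
        if (message == null) {
            throw new IllegalArgumentException("message attribute is required");
        }
        if (topic == SplType.BLOB) {
            throw new IllegalArgumentException("topic must be rstring or ustring");
        }
        if (key != null && key != message) {
            throw new IllegalArgumentException("key and message must have the same type");
        }
    }

    public static void main(String[] args) {
        // Accepted: string topic, blob key and blob message.
        validate(SplType.RSTRING, SplType.BLOB, SplType.BLOB);
        // Accepted: no topic attribute (topic given as a parameter), no key.
        validate(null, null, SplType.USTRING);
        System.out.println("both schemas accepted");
    }
}
```

A schema with an rstring key and a blob message would be rejected by this check, because the key and message types differ.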

Kafka 0.9 Server Support: By default this toolkit builds with Kafka 0.10 client JARs. The Kafka 0.10 client is not compatible with Kafka 0.9 brokers. To use this operator with Kafka 0.9 brokers, you must rebuild with the kafka-0.9 target after cleaning. From the toolkit root directory:

ant clean

ant kafka-0.9

AppConfig: You must provide properties for the operator using at least one of the following parameters: kafkaProperty, propertiesFile, or appConfigName. When the same property is set in more than one place, properties from appConfig override kafkaProperty parameter properties, which override properties from the propertiesFile.
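The precedence rule can be pictured as a layered merge in which later sources overwrite earlier ones. The following is a minimal sketch of that idea, not the toolkit's implementation: the class PropertyPrecedence and its merge method are hypothetical, and the host names and values are made up for illustration.

```java
import java.util.Properties;

// Illustrative sketch only; not the toolkit's actual code.
class PropertyPrecedence {

    // Later putAll calls overwrite earlier entries, so the order is:
    // propertiesFile (lowest) < kafkaProperty < appConfig (highest).
    static Properties merge(Properties fromFile, Properties fromParam, Properties fromAppConfig) {
        Properties merged = new Properties();
        merged.putAll(fromFile);
        merged.putAll(fromParam);
        merged.putAll(fromAppConfig);
        return merged;
    }

    public static void main(String[] args) {
        Properties file = new Properties();
        file.setProperty("bootstrap.servers", "file-host:9092");
        file.setProperty("acks", "1");

        Properties param = new Properties();
        param.setProperty("acks", "all");

        Properties appConfig = new Properties();
        appConfig.setProperty("bootstrap.servers", "appconfig-host:9092");

        Properties merged = merge(file, param, appConfig);
        System.out.println(merged.getProperty("bootstrap.servers")); // prints appconfig-host:9092
        System.out.println(merged.getProperty("acks"));              // prints all
    }
}
```

A property that appears in only one source passes through unchanged, so the sources can be combined freely.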

Behavior in a Consistent Region: This operator can participate in a consistent region, but it cannot be placed at the start of one. The KafkaProducer guarantees at-least-once delivery of messages to a Kafka topic.

Summary

Ports
This operator has 1 input port and 0 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 10 parameters.

Optional: appConfigName, appConfigPropertyName, jaasFile, jaasFilePropName, kafkaProperty, keyAttribute, messageAttribute, propertiesFile, topic, topicAttribute

Metrics
This operator does not report any metrics.

Properties

Implementation
Java

Input Ports

Ports (0)

The tuples arriving on this port are expected to contain three attributes: "key", "topic", and "message". Of these, only "message" is required.

Parameters

appConfigName

This parameter specifies the name of the application configuration that stores client properties. Properties from the application configuration override those set by the kafkaProperty parameter, which in turn override those from the propertiesFile.

appConfigPropertyName

List of Kafka properties to retrieve from the application configuration. The property name in the application configuration must be the same as the Kafka property name. You may also supply jaasFile as a property name to act as the jaasFile parameter value.

jaasFile

Location of the JAAS file to be used for SASL connections. It is recommended to store the JAAS file in the etc directory. If a relative path is specified, the path is relative to the application directory. This sets the system property java.security.auth.login.config. The location can also be set through the appConfig by specifying jaasFile=<jaas.conf location>.
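The path handling described for jaasFile can be sketched as follows. This is an assumption-laden illustration rather than the operator's code: the class JaasFileSetup, its methods, and the example directory /opt/streams/app are hypothetical. Only the system property name java.security.auth.login.config comes from the description above.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Illustrative sketch only; class and method names are hypothetical.
class JaasFileSetup {

    // Resolve a relative jaasFile value against the application directory;
    // absolute paths are used as given.
    static String resolve(String applicationDir, String jaasFile) {
        Path path = Paths.get(jaasFile);
        if (!path.isAbsolute()) {
            path = Paths.get(applicationDir).resolve(path);
        }
        return path.normalize().toString();
    }

    // Point the JVM-wide JAAS login configuration at the resolved file.
    static void apply(String applicationDir, String jaasFile) {
        System.setProperty("java.security.auth.login.config", resolve(applicationDir, jaasFile));
    }

    public static void main(String[] args) {
        // "/opt/streams/app" is a made-up application directory.
        apply("/opt/streams/app", "etc/jaas.conf");
        System.out.println(System.getProperty("java.security.auth.login.config"));
    }
}
```

Because the setting is a JVM system property, it applies to every JAAS client in the same process, not only to one operator instance.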

jaasFilePropName

This parameter specifies the property name of the jaasFile location in the application configuration. The default name is jaasFile.

kafkaProperty

Specifies a Kafka property in "key=value" form. This overrides any property of the same name specified in the properties file. Properties from appConfig override kafkaProperty parameter properties, which override properties from the propertiesFile.
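As a rough illustration of the "key=value" form, the sketch below turns such a string into an entry of a java.util.Properties object. It is not the operator's parser: the class KafkaPropertyParser is hypothetical, and splitting at the first '=' and trimming whitespace are assumptions made so that values which themselves contain '=' stay intact.

```java
import java.util.Properties;

// Illustrative sketch only; the operator's real parsing may differ.
class KafkaPropertyParser {

    // Assumption: the key ends at the first '=', and the rest is the value.
    static void addProperty(Properties target, String keyValue) {
        int eq = keyValue.indexOf('=');
        if (eq <= 0) {
            throw new IllegalArgumentException("Expected key=value but got: " + keyValue);
        }
        String key = keyValue.substring(0, eq).trim();
        String value = keyValue.substring(eq + 1).trim();
        target.setProperty(key, value);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        addProperty(props, "acks=all");
        // The value itself contains '=', which the first-'=' split preserves.
        addProperty(props, "sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true;");
        System.out.println(props.getProperty("acks")); // prints all
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```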

keyAttribute

Name of the attribute for the key. Default is "key".

messageAttribute

Name of the attribute for the message. If this parameter is not specified, the operator looks for an attribute named "message". If no "message" attribute is found, the operator raises a runtime error.

propertiesFile

Properties file containing Kafka properties. It is recommended to store the properties file in the etc directory. If a relative path is specified, the path is relative to the application directory. Properties from appConfig override kafkaProperty parameter properties, which override properties from the propertiesFile.

topic

Topic to be published to. A topic can also be specified as an input stream attribute.

topicAttribute

Name of the attribute for the topic. Default is "topic".

Libraries

Operator class library
Library Path: ../../impl/lib/com.ibm.streamsx.messaging.jar, ../../opt/downloaded/*, ../../opt/*