Class KafkaProducer
java.lang.Object
    com.ibm.streamsx.topology.messaging.kafka.KafkaProducer
public class KafkaProducer extends java.lang.Object
A simple connector to an Apache Kafka cluster for producing Kafka messages -- publishing a TStream<Message> to Kafka topics.
A single connector is for a specific Kafka broker, as specified in the producer configuration.
A connector can create any number of producers in the topology. A producer can publish to one or more topics.
Sample use:
    Topology top = ...
    Map<String,Object> producerConfig = ...
    KafkaProducer pc = new KafkaProducer(top, producerConfig);

    TStream<MyType> myStream = ...
    TStream<Message> msgsToSend = myStream.transform(MyType to SimpleMessage);

    // with Java8 Lambda expressions...
    pc.publish(msgsToSend, ()->"myTopic");

    // without Java8
    pc.publish(msgsToSend, new Value<String>("myTopic"));
-
-
Constructor Summary
Constructor and Description
KafkaProducer(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config)
    Create a producer connector for publishing tuples.
-
Method Summary
Modifier and Type    Method and Description
java.util.Map<java.lang.String,java.lang.Object>    getConfig()
    Get the connector's KafkaProducer configuration information.
TSink    publish(TStream<? extends Message> stream, Supplier<java.lang.String> topic)
    Publish a stream of messages to a topic.
TSink    publish(TStream<? extends Message> stream)
    Publish a stream of messages to one or more topics.
-
-
-
Constructor Detail
-
KafkaProducer
public KafkaProducer(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config)
Create a producer connector for publishing tuples.
See the Apache Kafka documentation for KafkaProducer configuration properties at http://kafka.apache.org. Configuration property values are strings.
Starting with com.ibm.streamsx.messaging v3.0, the Kafka "New Producer configs" are used. Minimal configuration typically includes:
- bootstrap.servers
- acks
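The following are property names from the older Kafka producer configuration, which the "New Producer configs" supersede:
- metadata.broker.list
- serializer.class
- request.required.acks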
- Parameters:
  config - KafkaProducer configuration information.
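A minimal sketch of building the config argument, using only java.util. The class name ProducerConfigSketch, the helper minimalConfig, the broker address and the "acks" value are illustrative assumptions, not part of this API; only the property names bootstrap.servers and acks come from the text above.

```java
import java.util.HashMap;
import java.util.Map;

class ProducerConfigSketch {
    // Builds a minimal "New Producer configs" style map.
    // The broker address is supplied by the caller; "1" for acks is an assumed example value.
    static Map<String, Object> minimalConfig(String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("acks", "1"); // property values are strings
        return config;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address.
        Map<String, Object> config = minimalConfig("localhost:9092");
        System.out.println(config.get("bootstrap.servers"));
        System.out.println(config.get("acks"));
        // The map would then be handed to the constructor:
        // new KafkaProducer(te, config);
    }
}
```

The resulting map has the Map<String,Object> type the constructor declares, with every value kept as a string.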
-
-
Method Detail
-
getConfig
public java.util.Map<java.lang.String,java.lang.Object> getConfig()
Get the connector's KafkaProducer configuration information.
- Returns:
  the unmodifiable configuration
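What "unmodifiable" means for a caller can be sketched with plain java.util. This assumes the returned map behaves like one produced by Collections.unmodifiableMap; the page does not say how the connector implements it, and the class and method names here are hypothetical stand-ins.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class UnmodifiableConfigSketch {
    // Stand-in for getConfig(): exposes a read-only view over a private copy.
    static Map<String, Object> unmodifiableCopy(Map<String, Object> config) {
        return Collections.unmodifiableMap(new HashMap<>(config));
    }

    // Returns true if the map refuses a write attempt.
    static boolean rejectsWrites(Map<String, Object> config) {
        try {
            config.put("acks", "all");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> original = new HashMap<>();
        original.put("bootstrap.servers", "localhost:9092"); // placeholder address
        Map<String, Object> view = unmodifiableCopy(original);
        System.out.println(view.get("bootstrap.servers")); // reads work
        System.out.println(rejectsWrites(view));           // writes are rejected
    }
}
```

Under that assumption, code that needs a modified configuration should copy the map into a new HashMap rather than write to it.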
-
publish
public TSink publish(TStream<? extends Message> stream)
Publish a stream of messages to one or more topics.
Each stream tuple is sent to the topic specified by its Message.getTopic() value. Same as publish(stream, null).
- Parameters:
  stream - the stream to publish
- Returns:
  the sink element
-
publish
public TSink publish(TStream<? extends Message> stream, Supplier<java.lang.String> topic)
Publish a stream of messages to a topic.
If topic is null, each tuple is published to the topic specified by its Message.getTopic(). Otherwise, all tuples are published to topic.
The messages added to Kafka include a topic, message and key. If Message.getKey() is null, an empty key value is published.
N.B. there seem to be some issues with the underlying com.ibm.streamsx.messaging library - e.g., issue#118. If your application is experiencing odd Kafka behavior, try isolating the producer from its feeding streams, e.g.:
    KafkaProducer pc = ...
    TStream<Message> s = ...
    pc.publish(s.isolate(), ...);
- Parameters:
  stream - the stream to publish
  topic - topic to publish to. May be null.
- Returns:
  the sink element
- Throws:
  java.lang.IllegalArgumentException - if a non-null empty topic is specified.
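The topic and key rules described for publish can be sketched in isolation. This is not the connector's code: Msg is a hypothetical stand-in for Message, java.util.function.Supplier stands in for the Supplier type in the signature, and the sketch assumes that "empty topic" means the supplier yields an empty string and that "empty key value" means an empty string.

```java
import java.util.function.Supplier;

class TopicResolutionSketch {
    // Hypothetical stand-in for a Message with topic, key and message fields.
    static final class Msg {
        final String topic;
        final String key;
        final String message;

        Msg(String topic, String key, String message) {
            this.topic = topic;
            this.key = key;
            this.message = message;
        }
    }

    // Assumed argument check: a non-null supplier yielding "" is rejected.
    static void checkTopic(Supplier<String> topic) {
        if (topic != null && "".equals(topic.get())) {
            throw new IllegalArgumentException("topic must not be empty");
        }
    }

    // Null supplier: use the message's own topic. Otherwise: use the supplied topic.
    static String resolveTopic(Msg msg, Supplier<String> topic) {
        return topic == null ? msg.topic : topic.get();
    }

    // A null key is replaced by an empty key value (assumed to be "").
    static String effectiveKey(Msg msg) {
        return msg.key == null ? "" : msg.key;
    }

    public static void main(String[] args) {
        Msg m = new Msg("perMessageTopic", null, "hello");
        Supplier<String> fixed = () -> "myTopic";
        checkTopic(fixed);
        System.out.println(resolveTopic(m, null));      // topic taken from the message
        System.out.println(resolveTopic(m, fixed));     // topic taken from the supplier
        System.out.println(effectiveKey(m).isEmpty());  // null key becomes empty
    }
}
```

Under these assumptions, publish(stream) corresponds to calling resolveTopic with a null supplier for every tuple.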
-
-