public class KafkaProducer
extends java.lang.Object
Connector for publishing a TStream<Message> to Kafka topics.
A single connector is for a specific Kafka Broker as specified in the producer configuration.
A connector can create any number of producers in the topology. A producer can publish to one or more topics.
Sample use:
    Topology top = ...
    Map<String,Object> producerConfig = ...
    KafkaProducer pc = new KafkaProducer(top, producerConfig);

    TStream<MyType> myStream = ...
    TStream<Message> msgsToSend = myStream.transform(MyType to SimpleMessage);

    // with Java8 Lambda expressions...
    pc.publish(msgsToSend, ()->"myTopic");

    // without Java8
    pc.publish(msgsToSend, new Value<String>("myTopic"));
Constructor and Description |
---|
KafkaProducer(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config) Create a producer connector for publishing tuples. |
Modifier and Type | Method and Description |
---|---|
java.util.Map<java.lang.String,java.lang.Object> | getConfig() Get the connector's KafkaProducer configuration information. |
TSink | publish(TStream<? extends Message> stream, Supplier<java.lang.String> topic) Publish a stream of messages to a topic. |
TSink | publish(TStream<? extends Message> stream) Publish a stream of messages to one or more topics. |
public KafkaProducer(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config)
Create a producer connector for publishing tuples.
See the Apache Kafka documentation for KafkaProducer configuration properties at http://kafka.apache.org. Configuration property values are strings.
Starting with com.ibm.streamsx.messaging v3.0, the Kafka "New Producer configs" are used. Minimal configuration typically includes:
- bootstrap.servers
- acks
Before v3.0, the Kafka "Old Producer configs" applied, for which minimal configuration typically included:
- metadata.broker.list
- serializer.class
- request.required.acks
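As a minimal sketch of the "New Producer" case, the two properties named above (bootstrap.servers and acks) can be assembled into the Map<String,Object> that the constructor accepts. The class name, helper method, broker address, and acks value here are hypothetical and chosen only for illustration; the connector itself is not invoked.

```java
import java.util.HashMap;
import java.util.Map;

public class MinimalProducerConfig {

    // Builds a map holding only the two "New Producer" properties.
    // Values are strings, as the connector documentation requires.
    public static Map<String, Object> minimalConfig(String bootstrapServers, String acks) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("acks", acks);
        return config;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address (an assumption).
        Map<String, Object> config = minimalConfig("localhost:9092", "1");
        System.out.println(config.get("bootstrap.servers"));
        System.out.println(config.get("acks"));
        // The map would then be passed to the connector, e.g.
        //   new KafkaProducer(top, config);
    }
}
```

A real deployment will usually need more properties than these; the Kafka producer configuration documentation lists the full set.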
config - KafkaProducer configuration information.

public java.util.Map<java.lang.String,java.lang.Object> getConfig()
Get the connector's KafkaProducer configuration information.

public TSink publish(TStream<? extends Message> stream)
Publish a stream of messages to one or more topics. Each stream tuple is sent to the topic specified by its Message.getTopic() value.
Same as publish(stream, null).
stream - the stream to publish

public TSink publish(TStream<? extends Message> stream, Supplier<java.lang.String> topic)
Publish a stream of messages to a topic. If topic is null, each tuple is published to the topic specified by its Message.getTopic(). Otherwise, all tuples are published to topic.
The messages added to Kafka include a topic, message and key. If Message.getKey() is null, an empty key value is published.
N.B. there seem to be some issues with the underlying com.ibm.streamsx.messaging library, e.g., issue#118. If your application is experiencing odd Kafka behavior, try isolating the producer from its feeding streams, e.g.,

    KafkaProducer pc = ...
    TStream<Message> s = ...
    pc.publish(s.isolate(), ...);
stream - the stream to publish
topic - topic to publish to. May be null.
java.lang.IllegalArgumentException - if a non-null empty topic is specified.
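The topic and key rules described for publish(stream, topic) can be modeled in a few lines. This is a sketch under stated assumptions, not the connector's implementation: SimpleMsg is a hypothetical stand-in for Message, java.util.function.Supplier stands in for the toolkit's own Supplier type, the empty key is modeled as an empty string, and a supplier that yields null is treated like an empty topic.

```java
import java.util.function.Supplier;

public class TopicSelection {

    // Hypothetical stand-in for the connector's Message type.
    public static final class SimpleMsg {
        final String topic;
        final String key;
        final String message;

        public SimpleMsg(String topic, String key, String message) {
            this.topic = topic;
            this.key = key;
            this.message = message;
        }
    }

    // Chooses the destination topic for one tuple.
    public static String resolveTopic(Supplier<String> topic, SimpleMsg msg) {
        if (topic == null) {
            // No fixed topic: use the tuple's own topic.
            return msg.topic;
        }
        String fixed = topic.get();
        // Rejecting a null result as well is an assumption of this sketch.
        if (fixed == null || fixed.isEmpty()) {
            throw new IllegalArgumentException("topic must not be empty");
        }
        // A fixed topic overrides the tuple's topic for every tuple.
        return fixed;
    }

    // A null key is published as an empty key (modeled here as "").
    public static String resolveKey(SimpleMsg msg) {
        return msg.key == null ? "" : msg.key;
    }

    public static void main(String[] args) {
        SimpleMsg m = new SimpleMsg("perTupleTopic", null, "hello");
        System.out.println(resolveTopic(null, m));
        System.out.println(resolveTopic(() -> "fixedTopic", m));
        System.out.println(resolveKey(m).isEmpty());
    }
}
```

The sketch checks the topic once per tuple for simplicity; the point at which the real connector validates the topic is not stated in this page.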