public class KafkaConsumer
extends java.lang.Object
TStream<Message>.
A single connector is for a specific Kafka cluster as specified in the consumer configuration.
A connector can create any number of consumers in the topology. A consumer can subscribe to one or more topics.
Sample use:
Topology top = ... Properties consumerConfig = ... KafkaConsumer cc = new KafkaConsumer(top, consumerConfig); // with Java8 Lambda expressions... TStreamrcvdMsgs = cc.consumer(()->"myTopic"); // without Java8... TStream rcvdMsgs = cc.consumer(new Value ("myTopic"));
| Constructor and Description |
|---|
KafkaConsumer(TopologyElement te,
java.util.Map<java.lang.String,java.lang.Object> config)
Create a consumer connector for subscribing to topics.
|
| Modifier and Type | Method and Description |
|---|---|
java.util.Map<java.lang.String,java.lang.Object> |
getConfig()
Get the connector's
KafkaConsumer configuration information. |
TStream<Message> |
subscribe(Supplier<java.lang.Integer> threadsPerTopic,
Supplier<java.lang.String> topic)
Subscribe to a topic and create a stream of messages.
|
TStream<Message> |
subscribe(Supplier<java.lang.String> topic)
Subscribe to a topic and create a stream of messages.
|
public KafkaConsumer(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config)
See the Apache Kafka documentation for KafkaConsumer
configuration properties at http://kafka.apache.org.
Configuration property values are strings.
Minimal configuration typically includes:
zookeeper.connectgroup.idzookeeper.session.timeout.mszookeeper.sync.time.msauto.commit.interval.mste - TopologyElementconfig - KafkaConsumer configuration information.public java.util.Map<java.lang.String,java.lang.Object> getConfig()
KafkaConsumer configuration information.public TStream<Message> subscribe(Supplier<java.lang.String> topic)
Same as subscribe(new Value<Integer>(1), topic)
topic - the topic to subscribe to. May be a submission parameter.Value,
Topology.createSubmissionParameter(String, Class)public TStream<Message> subscribe(Supplier<java.lang.Integer> threadsPerTopic, Supplier<java.lang.String> topic)
N.B., A topology that includes this will not support
StreamsContext.Type.EMBEDDED.
N.B. due to com.ibm.streamsx.messaging
issue#118,
multiple consumers will have issues in
StreamsContext.Type.STANDALONE.
N.B. due to com.ibm.streamsx.messaging
issue#117,
a consumer in StreamsContext.Type.STANDALONE subsequently results
in an orphaned @{code standalone} processes that continues as the lead
group/topic consumer thereby preventing subsequent instances of the
group/topic consumer from receiving messages.
N.B. due to com.ibm.streamsx.messaging
issue#114,
a consumer essentially ignores messages generated by producers where the
optional key is null.
e.g., Kafka's kafka-console-producer.sh tool generates
key==null messages.
threadsPerTopic - number of threads to allocate to processing each
topic. May be a submission parameter.topic - the topic to subscribe to. May be a submission parameter.Message tuples have a non-null topic.
The tuple's key will be null if the Kafka message
lacked a key or it's key was the empty string.java.lang.IllegalArgumentException - if topic is null.java.lang.IllegalArgumentException - if threadsPerTopic is null.Value,
Topology.createSubmissionParameter(String, Class)