public class KafkaConsumer
extends java.lang.Object
A connector that subscribes to Kafka topics and creates a TStream<Message>.
A single connector is for a specific Kafka cluster as specified in the consumer configuration.
A connector can create any number of consumers in the topology. A consumer can subscribe to one or more topics.
Sample use:
Topology top = ...
Map<String,Object> consumerConfig = ...
KafkaConsumer cc = new KafkaConsumer(top, consumerConfig);
// with Java 8 lambda expressions...
TStream<Message> rcvdMsgs = cc.subscribe(() -> "myTopic");
// without Java 8...
TStream<Message> rcvdMsgs = cc.subscribe(new Value<String>("myTopic"));
| Constructor and Description |
|---|
| KafkaConsumer(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config) Create a consumer connector for subscribing to topics. |
| Modifier and Type | Method and Description |
|---|---|
| java.util.Map<java.lang.String,java.lang.Object> | getConfig() Get the connector's KafkaConsumer configuration information. |
| TStream<Message> | subscribe(Supplier<java.lang.Integer> threadsPerTopic, Supplier<java.lang.String> topic) Subscribe to a topic and create a stream of messages. |
| TStream<Message> | subscribe(Supplier<java.lang.String> topic) Subscribe to a topic and create a stream of messages. |
public KafkaConsumer(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config)
See the Apache Kafka documentation for KafkaConsumer
configuration properties at http://kafka.apache.org.
Configuration property values are strings.
Minimal configuration typically includes:
zookeeper.connect
group.id
zookeeper.session.timeout.ms
zookeeper.sync.time.ms
auto.commit.interval.ms
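As an illustration of the minimal configuration listed above, here is a sketch of building the config map. The class name KafkaConsumerConfigSketch, the method minimalConfig, the address localhost:2181, the group name and the three timing values are assumptions chosen for the example, not values required by the connector.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaConsumerConfigSketch {

    /**
     * Builds a map holding the five properties named in the minimal
     * configuration list. All values are strings, as the documentation states.
     */
    public static Map<String, Object> minimalConfig(String zookeeperConnect, String groupId) {
        Map<String, Object> config = new HashMap<>();
        config.put("zookeeper.connect", zookeeperConnect);
        config.put("group.id", groupId);
        // The timing values below are illustrative assumptions only.
        config.put("zookeeper.session.timeout.ms", "400");
        config.put("zookeeper.sync.time.ms", "200");
        config.put("auto.commit.interval.ms", "1000");
        return config;
    }

    public static void main(String[] args) {
        // Hypothetical ZooKeeper address and consumer group.
        Map<String, Object> config = minimalConfig("localhost:2181", "myGroup");
        config.forEach((name, value) -> System.out.println(name + "=" + value));
    }
}
```

The resulting map matches the constructor's config parameter type, so it could be passed as new KafkaConsumer(te, config).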
Parameters:
te - TopologyElement
config - KafkaConsumer configuration information.
public java.util.Map<java.lang.String,java.lang.Object> getConfig()
Get the connector's KafkaConsumer configuration information.
public TStream<Message> subscribe(Supplier<java.lang.String> topic)
Same as subscribe(new Value<Integer>(1), topic)
Parameters:
topic - the topic to subscribe to. May be a submission parameter.
See Also:
Value, Topology.createSubmissionParameter(String, Class)
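The equivalence between the two subscribe overloads can be pictured with a small stand-alone sketch. It is not the connector's implementation: java.util.function.Supplier stands in for the library's Supplier, a String stands in for the returned TStream<Message>, and the class name SubscribeOverloadSketch is hypothetical.

```java
import java.util.function.Supplier;

public class SubscribeOverloadSketch {

    /** Stand-in for the two-argument subscribe; returns a description instead of a stream. */
    public static String subscribe(Supplier<Integer> threadsPerTopic, Supplier<String> topic) {
        return "topic=" + topic.get() + ", threadsPerTopic=" + threadsPerTopic.get();
    }

    /** Stand-in for the one-argument subscribe: delegates with one thread per topic. */
    public static String subscribe(Supplier<String> topic) {
        return subscribe(() -> 1, topic);
    }

    public static void main(String[] args) {
        // Both calls describe the same subscription.
        System.out.println(subscribe(() -> "myTopic"));
        System.out.println(subscribe(() -> 1, () -> "myTopic"));
    }
}
```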
public TStream<Message> subscribe(Supplier<java.lang.Integer> threadsPerTopic, Supplier<java.lang.String> topic)
N.B. A topology that includes this will not support StreamsContext.Type.EMBEDDED.
N.B. Due to com.ibm.streamsx.messaging issue#118, multiple consumers will have issues in StreamsContext.Type.STANDALONE.
N.B. Due to com.ibm.streamsx.messaging issue#117, a consumer in StreamsContext.Type.STANDALONE subsequently results in an orphaned standalone process that continues as the lead group/topic consumer, thereby preventing subsequent instances of the group/topic consumer from receiving messages.
N.B. Due to com.ibm.streamsx.messaging issue#114, a consumer essentially ignores messages generated by producers where the optional key is null. For example, Kafka's kafka-console-producer.sh tool generates key==null messages.
Parameters:
threadsPerTopic - number of threads to allocate to processing each topic. May be a submission parameter.
topic - the topic to subscribe to. May be a submission parameter.
Returns:
A stream of Message tuples. Each tuple has a non-null topic. The tuple's key will be null if the Kafka message lacked a key or its key was the empty string.
Throws:
java.lang.IllegalArgumentException - if topic is null.
java.lang.IllegalArgumentException - if threadsPerTopic is null.
See Also:
Value, Topology.createSubmissionParameter(String, Class)
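The documented contract for subscribe (null arguments rejected, empty keys reported as null) can be sketched in isolation. This is only an illustration of the stated behavior, not the connector's code: the class SubscribeContractSketch and the helpers checkArgs and normalizeKey are hypothetical, and java.util.function.Supplier stands in for the library's Supplier.

```java
import java.util.function.Supplier;

public class SubscribeContractSketch {

    /** Hypothetical check mirroring the two documented IllegalArgumentException cases. */
    public static void checkArgs(Supplier<Integer> threadsPerTopic, Supplier<String> topic) {
        if (topic == null) {
            throw new IllegalArgumentException("topic");
        }
        if (threadsPerTopic == null) {
            throw new IllegalArgumentException("threadsPerTopic");
        }
    }

    /** Hypothetical key handling: a missing or empty Kafka key becomes null. */
    public static String normalizeKey(String rawKey) {
        return (rawKey == null || rawKey.isEmpty()) ? null : rawKey;
    }

    public static void main(String[] args) {
        checkArgs(() -> 1, () -> "myTopic"); // valid arguments, no exception
        System.out.println(normalizeKey(""));      // empty key is reported as null
        System.out.println(normalizeKey("order")); // non-empty key is kept
    }
}
```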