com.ibm.streamsx.topology.messaging.kafka

Class KafkaConsumer

  • java.lang.Object
    • com.ibm.streamsx.topology.messaging.kafka.KafkaConsumer


  • public class KafkaConsumer
    extends java.lang.Object
    A simple connector to an Apache Kafka cluster for consuming Kafka messages -- subscribing to Kafka topics and creating a TStream<Message>.

    A single connector is for a specific Kafka cluster as specified in the consumer configuration.

    A connector can create any number of consumers in the topology. A consumer can subscribe to one or more topics.

    Sample use:

     Topology top = ...
     Map<String,Object> consumerConfig = ...
     KafkaConsumer cc = new KafkaConsumer(top, consumerConfig);
     // with Java8 Lambda expressions...
     TStream<Message> rcvdMsgs = cc.subscribe(()->"myTopic");
     // without Java8...
     TStream<Message> rcvdMsgs = cc.subscribe(new Value<String>("myTopic"));
     
    See Also:
    http://kafka.apache.org, com.ibm.streamsx.messaging
    • Constructor Summary

      Constructors 
      Constructor and Description
      KafkaConsumer(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config)
      Create a consumer connector for subscribing to topics.
    • Method Summary

      Modifier and Type Method and Description
      java.util.Map<java.lang.String,java.lang.Object> getConfig()
      Get the connector's KafkaConsumer configuration information.
      TStream<Message> subscribe(Supplier<java.lang.Integer> threadsPerTopic, Supplier<java.lang.String> topic)
      Subscribe to a topic and create a stream of messages.
      TStream<Message> subscribe(Supplier<java.lang.String> topic)
      Subscribe to a topic and create a stream of messages.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • KafkaConsumer

        public KafkaConsumer(TopologyElement te,
                             java.util.Map<java.lang.String,java.lang.Object> config)
        Create a consumer connector for subscribing to topics.

        See the Apache Kafka documentation for KafkaConsumer configuration properties at http://kafka.apache.org. Configuration property values are strings.

        Minimal configuration typically includes:

        • zookeeper.connect
        • group.id
        • zookeeper.session.timeout.ms
        • zookeeper.sync.time.ms
        • auto.commit.interval.ms
        Parameters:
        te - TopologyElement
        config - KafkaConsumer configuration information.
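
        As an illustration of the properties listed under "Minimal configuration typically includes", the following is a minimal sketch in plain Java that builds such a map. The class name ConsumerConfigSketch, the method minimalConfig, and every property value (ZooKeeper address, group name, timeouts) are assumptions chosen for the example, not defaults of this connector. Values are given as strings, as stated above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class for illustration; not part of streamsx.topology.
class ConsumerConfigSketch {

    // Builds a map holding the five properties named in the constructor
    // documentation. All values are placeholders (assumptions), and all
    // are strings because configuration property values are strings.
    static Map<String, Object> minimalConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put("zookeeper.connect", "localhost:2181");   // assumed ZooKeeper address
        config.put("group.id", "myConsumerGroup");            // assumed consumer group name
        config.put("zookeeper.session.timeout.ms", "400");    // placeholder value
        config.put("zookeeper.sync.time.ms", "200");          // placeholder value
        config.put("auto.commit.interval.ms", "1000");        // placeholder value
        return config;
    }

    public static void main(String[] args) {
        // Print each property so the resulting configuration can be inspected.
        minimalConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

        A map built this way has the type the constructor expects for its config argument, for example new KafkaConsumer(te, ConsumerConfigSketch.minimalConfig()).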
    • Method Detail

      • getConfig

        public java.util.Map<java.lang.String,java.lang.Object> getConfig()
        Get the connector's KafkaConsumer configuration information.
        Returns:
        the unmodifiable configuration
      • subscribe

        public TStream<Message> subscribe(Supplier<java.lang.Integer> threadsPerTopic,
                                          Supplier<java.lang.String> topic)
        Subscribe to a topic and create a stream of messages.

        N.B. A topology that includes this consumer does not support StreamsContext.Type.EMBEDDED.

        N.B. Due to com.ibm.streamsx.messaging issue#118, multiple consumers will have issues in StreamsContext.Type.STANDALONE.

        N.B. Due to com.ibm.streamsx.messaging issue#117, a consumer in StreamsContext.Type.STANDALONE subsequently results in an orphaned standalone process that continues as the lead group/topic consumer, thereby preventing subsequent instances of the group/topic consumer from receiving messages.

        N.B. Due to com.ibm.streamsx.messaging issue#114, a consumer essentially ignores messages generated by producers where the optional key is null. For example, Kafka's kafka-console-producer.sh tool generates messages with a null key.

        Parameters:
        threadsPerTopic - number of threads to allocate to processing each topic. May be a submission parameter.
        topic - the topic to subscribe to. May be a submission parameter.
        Returns:
        TStream<Message> The generated Message tuples have a non-null topic. The tuple's key will be null if the Kafka message lacked a key or its key was the empty string.
        Throws:
        java.lang.IllegalArgumentException - if topic is null.
        java.lang.IllegalArgumentException - if threadsPerTopic is null.
        See Also:
        Value, Topology.createSubmissionParameter(String, Class)
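
        Because a tuple's key may be null (see Returns above), downstream logic that groups or routes by key may want a fallback value. The following is a minimal sketch in plain Java that works on the key string directly rather than on a Message tuple. The class KeyHandlingSketch, the method keyOrDefault, and the sample key strings are hypothetical and are not part of this API.

```java
// Hypothetical helper class for illustration; not part of streamsx.topology.
class KeyHandlingSketch {

    // Returns the key unchanged, or the fallback when the key is null or empty.
    // The empty-string check is defensive: the connector is documented to
    // deliver null for empty keys already.
    static String keyOrDefault(String key, String fallback) {
        return (key == null || key.isEmpty()) ? fallback : key;
    }

    public static void main(String[] args) {
        System.out.println(keyOrDefault(null, "no-key"));       // key missing
        System.out.println(keyOrDefault("sensor-1", "no-key")); // key present
    }
}
```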
streamsx.topology 2.1 @ IBMStreams GitHub