Getting Started with Kafka Operators


Introduction

The IBM Streams Kafka Toolkit is designed to get you connected to your messaging servers as quickly as possible. Kafka is an ideal messaging server for stream computing. This guide will get you sending and receiving messages in no time, and will highlight some of the best practices. We will also cover how to get the Kafka operators running in a consistent region.

Skill Level

Readers of this guide are expected to have a basic understanding of Kafka and IBM Streams terminology. To get up to speed on Kafka basics, run through their great Quick Start guide.

If you are new to Streams, follow the Quick Start for an overview.

Requirements

Prior to using Kafka operators, the following software must be installed and configured:

  • IBM Streams - A Quick Start Edition is available for free. This guide assumes that you have a Streams domain and instance up and running.

  • Kafka Toolkit - You can download it from the IBM Streams GitHub Kafka Toolkit Repository Release Page.

  • Kafka Brokers - This guide assumes you are using Kafka 0.11 or above. To quickly get a Kafka server up and running, follow these directions.

Information to Collect

Once you have your Kafka server (or servers) set up, you will need their hostnames and listener ports. You can find them in your configuration file for each server (default is <Kafka-Install>/config/server.properties):

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=myhost.mycompany.com
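
On newer broker versions the hostname and port are usually configured through the listeners property instead of port and host.name. If your server.properties uses that style, take the hostname and port from the listeners (or advertised.listeners) entry, for example:

# The address the socket server listens on
listeners=PLAINTEXT://myhost.mycompany.com:9092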

Steps - Send and Receive Messages

  1. Configure the SPL compiler to find the Kafka toolkit directory. Use one of the following methods.
    • Set the STREAMS_SPLPATH environment variable to the root directory of the toolkit (with : as a separator)

      export STREAMS_SPLPATH=<kafka-toolkit-location>/com.ibm.streamsx.kafka:$STREAMS_SPLPATH

    • Specify the -t or --spl-path command parameter when you run the sc command.

      sc -t <kafka-toolkit-location>/com.ibm.streamsx.kafka -M MyMain

    • If Streams Studio is used to compile and run your SPL application, add the Kafka toolkit to the toolkit locations in the Streams Explorer by following these directions.

    • If you develop your applications with the IBM Streams extension for Visual Studio Code, follow this guide.

  2. Create an SPL application and add a toolkit dependency on the Kafka toolkit in your application. You can do this by editing the application dependency in Streams Studio, or by creating/editing the info.xml for the application and adding the dependency directly (you can also just start with the KafkaSample to skip this and the following step).

    Sample info.xml from the KafkaSample:

    <?xml version="1.0" encoding="UTF-8"?>
    <info:toolkitInfoModel xmlns:common="http://www.ibm.com/xmlns/prod/streams/spl/common" xmlns:info="http://www.ibm.com/xmlns/prod/streams/spl/toolkitInfo">
      <info:identity>
        <info:name>KafkaSample</info:name>
        <info:description>Sample application showing a Streams Kafka Producer and Consumer</info:description>
        <info:version>1.0.0</info:version>
        <info:requiredProductVersion>4.0.0.0</info:requiredProductVersion>
      </info:identity>
      <info:dependencies>
        <info:toolkit>
          <common:name>com.ibm.streamsx.kafka</common:name>
          <common:version>[3.0.0,6.0.0)</common:version>
        </info:toolkit>
      </info:dependencies>
    </info:toolkitInfoModel>
    
  3. Add the Kafka operator use directives to your application. If Streams Studio is used, this directive is automatically added when dragging and dropping a Kafka operator onto the SPL application in the graphical editor (if you start with a sample from the Kafka toolkit, this step is already done for you).

    use com.ibm.streamsx.kafka::*;

    or

    use com.ibm.streamsx.kafka::KafkaProducer;

    use com.ibm.streamsx.kafka::KafkaConsumer;

  4. Configure the Kafka Producer to send messages to a Kafka Broker. You must:
    • Create a producer.properties file and place it in the etc directory of your application. This ensures that it will be included in the .sab application bundle (important for cloud and HA deployment). The following is a sample producer.properties file. See here for more producer configuration details.
      bootstrap.servers=broker.host.1:9092,broker.host.2:9092,broker.host.3:9092
    • Specify the location of the producer.properties file in the KafkaProducer operator using the propertiesFile parameter. You can specify either an absolute or a relative file path, where the path is relative to the application directory:

      propertiesFile : "etc/producer.properties";

    • Specify the Kafka topic to send messages to. You can do this either with an rstring topic attribute in the incoming tuple (as in the Beacon operator below) or with the topic parameter of the KafkaProducer (a sketch using the parameter follows the sample).

    Here is the sample beacon and KafkaProducer code from the KafkaSample:

    //create some messages and send to KafkaProducer
     stream<rstring topic, rstring key, rstring message> OutputStream = Beacon() {
         param
             initDelay : 5.0;
             period : 0.2;
         output
             OutputStream: topic = $topic
                         , message = "Reality is merely an illusion, albeit a very persistent one."
                         , key = "Einstein";
     }
    
    
     () as KafkaSinkOp = KafkaProducer(OutputStream) {
         param
             propertiesFile : "etc/producer.properties";
     }
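
    If you prefer to set the topic with the topic parameter instead of a tuple attribute, the sink could look roughly like this (a minimal sketch; the topic name "test" is a placeholder):

     //send every tuple to one fixed topic given by the topic parameter
     () as KafkaSinkFixedTopicOp = KafkaProducer(OutputStream) {
         param
             propertiesFile : "etc/producer.properties";
             topic : "test";
     }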
     
  5. Configure the Kafka Consumer to receive messages from the Kafka Broker. You must:
    • Create a consumer.properties file and place it in the etc directory of your application. Here is a sample consumer.properties file (for more details on Kafka consumer configs, see here):
      bootstrap.servers=broker.host.1:9092,broker.host.2:9092,broker.host.3:9092
    • Specify the location of the consumer.properties file in the KafkaConsumer operator using the propertiesFile parameter:

      propertiesFile : "etc/consumer.properties";

    • Specify the Kafka topic (or topics) to subscribe to. Do this using the rstring topic parameter of the KafkaConsumer. You can subscribe to multiple topics by passing a comma-separated list (a multi-topic sketch follows the sample below):

      topic: "topic1" , "topic2" , "topic3";

    Here is the KafkaConsumer operator from the KafkaSample:

    stream<rstring key, rstring message> KafkaStream = KafkaConsumer()
     {
         param
             propertiesFile : "etc/consumer.properties";
             topic : $topic;
     }
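
    For example, a single consumer that subscribes to several topics might look roughly like this (a minimal sketch; the topic names are placeholders):

    //subscribe to three topics with one consumer
    stream<rstring key, rstring message> MultiTopicStream = KafkaConsumer()
    {
        param
            propertiesFile : "etc/consumer.properties";
            topic : "topic1", "topic2", "topic3";
    }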

Consistent Regions

Kafka operators support consistent regions, which are sections of your operator graph where tuple processing is guaranteed, even in the event of failures. The KafkaProducer can participate in a consistent region (it cannot start one), and can guarantee at-least-once tuple processing. No special configuration is required to use the KafkaProducer in a consistent region, so this section focuses on the KafkaConsumer.

The KafkaConsumer supports at-least-once tuple processing and starts a consistent region (since it is a source). For general questions on consistent region, read this overview and these docs.

To start a consistent region with a KafkaConsumer, you must:

  • Place an @consistent annotation above the operator

  • Specify the triggerCount parameter for an operatorDriven trigger - the trigger count gives you control over the approximate number of messages between checkpoints. If you are using a periodic trigger for your consistent region, you do not need to specify it. Here is the KafkaConsumer from the KafkaConsumerGroupWithConsistentRegion sample, which uses a periodic trigger (an operatorDriven sketch follows it):

    //Read in from a kafka server and start consistent region
    @consistent (trigger=periodic, period=60.0 /*seconds*/)
    stream <rstring message, int32 partition, rstring key> ConsumedMsgs = KafkaConsumer()
    {
        param
            propertiesFile: "etc/consumer.properties" ;
            topic: $topic ;
            groupId: "myGroupId" ;
    }
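
    If you want checkpoints to be driven by the consumed messages instead, use an operatorDriven trigger together with the triggerCount parameter. Here is a minimal sketch for a single (non-group) consumer; the trigger count of 1000 is just an illustrative value:

    //Read from Kafka and start an operator-driven consistent region;
    //a checkpoint is triggered after roughly 1000 consumed messages
    @consistent (trigger=operatorDriven)
    stream <rstring message, int32 partition, rstring key> ConsumedMsgs2 = KafkaConsumer()
    {
        param
            propertiesFile: "etc/consumer.properties" ;
            topic: $topic ;
            triggerCount: 1000 ;
    }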

Parallel Consuming

Consuming in parallel lets you take advantage of the scalability of both Kafka and Streams. Kafka allows the partitions of a topic to be consumed in parallel via consumer groups. Any consumer with the same group identifier group.id=<consumer-group-name> or operator parameter groupId: <consumer-group-name> is part of the same consumer group and shares the partitions of a topic with the other group members. If you would like to directly control which partitions your consumer reads from, specify the partition parameter in the KafkaConsumer operator, as sketched below.
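
For example, directly assigning one partition per parallel channel might look roughly like this (a minimal sketch, assuming the topic has one partition per channel):

    //each parallel channel is assigned exactly one partition of the topic
    @parallel(width = 3)
    stream<rstring message, rstring key> PartitionedConsumerOut = KafkaConsumer()
    {
        param
            propertiesFile : "etc/consumer.properties" ;
            topic : $topic ;
            partition : getChannel() ;
    }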

The easiest way to consume from a single topic in parallel is to:

  • Use @parallel with a width equal to the number of partitions in your topic:

    @parallel(width = $numTopicPartitions)

Here is a simple example of using three consumers to read from a 3-partition topic using User Defined Parallelism:

    @parallel(width = 3)
    stream<rstring message, rstring key> KafkaConsumerOut = KafkaConsumer()
    {
        param
            propertiesFile : "etc/consumer.properties" ;
            topic : $topic ;
            groupId : "myGroupId" ;
    }

If you would like to consume in parallel within a consistent region, check out this KafkaConsumerGroupWithConsistentRegion sample.

You can find more information about the common consumer patterns for parallel processing in the user documentation of the streamsx.kafka toolkit, both for consumer groups with dynamic partition assignment and for user-defined partition assignment.

Connecting to IBM Event Streams

IBM Event Streams is IBM's Kafka-as-a-service offering. You can connect to it with the Message Hub toolkit, which is based on the Kafka toolkit but simplifies the connection to the service.

See this article on how to connect to Event Streams.
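
As a rough illustration, assuming the streamsx.messagehub toolkit with its MessageHubConsumer operator and the service credentials stored in an application configuration, subscribing to a topic could look like this (a minimal sketch, not a complete application; the topic name is a placeholder):

    use com.ibm.streamsx.messagehub::MessageHubConsumer;

    //credentials for the Event Streams service instance are read from an
    //application configuration (named "messagehub" by default)
    stream<rstring key, rstring message> EventStreamsIn = MessageHubConsumer()
    {
        param
            topic : "myEventStreamsTopic" ;
    }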

Additional Resources