Getting Started with Kafka Operators
Introduction
The IBM Streams Messaging Toolkit is designed to get you connected to your messaging servers as quickly as possible. Kafka is an ideal messaging server for stream computing. This guide will get you sending and receiving messages in no time, and will highlight some of the best practices. We will also cover how to get the Kafka operators running in a consistent region.
Skill Level
Readers of this guide are expected to have a basic understanding of Kafka and IBM Streams terminology. To get up to speed on Kafka basics, run through their great Quick Start guide. To get a basic understanding of IBM Streams, you can read our Quick Start.
Requirements
Prior to using Kafka operators, the following software must be installed and configured:
- IBM Streams - A Quick Start Edition VM is available for free. This guide assumes that you have a Streams domain and instance up and running.
- Messaging Toolkit 4.0+ - You can download it from the IBM Streams GitHub Messaging Toolkit Repository Release Page.
- Kafka Brokers - This guide assumes you are using Kafka 0.9 or above. To quickly get a Kafka server up and running, follow these directions.
Information to Collect
Once you have your Kafka server (or servers) set up, you will need their hostnames and listener ports. You can find these in the configuration file for each server (the default is <Kafka-Install>/config/server.properties):
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=myhost.mycompany.com
Steps - Send and Receive Messages
- Configure the SPL compiler to find the messaging toolkit directory. Use one of the following methods.
- Set the STREAMS_SPLPATH environment variable to the root directory of the toolkit (with : as a separator between multiple toolkit locations):
export STREAMS_SPLPATH=<messaging-toolkit-location>/com.ibm.streamsx.messaging:$STREAMS_SPLPATH
- Specify the -t or --spl-path command parameter when you run the sc command:
sc -t $STREAMS_INSTALL/toolkits/com.ibm.streamsx.messaging -M MyMain
- If Streams Studio is used to compile and run the SPL application, add the messaging toolkit to the toolkit locations in the Streams Explorer by following these directions.
- Create an SPL application and add a toolkit dependency on the Messaging toolkit in your application. You can do this by editing the application dependency in Streams Studio, or by creating/editing the info.xml for the application and adding the dependency directly (you can also just start with the KafkaSample to skip this and the following step).
Sample info.xml from the KafkaSample:
<?xml version="1.0" encoding="UTF-8"?>
<info:toolkitInfoModel xmlns:common="http://www.ibm.com/xmlns/prod/streams/spl/common"
                       xmlns:info="http://www.ibm.com/xmlns/prod/streams/spl/toolkitInfo">
  <info:identity>
    <info:name>KafkaSample</info:name>
    <info:description>Sample application showing a Streams Kafka Producer and Consumer</info:description>
    <info:version>1.0.0</info:version>
    <info:requiredProductVersion>4.0.0.0</info:requiredProductVersion>
  </info:identity>
  <info:dependencies>
    <info:toolkit>
      <common:name>com.ibm.streamsx.messaging</common:name>
      <common:version>[3.0.0,6.0.0)</common:version>
    </info:toolkit>
  </info:dependencies>
</info:toolkitInfoModel>
- Add the Kafka operator use directives to your application. If Streams Studio is used, these directives are automatically added when you drag and drop a Kafka operator onto the SPL application in the graphical editor (if you start with a sample from the messaging toolkit, this step is already done for you).
use com.ibm.streamsx.messaging.kafka::*;
or
use com.ibm.streamsx.messaging.kafka::KafkaProducer;
use com.ibm.streamsx.messaging.kafka::KafkaConsumer;
- Configure the Kafka Producer to send messages to a Kafka Broker. You must:
- Create a producer.properties file and place it in the etc directory of your application. This ensures that it will be included in the .sab application bundle (important for cloud and HA deployment). The following is a sample producer.properties file. See here for more producer configuration details.
bootstrap.servers=broker.host.1:9092,broker.host.2:9092,broker.host.3:9092
acks=0
- Specify the location of the producer.properties file in the KafkaProducer operator using the propertiesFile parameter. You can specify either an absolute or a relative file path, where the path is relative to the application directory:
propertiesFile : etc/producer.properties
- Specify the Kafka topic to send messages to. This can be done via an rstring topic attribute in the incoming tuple, or by specifying the topic parameter in the KafkaProducer (see the Beacon operator code below; a sketch of the parameter approach follows the Notice).
Here is the sample beacon and KafkaProducer code from the KafkaSample:
//create some messages and send to KafkaProducer
stream<rstring topic, rstring key, rstring message> OutputStream = Beacon()
{
    param
        initDelay : 5.0;
        period : 0.2;
    output OutputStream :
        topic = $topic,
        message = "Reality is merely an illusion, albeit a very persistent one.",
        key = "Einstein";
}

() as KafkaSinkOp = KafkaProducer(OutputStream)
{
    param
        propertiesFile : "etc/producer.properties";
}
Notice: We don't specify the topic as a parameter, but instead as a part of the incoming tuple. This means that each incoming tuple can be directed towards a different topic.
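If every tuple should go to the same topic, you can instead set the topic parameter on the KafkaProducer itself; the incoming stream then does not need a topic attribute. Here is a minimal sketch of that variant (the stream name MessageStream and the literal topic name are placeholders, not part of the sample):
//send every incoming tuple to a single, fixed topic
() as FixedTopicSinkOp = KafkaProducer(MessageStream)
{
    param
        propertiesFile : "etc/producer.properties";
        topic : "myTopic";
}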
- Configure the Kafka Consumer to receive messages from the Kafka Broker. You must:
- Create a consumer.properties file and place it in the etc directory of your application. Here is a sample consumer.properties file (for more details on Kafka consumer configs, see here):
bootstrap.servers=broker.host.1:9092,broker.host.2:9092,broker.host.3:9092
group.id=mygroup
- Specify the location of the consumer.properties file in the KafkaConsumer operator using the propertiesFile parameter:
propertiesFile : etc/consumer.properties
- Specify the Kafka topic (or topics) to subscribe to for receiving messages. Do this using the rstring topic parameter in the KafkaConsumer. You can subscribe to multiple topics by using a comma-separated list:
topic: "topic1" , "topic2" , "topic3";
Here is the KafkaConsumer operator from the KafkaSample:
stream<rstring key, rstring message> KafkaStream = KafkaConsumer()
{
    param
        propertiesFile : "etc/consumer.properties";
        topic : $topic;
}
Tip: Not seeing any messages coming into your consumer? You may have stray consumers in the same consumer group reading from your topic. Try adding this as a KafkaProperty parameter:
kafkaProperty : "group.id=" + (rstring) getTimestampInSecs() ;
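For context, here is a sketch of how that parameter fits into the consumer invocation from the sample above (the expression simply generates a group.id that is unlikely to collide with other consumers):
stream<rstring key, rstring message> KafkaStream = KafkaConsumer()
{
    param
        propertiesFile : "etc/consumer.properties";
        topic : $topic;
        //give this consumer a (likely) unique consumer group id
        kafkaProperty : "group.id=" + (rstring) getTimestampInSecs();
}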
Consistent Regions
Kafka operators support consistent regions, which are sections of your operator graph where tuple processing is guaranteed. The KafkaProducer can participate in a consistent region (it cannot be the start), and can guarantee at-least-once tuple processing. No special configuration is required to use the KafkaProducer in a consistent region, so this section will only focus on the KafkaConsumer.
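As a rough sketch of what that looks like, a KafkaProducer simply participates in a region started by an upstream operator; the trigger type, period, and stream contents below are illustrative assumptions, not taken from a toolkit sample:
//illustrative only: a periodic consistent region started by a Beacon source;
//the KafkaProducer participates with no extra configuration
@consistent(trigger = periodic, period = 5.0)
stream<rstring topic, rstring key, rstring message> MessageStream = Beacon()
{
    param
        period : 0.2;
    output MessageStream :
        topic = "myTopic", key = "key1", message = "hello";
}

() as ProducerSinkOp = KafkaProducer(MessageStream)
{
    param
        propertiesFile : "etc/producer.properties";
}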
The KafkaConsumer supports exactly-once tuple processing and starts a consistent region (since it is a source). For general questions on consistent region, read this overview and these docs.
To start a consistent region with a KafkaConsumer, you must:
- Place an @consistent annotation above the operator.
- Specify the partition parameter. The partition refers to the Kafka topic partition that we will maintain exactly-once processing for. This example is for a single-partition topic, but for a three-partition topic you can simply specify:
partition: 0,1,2;
- Specify the triggerCount parameter for an operatorDriven trigger. The trigger count gives you control over the approximate number of messages between checkpoints. If you are using a periodic trigger for your consistent region, you do not need to specify this.
Here is the KafkaConsumer from the KafkaConsistentRegionConsumerSimple sample:
//Read in from a kafka server and start consistent region
@consistent(trigger = operatorDriven)
stream<rstring message, rstring key> KafkaConsumerOut = KafkaConsumer()
{
param
propertiesFile : "etc/consumer.properties" ;
topic : $topic ;
partition : 0 ;
triggerCount : 20 ;
}
Parallel Consuming
Consuming in parallel lets you take advantage of the scalability of both Kafka and Streams. Kafka allows the partitions of a topic to be consumed in parallel via consumer groups. Any consumer with the same group.id=<consumer-group-name> will be part of the same consumer group and will share the partitions of a topic. If you would like to directly control which partitions your consumer reads from, you can do so by specifying the partition parameter in the KafkaConsumer operator.
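For example, a minimal sketch of a consumer pinned to specific partitions might look like the following (the partition numbers and stream name are illustrative):
//read only partitions 0 and 1 of the topic instead of relying on a consumer group
stream<rstring key, rstring message> PinnedConsumerOut = KafkaConsumer()
{
    param
        propertiesFile : "etc/consumer.properties";
        topic : $topic;
        partition : 0, 1;
}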
The easiest way to consume from a single topic in parallel is to:
- Use @parallel with a width equal to the number of partitions in your topic:
@parallel(width = $numTopicPartitions)
Here is a simple example of using three consumers to read from a 3-partition topic using User Defined Parallelism:
@parallel(width = 3)
stream<rstring message, rstring key> KafkaConsumerOut = KafkaConsumer()
{
param
propertiesFile : "etc/consumer.properties" ;
topic : $topic ;
}
If you would like to consume in parallel within a consistent region, check out this KafkaConsistentRegionConsumerParallel sample.
Advanced Parallel Processing
This section is under construction
Connecting to Message Hub in the Cloud
You can use the Streams Kafka operators to produce to and consume from the Kafka-based Message Hub Bluemix service. For a complete guide on how to do this, check out this great article.