Using streamsx.kafka with Red Hat AMQ Streams
Abstract
This document describes how to use the SPL operators of the streamsx.kafka toolkit to connect with Red Hat AMQ Streams. At the time of writing this article, the latest version of AMQ Streams was 1.2.
What is Red Hat AMQ Streams
AMQ Streams is an Apache Kafka distribution for the OpenShift Container Platform. It has been part of the Red Hat AMQ messaging suite since October 2018 (Red Hat Announcement).
Supported Kafka features
AMQ Streams is managed with operators and custom resource definitions for configuring the Kafka cluster, topics, users, and access control to groups and topics.
- Cluster Operator: Responsible for deploying and managing Apache Kafka clusters within an OpenShift cluster.
- Topic Operator: Responsible for managing Kafka topics within a Kafka cluster running within an OpenShift cluster.
- User Operator: Responsible for managing Kafka users within a Kafka cluster running within an OpenShift cluster. When users are created, the User Operator also creates the secret for the user. This can be a client certificate and its private key, or the user’s password.
From the Kafka client perspective, AMQ Streams supports:
- Plain text listener for access from applications running within the OpenShift environment
- TLS listener for access from applications running within the OpenShift environment
- External listener for access from applications running outside the OpenShift cluster. Depending on how the Kafka service is exposed to the outside, a plain text connection may or may not be supported.
- User authentication at the listener level
  - mutual TLS authentication with a client certificate
  - SASL/SCRAM-SHA-512 with username and password
- Simple user authorization by using Access Control Lists; authorization is not reflected in the consumer or producer configuration, however
The AMQ Streams 1.2 documentation can be found here: https://access.redhat.com/documentation/en-us/red_hat_amq/7.4/html-single/using_amq_streams_on_openshift_container_platform/index
Setup of streamsx.kafka toolkit operators
To use the KafkaConsumer or KafkaProducer operator, Kafka properties must be created. They can be provided either in a property file or within an application configuration.
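For illustration, here is a minimal SPL sketch of a KafkaConsumer invocation that reads its properties from a file bundled in the etc directory of the application; the composite name, topic name, and file name are placeholders and must be adapted to your setup. A KafkaProducer is configured in the same way with its propertiesFile parameter.
use com.ibm.streamsx.kafka::KafkaConsumer;

composite ReadFromAMQStreams {
    graph
        // Topic name and property file name are placeholders.
        // A relative propertiesFile path is resolved against the application directory.
        stream<rstring message, rstring key> Messages = KafkaConsumer() {
            param
                propertiesFile: "etc/consumer.properties";
                topic: "myTopic";
        }

        // Print every received message (for demonstration only)
        () as Printer = Custom(Messages) {
            logic
                onTuple Messages: printStringLn(message);
        }
}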
Connecting over a TLS secured connection
When a listener in AMQ Streams is configured for TLS (the default for the external listener), the Kafka brokers present a server certificate that must be trusted by the client (i.e. the IBM Streams operator). AMQ Streams has its own CA certificate for the Kafka cluster, which is generated by the Cluster Operator and used to sign all server certificates. An administrator can replace the CA certificate with a user-provided certificate, for example one signed by a public CA. When the default CA certificate is used, it must be extracted from the OpenShift cluster and placed into a truststore.
Connection information
Because Red Hat AMQ Streams can be configured in many ways to support various features, the information you need depends on the configuration:
- Bootstrap servers (server name and TCP port): Always required. How this information is gathered depends on the listener used to connect, and on the listener configuration when the external listener is used.
- TLS for the connection (yes/no)
- Cluster CA certificate: Required when the default CA certificate is used. It must be available as a file in PEM format. PEM is a text format in which the binary data is base64 encoded and enclosed between -----BEGIN CERTIFICATE----- and -----END CERTIFICATE----- anchors.
- Hostname verification: When Kafka is exposed as a NodePort service for external clients and TLS is used, hostname verification must be disabled in the clients.
- Authentication (which type of authentication: none, TLS, or SCRAM-SHA-512)
- Secrets, dependent on the authentication type:
  - username and password for SCRAM authentication
  - client certificate and its private RSA key for TLS authentication; certificate and key must be available in PEM (text) format as separate files
When you have access to the OpenShift environment where AMQ Streams is deployed, you must know how the Kafka cluster is configured in order to fetch the right information. How to gather the data is described in the AMQ Streams documentation.
When you cannot access the OpenShift environment, the administrator of AMQ Streams must give you all connection details including certificates, private keys, username, password, accessible topics, and the like.
Creating a consumer or producer configuration
Kafka consumers and producers are configured by using properties. The properties relevant for the connection are identical for consumers and producers. These are the properties that are potentially used:
bootstrap.servers
security.protocol
ssl.endpoint.identification.algorithm
sasl.mechanism
sasl.jaas.config
ssl.keystore.location
ssl.keystore.type
ssl.keystore.password
ssl.key.password
ssl.truststore.location
ssl.truststore.type
ssl.truststore.password
Handling of truststore and keystore files
Certificates and private keys must be imported into stores, which are secured by passwords and configured either as truststore or as keystore. The filenames of the store files go into the properties ssl.keystore.location and ssl.truststore.location. This implies that the store files must be accessible from wherever the Kafka operator’s PEs are running. To achieve this, the store files can be bundled with the application bundle or stored in a shared filesystem.
When store files are bundled with the application bundle (SAB), the files should be stored in the etc directory of the SPL application folder. The application directory must be denoted in a property with the {applicationDir} placeholder, for example
ssl.keystore.location={applicationDir}/etc/mykeystore.jks
Note: When keystore files or truststore files are attached to the application bundle, they cannot be replaced without re-building the bundle and re-submitting the application. When they are stored in a shared filesystem, they can be replaced. A restart of the PEs would apply the new keystore or truststore file.
When you have a Python3 environment
When you have a Python3 environment, you can use the streamsx.kafka Python package version 1.6.0 or higher to generate the properties. Install the streamsx.kafka package into your Python3 environment:
$ pip install streamsx.kafka
This package installs the utility streamsx-kafka-make-properties:
$ streamsx-kafka-make-properties -h
usage: streamsx-kafka-make-properties [-h] [-v] --bootstrap hostname:port
[hostname:port ...] [--no-tls]
[--no-hostname-verify]
[--trusted-cert file [file ...]]
[--auth {NONE,TLS,PLAIN,SCRAM-SHA-512}]
[--client-cert file]
[--client-private-key file]
[--username USERNAME]
[--password PASSWORD]
[--out-keystore-dir directory]
[--out-propfile file]
Create Kafka consumer or producer configuration for use with the SPL
streamsx.kafka toolkit.
optional arguments:
-h, --help show this help message and exit
-v, --version show program's version number and exit
Connection:
connection details
--bootstrap hostname:port [hostname:port ...], -b hostname:port [hostname:port ...]
one or more bootstrap servers in the form
hostname:port
--no-tls connect over an unencrypted plaintext connection,
default is to use TLS
--no-hostname-verify disable hostname verification of server certificates
when TLS is used for the connection
--trusted-cert file [file ...], -T file [file ...]
one or more paths to files with certificates in PEM
format, which are trusted by the client
Authentication:
options for client authentication
--auth {NONE,TLS,PLAIN,SCRAM-SHA-512}, -A {NONE,TLS,PLAIN,SCRAM-SHA-512}
authentication method, defaults to NONE
--client-cert file, -C file
path to a file that contains the client certificate in
PEM format, required for TLS authentication
--client-private-key file, -K file
path to a file that contains the private key of the
client certificate in PEM format, required for TLS
authentication
--username USERNAME, -U USERNAME
user name, required for PLAIN and SCRAM-SHA-512
authentication
--password PASSWORD, -P PASSWORD
password, required for PLAIN and SCRAM-SHA-512
authentication
Output:
options for output file generation
--out-keystore-dir directory, -d directory
directory where keystores files are created; the
directory must be an existing directory - default is
the current working directory
--out-propfile file, -o file
The filename of an output property file. If not
specified, properties are written to stdout together
with other information
Example
$ streamsx-kafka-make-properties --bootstrap kafka1.my.domain:443 \
--trusted-cert cluster-ca.crt --auth TLS \
--client-cert user.crt --client-private-key user.key \
--out-propfile kafka.properties
keystore file ./keystore-0520201806.jks generated. Store and key password is 05b5Umeirt0Zsfko
Copy this file into the etc/ directory of your Streams application.
truststore file ./truststore-0520201806.jks generated. Store password is 05b5Umeirt0Zsfko
Copy this file into the etc/ directory of your Streams application.
$
The above invocation generates the files keystore-0520201806.jks and truststore-0520201806.jks in the current working directory, and the property file kafka.properties with the following content:
bootstrap.servers=kafka1.my.domain:443
security.protocol=SSL
ssl.keystore.type=JKS
ssl.keystore.password=05b5Umeirt0Zsfko
ssl.key.password=05b5Umeirt0Zsfko
ssl.keystore.location={applicationDir}/etc/keystore-0520201806.jks
ssl.endpoint.identification.algorithm=https
ssl.truststore.type=JKS
ssl.truststore.password=05b5Umeirt0Zsfko
ssl.truststore.location={applicationDir}/etc/truststore-0520201806.jks
Note that the keystore and truststore files are created with a 10-digit random token in the filename, which is different on every streamsx-kafka-make-properties invocation.
The store files must be copied into the etc folder within your application, because they are referenced in this way by the properties. The property file itself can be used as the basis for separate consumer or producer properties, or it can be used as is. You should also consider creating an application configuration from the property file and using it with the Kafka operators, as sketched below.
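A minimal sketch, assuming an application configuration named kafkaConnection has been created from the property file (for example with streamtool mkappconfig --propfile kafka.properties kafkaConnection); the operator then references the configuration with its appConfigName parameter instead of a property file:
// "kafkaConnection" is an assumed application configuration name
// that holds the generated Kafka properties
stream<rstring message, rstring key> Messages = KafkaConsumer() {
    param
        appConfigName: "kafkaConnection";
        topic: "myTopic";
}
Keystore and truststore files referenced with the {applicationDir} placeholder must still be bundled in the etc directory of the application, or the properties must point to a location in a shared filesystem.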
When you cannot use Python3
When neither TLS for the transport nor authentication is required, the bootstrap.servers property is sufficient. In all other cases additional properties are required.
For the handling of certificates and keys, keytool from Java and openssl must be used to create the store files.
Create a truststore
When you connect with TLS and must configure trusted certificates, a truststore must be created and configured.
$ keytool -import -trustcacerts -alias root -file <trusted_cert_pem_file> \
    -keystore ./truststore.jks -storepass <truststore_password> -noprompt
Copy the truststore file, here ./truststore.jks, to its destination folder, and configure it in the ssl.truststore.location property. Configure the truststore password in the ssl.truststore.password property.
Disable hostname verification of the server certificate
When hostname verification must be disabled, configure the following property with an empty value:
ssl.endpoint.identification.algorithm=
Create a keystore for the client certificate and private RSA key
When you authenticate the client with a certificate, a keystore must be created and configured. This procedure involves the use of openssl to create an intermediate PKCS12 store.
$ openssl pkcs12 -export -in <client_cert_pem_file> -inkey <client_private_key_file> \
    -name client-alias -out ./keystore.pkcs12 -noiter -nomaciter -passout pass:<keystore_password>
$ keytool -importkeystore -deststorepass <keystore_password> -destkeystore ./keystore.jks \
-srckeystore ./keystore.pkcs12 -srcstoretype pkcs12 -srcstorepass <keystore_password>
Copy the keystore file, here ./keystore.jks, to its destination folder, and configure it in the ssl.keystore.location property. Configure the keystore password in both the ssl.keystore.password and ssl.key.password properties. The key is always encrypted with the same password that is used to secure the keystore.
Configure TLS authentication
The following property must be set in addition to bootstrap.servers and the truststore- and keystore-related properties:
security.protocol=SSL
Configure SCRAM-SHA-512 authentication
For use with SCRAM-SHA-512 authentication, the following properties must be configured.
When SCRAM-SHA-512 is used over a TLS-secured connection:
security.protocol=SASL_SSL
When SCRAM-SHA-512 is used over a plaintext connection:
security.protocol=SASL_PLAINTEXT
Independent of the transport layer security, the following properties must be configured:
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="<username>" \
    password="<password>";
Certificate Renewal
Automatic renewal of certificates is not supported. When certificates are renewed, a new keystore and optionally a new truststore must be created. When the keystore and truststore files are bundled with the Streams Application Bundle, the bundle must be rebuilt and re-submitted.
When the keystore and truststore files are stored on a shared volume, the stores can be replaced on the volume, and the PEs that host the Kafka operators must be restarted.
Useful links
AMQ Streams 1.2 Documentation: https://access.redhat.com/documentation/en-us/red_hat_amq/7.4/html-single/using_amq_streams_on_openshift_container_platform/index
Quickstart with AMQ Streams on minishift: https://dzone.com/articles/how-to-run-kafka-on-openshift-the-enterprise-kuber
Getting started with minishift: https://www.novatec-gmbh.de/en/blog/getting-started-minishift-openshift-origin-one-vm/