Using streamsx.kafka with Red Hat AMQ Streams

Abstract

This document describes how to use the SPL operators of the streamsx.kafka toolkit to connect with Red Hat AMQ Streams. At the time of writing this article, the latest version of AMQ Streams was 1.2.

What is Red Hat AMQ Streams?

AMQ Streams is an Apache Kafka distribution for the OpenShift Container Platform. It has been part of the Red Hat AMQ messaging suite since October 2018 (Red Hat Announcement).

Supported Kafka features

AMQ Streams is managed with operators and custom resource definitions for configuring the Kafka cluster, topics, users, and access control to groups and topics.

  • Cluster Operator

    Responsible for deploying and managing Apache Kafka clusters within an OpenShift cluster.

  • Topic Operator

    Responsible for managing Kafka topics within a Kafka cluster running within an OpenShift cluster.

  • User Operator

    Responsible for managing Kafka users within a Kafka cluster running within an OpenShift cluster. When users are created, the User Operator also creates the secret for the users. This can be a client certificate and its private key, or the user’s password.

From the Kafka client perspective, AMQ Streams supports:

  • Plain text listener for access from applications running within the OpenShift environment
  • TLS listener for access from applications running within the OpenShift environment
  • External listener for access from applications running outside the OpenShift cluster. Depending on how the Kafka service is exposed to the outside, a plain text connection may or may not be supported.
  • User authentication on listener level
    • mutual TLS authentication with client certificate
    • SASL/SCRAM-SHA-512 with username and password
  • Simple user authorization by using Access Control Lists - authorization is not reflected in the consumer or producer configuration, however

The AMQ Streams 1.2 documentation can be found here: https://access.redhat.com/documentation/en-us/red_hat_amq/7.4/html-single/using_amq_streams_on_openshift_container_platform/index

Setup of streamsx.kafka toolkit operators

To use the KafkaConsumer or KafkaProducer operator, connection properties must be created. They can be provided to the operators either as a property file or within an application configuration.
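
For illustration, here is a minimal SPL sketch of a consumer that reads its configuration from a bundled property file. The topic name test and the file name etc/kafka.properties are assumptions; a relative propertiesFile path is resolved against the application directory.

use com.ibm.streamsx.kafka::KafkaConsumer;

composite ConsumeFromAmqStreams {
    graph
        // Connection details come from the property file bundled in
        // the application's etc directory.
        stream<rstring message> Messages = KafkaConsumer() {
            param
                propertiesFile: "etc/kafka.properties";
                topic: "test";
        }

        // Print each received message to stdout.
        () as PrintMessages = Custom(Messages) {
            logic
                onTuple Messages: printStringLn(message);
        }
}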

Connecting over a TLS secured connection

When a listener in AMQ Streams is configured for TLS (the default for the external listener), the Kafka brokers present a server certificate, which must be trusted by the client (i.e. the IBM Streams operator). AMQ Streams has its own CA certificate for the Kafka cluster, which is generated by the Cluster Operator and used to sign all server certificates. An administrator can replace the CA certificate with a user-provided certificate, which, for example, is signed by a public CA. When the default CA certificate is used, it must be extracted from the OpenShift cluster and placed in a truststore.
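
When you have access to the OpenShift cluster, the cluster CA certificate can be extracted from the secret that the Cluster Operator maintains. A sketch with the oc client, assuming the Kafka resource is named my-cluster (the secret name follows the pattern <cluster-name>-cluster-ca-cert):

$ oc extract secret/my-cluster-cluster-ca-cert --keys=ca.crt --to=- > ca.crt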

Connection information

As Red Hat AMQ Streams can be configured in many ways to support various features, you may need different information:

  • bootstrap (server name and TCP port)

    Always required. How this information is gathered depends on the listener used to connect and, when the external listener is used, on its configuration.

  • TLS for the connection (yes/no)

  • cluster CA certificate

    This certificate is required when the default CA certificate is used. It must be available as a file in PEM format. PEM is a text format in which the binary data is base64 encoded and enclosed between -----BEGIN CERTIFICATE----- and -----END CERTIFICATE----- anchors (see the example after this list).

  • hostname verification

    Exposing Kafka as a NodePort service for external clients requires that hostname verification be disabled in the clients when TLS is used.

  • authentication (Which type of authentication: No, TLS, SCRAM-SHA-512)

  • Depending on the authentication type, secrets must be known:

    • username and password for SCRAM authentication
    • client certificate and its private RSA key for TLS authentication. Certificate and key must be available in PEM (text) format as separate files.
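
For reference, a PEM file with one certificate looks like this; the base64 payload is shortened here:

-----BEGIN CERTIFICATE-----
MIIDLTCCAhWgAwIBAgIJAL...
-----END CERTIFICATE-----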

When you have access to the OpenShift environment where AMQ Streams is deployed, you must know how the Kafka cluster is configured in order to fetch the right information. How to gather the data is described in the AMQ Streams documentation.

When you cannot access the OpenShift environment, the administrator of AMQ Streams must give you all connection details including certificates, private keys, username, password, accessible topics, and the like.

Creating a consumer or producer configuration

Kafka consumers or producers are configured by using properties. The properties that are relevant for the connection are identical for consumers and producers. The following properties are potentially used:

bootstrap.servers
security.protocol
ssl.endpoint.identification.algorithm
sasl.mechanism
sasl.jaas.config
ssl.keystore.location
ssl.keystore.type
ssl.keystore.password
ssl.key.password
ssl.truststore.location
ssl.truststore.type
ssl.truststore.password

Handling of truststore and keystore files

Certificates and private keys must be imported into stores, secured by passwords, and configured either as a truststore or as a keystore. The filenames of the store files go into the properties ssl.keystore.location and ssl.truststore.location. This implies that the store files must be accessible from wherever the Kafka operator’s PEs are running. To achieve this, the store files can be bundled with the application bundle or stored in a shared filesystem.

When store files are bundled with the application bundle (SAB), the files should be stored in the etc directory of the SPL application folder. In the properties, the application folder must be denoted with the {applicationDir} placeholder, for example

ssl.keystore.location={applicationDir}/etc/mykeystore.jks

Note: When keystore files or truststore files are attached to the application bundle, they cannot be replaced without re-building the bundle and re-submitting the application. When they are stored in a shared filesystem, they can be replaced. A restart of the PEs would apply the new keystore or truststore file.

When you have a Python3 environment

When you have a Python3 environment, you can use the streamsx.kafka Python package version 1.6.0 or higher to generate properties. Install the streamsx.kafka package into your Python3 environment:

$ pip install streamsx.kafka

This package installs the utility streamsx-kafka-make-properties.

$ streamsx-kafka-make-properties -h
usage: streamsx-kafka-make-properties [-h] [-v] --bootstrap hostname:port
                                      [hostname:port ...] [--no-tls]
                                      [--no-hostname-verify]
                                      [--trusted-cert file [file ...]]
                                      [--auth {NONE,TLS,PLAIN,SCRAM-SHA-512}]
                                      [--client-cert file]
                                      [--client-private-key file]
                                      [--username USERNAME]
                                      [--password PASSWORD]
                                      [--out-keystore-dir directory]
                                      [--out-propfile file]

Create Kafka consumer or producer configuration for use with the SPL
streamsx.kafka toolkit.

optional arguments:
  -h, --help            show this help message and exit
  -v, --version         show program's version number and exit

Connection:
  connection details

  --bootstrap hostname:port [hostname:port ...], -b hostname:port [hostname:port ...]
                        one or more bootstrap servers in the form
                        hostname:port
  --no-tls              connect over an unencrypted plaintext connection,
                        default is to use TLS
  --no-hostname-verify  disable hostname verification of server certificates
                        when TLS is used for the connection
  --trusted-cert file [file ...], -T file [file ...]
                        one or more paths to files with certificates in PEM
                        format, which are trusted by the client

Authentication:
  options for client authentication

  --auth {NONE,TLS,PLAIN,SCRAM-SHA-512}, -A {NONE,TLS,PLAIN,SCRAM-SHA-512}
                        authentication method, defaults to NONE
  --client-cert file, -C file
                        path to a file that contains the client certificate in
                        PEM format, required for TLS authentication
  --client-private-key file, -K file
                        path to a file that contains the private key of the
                        client certificate in PEM format, required for TLS
                        authentication
  --username USERNAME, -U USERNAME
                        user name, required for PLAIN and SCRAM-SHA-512
                        authentication
  --password PASSWORD, -P PASSWORD
                        password, required for PLAIN and SCRAM-SHA-512
                        authentication

Output:
  options for output file generation

  --out-keystore-dir directory, -d directory
                        directory where keystores files are created; the
                        directory must be an existing directory - default is
                        the current working directory
  --out-propfile file, -o file
                        The filename of an output property file. If not
                        specified, properties are written to stdout together
                        with other information

Example

$ streamsx-kafka-make-properties --bootstrap kafka1.my.domain:443 \
    --trusted-cert cluster-ca.crt --auth TLS \
    --client-cert user.crt --client-private-key user.key \
    --out-propfile kafka.properties

keystore file ./keystore-0520201806.jks generated. Store and key password is 05b5Umeirt0Zsfko
Copy this file into the etc/ directory of your Streams application.
truststore file ./truststore-0520201806.jks generated. Store password is 05b5Umeirt0Zsfko
Copy this file into the etc/ directory of your Streams application.
$

The above invocation would generate the files keystore-0520201806.jks and truststore-0520201806.jks in the current working directory, and the property file kafka.properties with the following content:

bootstrap.servers=kafka1.my.domain:443
security.protocol=SSL
ssl.keystore.type=JKS
ssl.keystore.password=05b5Umeirt0Zsfko
ssl.key.password=05b5Umeirt0Zsfko
ssl.keystore.location={applicationDir}/etc/keystore-0520201806.jks
ssl.endpoint.identification.algorithm=https
ssl.truststore.type=JKS
ssl.truststore.password=05b5Umeirt0Zsfko
ssl.truststore.location={applicationDir}/etc/truststore-0520201806.jks

Note that the keystore and truststore files are created with a 10-digit random token in their filenames, which is different on every streamsx-kafka-make-properties invocation.

The store files must be copied into the etc folder within your application because they are referenced in this way by the properties. The property file itself can be used as the basis for separate consumer or producer properties, or it can be used as is. You should also consider creating an application configuration from the property file and using the application configuration with the Kafka operators.
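
A possible way to create such an application configuration is streamtool; the configuration name kafkaConfig is an assumption, and the available options should be verified with streamtool man mkappconfig:

$ streamtool mkappconfig --propfile kafka.properties kafkaConfig

The Kafka operators would then reference the configuration through their appConfigName parameter instead of propertiesFile.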

When you cannot use Python3

When neither TLS for transport nor authentication is required, the bootstrap.servers property is sufficient. In all other cases additional properties are required. To handle certificates and keys, the Java keytool and openssl must be used to create the store files.

Create a truststore

When you connect with TLS and must configure trusted certificates, a truststore must be created and configured.

$ keytool -import -trustcacerts -alias root -file <trusted_cert_pem_file> \
    -keystore ./truststore.jks -storepass <truststore_password> -noprompt

Copy the truststore file, here ./truststore.jks, to its destination folder, and configure it in the ssl.truststore.location property. Configure the truststore password as the ssl.truststore.password property.
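
For example, when the truststore is bundled in the etc directory of the application (the filename is an assumption):

ssl.truststore.location={applicationDir}/etc/truststore.jks
ssl.truststore.type=JKS
ssl.truststore.password=<truststore_password>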

Disable hostname verification of the server certificate

When hostname verification must be disabled, configure the following property with an empty value:

ssl.endpoint.identification.algorithm=

Create a keystore for the client certificate and private RSA key

When you authenticate the client with a certificate, a keystore must be created and configured. This procedure involves the use of openssl to create an intermediate PKCS12 store.

$ openssl pkcs12 -export -in <client_cert_pem_file> -inkey <client_private_key_file> \
    -name client-alias -out ./keystore.pkcs12 -noiter -nomaciter -passout pass:<keystore_password>
$ keytool -importkeystore -deststorepass <keystore_password> -destkeystore ./keystore.jks \
    -srckeystore ./keystore.pkcs12 -srcstoretype pkcs12 -srcstorepass <keystore_password>

Copy the keystore file, here ./keystore.jks, to its destination folder, and configure it in the ssl.keystore.location property. Configure the keystore password as both the ssl.keystore.password and ssl.key.password properties. The key is always encrypted with the same password that secures the keystore.
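
For example, when the keystore is bundled in the etc directory of the application (the filename is an assumption):

ssl.keystore.location={applicationDir}/etc/keystore.jks
ssl.keystore.type=JKS
ssl.keystore.password=<keystore_password>
ssl.key.password=<keystore_password>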

Configure TLS authentication

The following property must be set in addition to bootstrap.servers and the truststore- and keystore-related properties:

security.protocol=SSL
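
Putting it all together, a complete property file for mutual TLS authentication could look like the following sketch; the bootstrap address and the store files are placeholders:

bootstrap.servers=kafka1.my.domain:443
security.protocol=SSL
ssl.endpoint.identification.algorithm=https
ssl.truststore.location={applicationDir}/etc/truststore.jks
ssl.truststore.type=JKS
ssl.truststore.password=<truststore_password>
ssl.keystore.location={applicationDir}/etc/keystore.jks
ssl.keystore.type=JKS
ssl.keystore.password=<keystore_password>
ssl.key.password=<keystore_password>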

Configure SCRAM-SHA-512 authentication

For use with SCRAM-SHA-512 authentication, the following properties must be configured:

When SCRAM-SHA-512 is used over a TLS secured connection:

security.protocol=SASL_SSL

When SCRAM-SHA-512 is used over a plaintext connection:

security.protocol=SASL_PLAINTEXT

Independent of transport layer security, the following properties must be configured:

sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="<username>" \
    password="<password>";
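
For example, a complete property file for SCRAM-SHA-512 over a TLS secured connection could look like the following sketch; the bootstrap address, the credentials, and the truststore are placeholders:

bootstrap.servers=kafka1.my.domain:443
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="user1" \
    password="secret";
ssl.truststore.location={applicationDir}/etc/truststore.jks
ssl.truststore.type=JKS
ssl.truststore.password=<truststore_password>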

Certificate Renewal

Automatic renewal of certificates is not supported. When certificates are renewed, a new keystore and optionally a new truststore must be created. When the keystore and truststore files are bundled with the Streams Application Bundle, the bundle must be rebuilt and re-submitted.

When the keystore and truststore files are stored on a shared volume, the stores can be replaced on the volume, and the PEs that host the Kafka operators must be restarted.

Useful links

AMQ Streams 1.2 Documentation: https://access.redhat.com/documentation/en-us/red_hat_amq/7.4/html-single/using_amq_streams_on_openshift_container_platform/index

Quickstart with AMQ Streams on minishift: https://dzone.com/articles/how-to-run-kafka-on-openshift-the-enterprise-kuber

Getting started with minishift: https://www.novatec-gmbh.de/en/blog/getting-started-minishift-openshift-origin-one-vm/
