Message Hub (Event Streams) Toolkit > com.ibm.streamsx.messagehub 3.3.1 > com.ibm.streamsx.messagehub > MessageHubProducer
The MessageHubProducer operator is used to produce records to the IBM Event Streams cloud service. The operator has been designed to make connectivity to the service as simple as possible. This is achieved in a number of different ways, from having default values for the appConfigName parameter to allowing the user to copy/paste the raw service credentials JSON into either an application configuration property or a file.
The following table lists the default values that have been set by this operator for a couple of parameters:
Parameter | Default Value | Description |
---|---|---|
appConfigName | eventstreams | Users can choose to place the raw Event Streams credentials JSON in a property called eventstreams.creds in an application configuration called eventstreams. The operator will extract the information needed to connect to Event Streams. |
credentialsFile | etc/eventstreams.json | Users can paste the raw Event Streams credentials JSON into a file pointed to by this parameter. The operator will extract the information needed to connect to Event Streams. By default, the operator will look for a file called etc/eventstreams.json. |
This section outlines different options for enabling the MessageHubProducer operator to connect to the IBM Event Streams cloud service. Any of the following options can be used to configure the operator for connecting to IBM Cloud.
1. Use the credentials operator parameter
This option allows you to use any SPL expression that returns an rstring to specify the service credentials. As an example, you can write and use an SPL function that retrieves the credentials JSON from a key-value-store.
Note: When the credentials parameter is specified, credentials which are stored in a file or application configuration are ignored. You can specify additional Kafka configs in a property file or application configuration, but then you must specify the name of the property file or application configuration with the propertiesFile or appConfigName parameter.
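As an illustration of option 1, the following is a minimal sketch rather than a definitive setup. It assumes the credentials JSON is supplied as a submission-time value; the value name credentials, the topic name myTopic, and the composite name are hypothetical.

```spl
use com.ibm.streamsx.messagehub::MessageHubProducer;

composite CredentialsParamSketch {
    graph
        // Generate one test message per second.
        stream<rstring message> Data = Beacon() {
            param
                period: 1.0;
            output
                Data: message = "hello " + (rstring) IterationCount();
        }

        () as Producer = MessageHubProducer(Data) {
            param
                topic: "myTopic"; // hypothetical topic name
                // Any expression that returns an rstring can be used here.
                // This one reads the credentials JSON from a submission-time
                // value with the hypothetical name "credentials".
                credentials: getSubmissionTimeValue("credentials");
        }
}
```

With this invocation, credentials stored in a file or in an application configuration are ignored, as described in the note above.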
2. Save Credentials in a File
With this option, users can copy their credentials JSON from the Event Streams service and store it in a file called eventstreams.json. When the operator starts up it will read the credentials from that file and extract the information needed to connect. The following steps outline how this can be done:
NOTE: Users can use the credentialsFile parameter to specify a different file containing the Event Streams service credentials JSON.
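A minimal sketch of option 2 with a non-default file location. The file name etc/my-eventstreams.json and the topic name are hypothetical; the relative path is resolved against the application directory.

```spl
use com.ibm.streamsx.messagehub::MessageHubProducer;

composite CredentialsFileSketch {
    graph
        stream<rstring message> Data = Beacon() {
            param
                period: 1.0;
            output
                Data: message = "hello";
        }

        () as Producer = MessageHubProducer(Data) {
            param
                topic: "myTopic"; // hypothetical topic name
                // Hypothetical file containing the raw service credentials JSON.
                // Omit this parameter to use the default etc/eventstreams.json.
                credentialsFile: "etc/my-eventstreams.json";
        }
}
```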
3. Save Credentials in an Application Configuration Property
With this option, users can copy their service credentials JSON from the Event Streams service and store it in an application configuration property called eventstreams.creds. When the operator starts, it will look for this property and extract the information needed to connect. The following steps outline how this can be done:
NOTE 1: Users can specify a different application configuration name by setting the appConfigName parameter. The operator will still look for a property called eventstreams.creds containing the Event Streams service credentials in JSON format.
NOTE 2: Users can add generic Kafka properties, for example metadata.max.age.ms or client.dns.lookup, to the same application configuration that contains the service credentials. To make the operator use these Kafka properties, the appConfigName parameter must be specified even if the default application configuration name eventstreams is used. Conversely, when the default application configuration eventstreams is used but is not specified as the appConfigName parameter value, only the service credentials are read from this application configuration.
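The behavior described in NOTE 2 can be sketched as follows. This assumes an application configuration named eventstreams already exists and holds both the eventstreams.creds property and additional Kafka properties; the topic name is hypothetical.

```spl
use com.ibm.streamsx.messagehub::MessageHubProducer;

composite AppConfigSketch {
    graph
        stream<rstring message> Data = Beacon() {
            param
                period: 1.0;
            output
                Data: message = "hello";
        }

        () as Producer = MessageHubProducer(Data) {
            param
                topic: "myTopic"; // hypothetical topic name
                // Naming the default application configuration explicitly makes
                // the operator load the Kafka properties stored in it, in
                // addition to the service credentials.
                appConfigName: "eventstreams";
        }
}
```

Leaving out the appConfigName line would still let the operator find the credentials, but the additional Kafka properties would not be used.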
Users only need to specify the topic that they wish to produce messages to (set via the topic parameter).
The operator implements Kafka's Producer API. As a result, it supports all Kafka properties that are supported by the underlying API. The producer properties can be found in the Apache Kafka documentation. Properties can be specified in a file or in an application configuration. If specifying properties via a file, the propertiesFile parameter can be used. If specifying properties in an application configuration, the name of the application configuration must be specified using the appConfigName parameter.
Property name | Value |
---|---|
bootstrap.servers | parsed from the service credentials |
sasl.jaas.config | org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="kafka" username="username" password="password"; |
security.protocol | SASL_SSL |
sasl.mechanism | PLAIN |
ssl.protocol | TLSv1.2 |
ssl.truststore.type | JKS |
ssl.enabled.protocols | TLSv1.2 |
ssl.endpoint.identification.algorithm | HTTPS |
These properties cannot be overwritten by specific Kafka properties provided via properties file or application configuration. They are ignored when specified in a property file or application configuration. In addition to the properties above, the following properties are set by default or adjusted:
Property name | Default Value |
---|---|
client.id | Generated ID in the form: P-J<JobId>-<operator name> |
key.serializer | See Automatic Serialization section below |
value.serializer | See Automatic Serialization section below |
acks | Controls the durability of records that are sent. Adjusted to all when the guaranteeOrdering parameter is true, or when the consistentRegionPolicy parameter is Transactional in consistent region. Otherwise acks is unchanged. The value 0 (fire and forget) is not recommended. |
retries | When the guaranteeOrdering parameter is true, or when the consistentRegionPolicy parameter is Transactional in consistent region, retries is adjusted to a minimum of 1. |
linger.ms | 100 |
batch.size | 32768 |
max.in.flight.requests.per.connection | Limited to 5 when guaranteeOrdering parameter is true, or when consistentRegionPolicy parameter is Transactional in consistent region. 10 in all other cases when unset. |
enable.idempotence | true when guaranteeOrdering parameter is true, or when in consistent region and the consistentRegionPolicy parameter is Transactional. |
transactional.id | Randomly generated ID in the form: tid-<random_string> only when in consistent region and the consistentRegionPolicy parameter is set to Transactional. |
transaction.timeout.ms | adjusted to a minimum of drain timeout + 120000 milliseconds, but not greater than 900000. Adjusted only when in consistent region and consistentRegionPolicy parameter is set to Transactional. |
metric.reporters | added to provided reporters: com.ibm.streamsx.kafka.clients.producer.ProducerMetricsReporter |
metrics.sample.window.ms | adjusted to 10000 |
client.dns.lookup | use_all_dns_ips |
reconnect.backoff.max.ms | 10000 |
reconnect.backoff.ms | 250 |
retry.backoff.ms | 500 |
NOTE: Although properties are adjusted, users can override any of the properties in the table directly above by explicitly setting the property value in either a properties file or in an application configuration. Not all properties or property values that can be specified for the Kafka producer version 2.5 are supported by all broker versions. An example of such a config is the Zstandard compression algorithm, which is supported with broker version 2.1 and above.
Users can specify Kafka properties using Streams' application configurations. Information on configuring application configurations can be found here: Creating application configuration objects to securely store data. Each property set in the application configuration will be loaded as a Kafka property. For example, to specify the cipher suites for SSL that should be used for secure connections, an app config property named ssl.cipher.suites should be created.
The operator will automatically select the appropriate serializers for the key and message based on their types. The following table outlines which serializer will be used given a particular type:
Serializer | SPL Types |
---|---|
org.apache.kafka.common.serialization.StringSerializer | rstring |
org.apache.kafka.common.serialization.IntegerSerializer | int32, uint32 |
org.apache.kafka.common.serialization.LongSerializer | int64, uint64 |
org.apache.kafka.common.serialization.FloatSerializer | float32 |
org.apache.kafka.common.serialization.DoubleSerializer | float64 |
org.apache.kafka.common.serialization.ByteArraySerializer | blob |
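To make the type mapping concrete, here is a small sketch in which the input stream uses the default attribute names key and message. Going by the table above, an int64 key would be serialized with LongSerializer and an rstring message with StringSerializer. The topic name and composite name are hypothetical.

```spl
use com.ibm.streamsx.messagehub::MessageHubProducer;

composite SerializerSketch {
    graph
        // key is int64, message is rstring; the operator picks the
        // serializers from these attribute types.
        stream<int64 key, rstring message> Data = Beacon() {
            param
                period: 1.0;
            output
                Data: key = (int64) IterationCount(),
                      message = "payload";
        }

        () as Producer = MessageHubProducer(Data) {
            param
                topic: "myTopic"; // hypothetical topic name
        }
}
```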
Custom Metric | Description |
---|---|
connection-count | The current number of active connections. |
compression-rate-avg | The average compression rate of record batches (as percentage, 100 means no compression). |
topic:compression-rate | The average compression rate of record batches for a topic (as percentage, 100 means no compression). |
record-queue-time-avg | The average time in ms record batches spent in the send buffer. |
record-queue-time-max | The maximum time in ms record batches spent in the send buffer. |
record-send-rate | The average number of records sent per second. |
record-retry-total | The total number of retried record sends |
topic:record-send-total | The total number of records sent for a topic. |
topic:record-retry-total | The total number of retried record sends for a topic |
topic:record-error-total | The total number of record sends that resulted in errors for a topic |
records-per-request-avg | The average number of records per request. |
requests-in-flight | The current number of in-flight requests awaiting a response. |
request-rate | The number of requests sent per second |
request-size-avg | The average size of requests sent. |
request-latency-avg | The average request latency in ms |
request-latency-max | The maximum request latency in ms |
batch-size-avg | The average number of bytes sent per partition per-request. |
outgoing-byte-rate | The number of outgoing bytes sent to all servers per second |
bufferpool-wait-time-total | The total time an appender waits for space allocation in nanoseconds. |
buffer-available-bytes | The total amount of buffer memory that is not being used (either unallocated or in the free list). |
A config checkpoint clause has no effect on the operator.
The producer operator can participate in a consistent region. The operator cannot be the start of a consistent region. When the consistent region drains, the operator flushes all accumulated outstanding records to the Kafka cluster.
The operator supports non-transactional (default behavior) and transactional message delivery. The delivery can be controlled by the consistentRegionPolicy parameter.
If the operator crashes or is reset while in a consistent region, the operator will write all tuples replayed. This ensures that every tuple sent to the operator will be written to the topic(s). However, non-transactional message delivery implies that duplicate messages may be written to the topic(s).
Messages are always inserted into a topic within the context of a transaction. Transactions are committed when the operator checkpoints. If the operator crashes or is reset while in a consistent region, the operator will abort an ongoing transaction and write all tuples replayed within a new transaction. External consumers configured with isolation.level=read_committed will not read the duplicates from the aborted transactions. Consumers that use a different isolation level will read duplicate messages as if they were produced without being part of a transaction.
For consumers that read the output topics with isolation.level=read_committed, the transactional producer minimizes the number of duplicate messages, with the downside that the produced messages become visible only at the checkpoint interval, which can be interpreted as additional latency.
A consumer that reads the output topics with isolation.level=read_committed can read duplicate messages when the consistent region fails after the Kafka transaction has been committed, but before the region has reached a consistent state.
NOTE 1: Transactions in Kafka have an inactivity timeout, which is configured by the producer property transaction.timeout.ms. This timeout is adjusted by the operator to a minimum of the drain timeout plus 120 seconds. The maximum value of this property is limited by the server property transaction.max.timeout.ms, which has a default value of 900000. The operator opens a transaction when the first tuple of a consistent cut is processed. Every tuple being processed resets the inactivity timer.
NOTE 2: For transactional delivery, the Kafka broker must have version 0.11 or higher. Older brokers do not support transactions.
Transactional message delivery is now supported by all plans of the Event Streams cloud service.
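The following sketch shows one way the producer could be placed in a consistent region with transactional delivery. Because the operator cannot be the start of a consistent region, the region is started at the source. The 30-second period, the topic name, and the composite name are illustrative assumptions.

```spl
use com.ibm.streamsx.messagehub::MessageHubProducer;

composite TransactionalSketch {
    graph
        // Applications with a consistent region need a JobControlPlane operator.
        () as JCP = JobControlPlane() {}

        // The source starts the consistent region; the producer participates in it.
        @consistent(trigger = periodic, period = 30.0)
        stream<rstring message> Data = Beacon() {
            param
                period: 0.1;
            output
                Data: message = "event " + (rstring) IterationCount();
        }

        () as Producer = MessageHubProducer(Data) {
            param
                topic: "myTopic"; // hypothetical topic name
                // Write tuples within a Kafka transaction that is committed
                // when the operator checkpoints.
                consistentRegionPolicy: Transactional;
        }
}
```

In this sketch, consumers reading with isolation.level=read_committed would see new messages roughly every 30 seconds, which is the latency trade-off described above.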
Many exceptions thrown by the underlying Kafka API are considered fatal. In the event that Kafka throws a retriable exception, the operator behaves differently depending on whether it is used in a consistent region. When used in a consistent region, the operator initiates a reset of the consistent region. The reset processing will instantiate a new Kafka producer within the operator.
When the operator is not used within a consistent region, the operator tries to recover internally by instantiating a new Kafka producer within the operator and resending all producer records that are not yet acknowledged. Records that fail in two producer generations are considered finally failed. The corresponding tuple is counted in the custom metric nFailedTuples, and, if the operator is configured with an output port, an output tuple is submitted.
In the event that Kafka throws a non-retriable exception, the tuple that caused the exception is counted in the custom metric nFailedTuples, and, if the operator is configured with an output port, an output tuple is submitted.
Some exceptions can be retried by Kafka itself, such as those that occur due to network error. Therefore, it is not recommended to set the retries property to 0 to disable the producer's retry mechanism.
Optional: appConfigName, clientId, consistentRegionPolicy, credentials, credentialsFile, flush, guaranteeOrdering, keyAttribute, krb5Debug, messageAttribute, outputErrorsOnly, partitionAttribute, propertiesFile, sslDebug, timestampAttribute, topic, topicAttribute, userLib
Port that consumes tuples. Each tuple is written as a record to the configured topic(s).
This port is an optional output port. Depending on the outputErrorsOnly parameter, the output stream either includes only tuples for input tuples that failed to get published on one or all of the specified topics, or it contains tuples corresponding to all input tuples, both successfully produced and failed ones.
The output port is asynchronous to the input port of the operator. The sequence of the submitted tuples may also differ from the sequence of the input tuples. Window punctuations from the input stream are not forwarded.
The schema of the output port must consist of one optional attribute of tuple type with the same schema as the input port and one optional attribute of type rstring, ustring, optional<rstring>, or optional<ustring>, that takes a JSON formatted description of the error that occurred, or remains empty for successfully produced tuples. Emptiness of the attribute means that the attribute contains a string with zero length when declared as rstring or ustring, and an empty optional (optional without a value) when declared as optional. Both attributes can have any names and can be declared in any order in the schema.
Example for declaring the output stream as error output:
stream <Inp failedTuple, rstring failure> Errors = MessageHubProducer (Data as Inp) { ... }
Example of the failure description, which would go into the failure attribute above:
{
    "failedTopics":["topic1"],
    "lastExceptionType":"org.apache.kafka.common.errors.TopicAuthorizationException",
    "lastFailure":"Not authorized to access topics: [topic1]"
}
Please note that the generated JSON does not contain line breaks as in the example above, where the JSON has been broken into multiple lines to better show its structure.
Example for declaring the output stream for both successfully produced input tuples and failures:
stream <Inp inTuple, optional<rstring> failure> ProduceStatus = MessageHubProducer (Data as Inp) {
    param
        outputErrorsOnly: false;
        ...
}
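As a sketch of how the error output might be consumed downstream, the example below logs each failure description with a Custom operator. The type name MessageT, the topic name, and the operator names are hypothetical, and logging is only one possible way to handle failed tuples.

```spl
use com.ibm.streamsx.messagehub::MessageHubProducer;

composite ErrorOutputSketch {
    type
        MessageT = tuple<rstring message>;
    graph
        stream<MessageT> Data = Beacon() {
            param
                period: 1.0;
            output
                Data: message = "hello " + (rstring) IterationCount();
        }

        // With the default outputErrorsOnly (true), only failed tuples arrive here.
        stream<MessageT failedTuple, rstring failure> Errors = MessageHubProducer(Data) {
            param
                topic: "myTopic"; // hypothetical topic name
        }

        () as ErrorLogger = Custom(Errors) {
            logic
                onTuple Errors: {
                    // failure holds the JSON formatted error description.
                    appTrc(Trace.error, "Failed to produce '"
                        + failedTuple.message + "': " + failure);
                }
        }
}
```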
Specifies the name of the application configuration containing Kafka properties.
Specifies the client ID that should be used when connecting to the Kafka cluster. The value specified by this parameter will override the client.id Kafka property if specified.
Each operator must have a unique client ID. When operators are replicated by a parallel region, the channel-ID is automatically appended to the clientId to make the client-ID distinct for the parallel channels.
If this parameter is not specified and the client.id Kafka property is not specified, the operator will create an ID with the pattern C-J<job-ID>-<operator name> for a consumer operator, and P-J<job-ID>-<operator name> for a producer operator.
Specifies the policy to use when in a consistent region.
When NonTransactional is specified, the operator guarantees that every tuple is written to the topic(s) at least once. When the consistent region resets, duplicates will most likely appear in the output topic(s). For consumers of the output topics, messages appear as they are produced.
When Transactional is specified, the operator will write tuples to the topic(s) within the context of a transaction. Transactions are committed when the operator checkpoints. This implies that downstream Kafka consumers may not see the messages until the operator checkpoints. Transactional delivery minimizes (though does not eliminate) duplicate messages for consumers of the output topics when they are configured with the consumer property isolation.level=read_committed. Consumers that read with the default isolation level read_uncommitted see all messages as they are produced. For these consumers, there is no difference between transactional and non-transactional message delivery.
For backward compatibility, the parameter value AtLeastOnce can also be specified, but it is deprecated and may be removed in a future version. AtLeastOnce is equivalent to NonTransactional.
This parameter is ignored if the operator is not part of a consistent region. The default value is NonTransactional. NOTE: Kafka brokers older than version 0.11 do not support transactions.
Specifies the credentials of the Event Streams cloud service instance in JSON. This parameter takes priority over a credentials file and credentials specified as property in an application configuration.
Specifies the name of the file that contains the complete Event Streams service credentials in JSON format. If not specified, the operator will attempt to load the credentials from the file etc/eventstreams.json or, for backward compatibility, etc/messagehub.json. A relative path is always interpreted as relative to the application directory of the Streams application.
Credentials stored in a file take priority over credentials stored in an application configuration. This parameter deprecates the messageHubCredentialsFile parameter.
Specifies the number of tuples after which the producer is flushed. When not specified, or when the parameter value is not positive, the flush interval is adaptively calculated to avoid queueing times significantly over five seconds.
Flushing the producer makes all buffered records immediately available to send to the server (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with the buffered records. When a small value is specified, the batching of tuples to server requests and compression (if used) may get inefficient.
Under normal circumstances, this parameter should be used only when the adaptive flush control does not give the desired results, for example when the custom metric buffer-available-bytes becomes very small and record-queue-time-max or record-queue-time-avg gets too high.
If set to true, the operator guarantees that the order of records within a topic partition is the same as the order of processed tuples when it comes to retries. This implies that the operator sets the enable.idempotence producer config automatically to true, acks to all, enables retries, and adjusts max.in.flight.requests.per.connection to an upper limit of 5.
If unset, the default value of this parameter is false, which means that the order can change due to retries as long as the producer configuration max.in.flight.requests.per.connection is greater than 1.
Note for users of Kafka 0.10.x:
The idempotent producer is not supported for Kafka versions < 0.11. When guaranteed record order is required with older Kafka servers, users must set the producer config max.in.flight.requests.per.connection=1 instead of setting guaranteeOrdering to true.
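A minimal sketch of enabling guaranteed ordering, assuming a broker that supports the idempotent producer (version 0.11 or later, as noted above). The topic name and composite name are hypothetical.

```spl
use com.ibm.streamsx.messagehub::MessageHubProducer;

composite GuaranteeOrderingSketch {
    graph
        stream<rstring message> Data = Beacon() {
            param
                period: 0.01;
            output
                Data: message = "seq " + (rstring) IterationCount();
        }

        () as Producer = MessageHubProducer(Data) {
            param
                topic: "myTopic"; // hypothetical topic name
                // Keeps the record order within a partition across retries.
                // Per the description above, the operator then adjusts
                // enable.idempotence, acks, retries, and
                // max.in.flight.requests.per.connection itself.
                guaranteeOrdering: true;
        }
}
```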
Specifies the input attribute that contains the Kafka key value. If not specified, the operator will look for an input attribute named key.
If Kerberos debugging is enabled, all Kerberos related protocol data and information is logged to the console. This setting is equivalent to vmArg: "-Dcom.ibm.security.krb5.Krb5Debug=all";. The default value for this parameter is false. The parameter is ignored when the com.ibm.security.krb5.Krb5Debug property is set via the vmArg parameter.
Specifies the attribute on the input port that contains the message payload. If not specified, the operator will look for an input attribute named message. If this parameter is not specified and there is no input attribute named message, the operator will throw an exception and terminate.
If set to true, the operator submits tuples to the optional output port only for the tuples that failed to produce. If set to false, the operator also submits tuples for the successfully produced input tuples.
If unset, the default value of this parameter is true. This parameter is ignored when the operator is not configured with an output port.
Specifies the input attribute that contains the partition number that the message should be written to. If this parameter is not specified, the operator will look for an input attribute named partition. If the user does not indicate which partition the message should be written to, then Kafka's default partitioning strategy will be used instead (partition based on the specified partitioner or in a round-robin fashion).
Specifies the name of the properties file containing Kafka properties. A relative path is always interpreted as relative to the application directory of the Streams application.
If SSL/TLS protocol debugging is enabled, all SSL protocol data and information is logged to the console. This setting is equivalent to vmArg: "-Djavax.net.debug=true";. The default value for this parameter is false. The parameter is ignored when the javax.net.debug property is set via the vmArg parameter.
Specifies the attribute on the input port that contains the timestamp for the message. If not specified, the operator will look for an input attribute named messageTimestamp. If this parameter is not specified and there is no input attribute named messageTimestamp, the operator will use the timestamp provided by Kafka (broker config log.message.timestamp.type=[CreateTime|LogAppendTime]).
Specifies the topic(s) that the producer should send messages to. The value of this parameter will take precedence over the topicAttribute parameter. This parameter will also take precedence if the input tuple schema contains an attribute named topic.
Specifies the input attribute that contains the name of the topic that the message should be written to. If this parameter is not specified, the operator will look for an input attribute named topic. This parameter value is overridden if the topic parameter is specified.
Allows the user to specify paths to JAR files that should be loaded into the operator's classpath. This is useful if the user wants to be able to specify their own partitioners or assignors. The value of this parameter can either point to a specific JAR file, or to a directory. In the case of a directory, the operator will load all files ending in .jar onto the classpath. By default, this parameter will load all jar files found in <application_dir>/etc/libs.
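When the input schema does not use the default attribute names key, message, and topic, the attribute parameters described above can map other attributes. The sketch below assumes these parameters take attribute references rather than strings; the attribute names destTopic, orderId, and payload are hypothetical.

```spl
use com.ibm.streamsx.messagehub::MessageHubProducer;

composite AttributeParamsSketch {
    graph
        stream<rstring destTopic, rstring orderId, rstring payload> Data = Beacon() {
            param
                period: 1.0;
            output
                Data: destTopic = "orders",
                      orderId = "order-" + (rstring) IterationCount(),
                      payload = "{}";
        }

        () as Producer = MessageHubProducer(Data) {
            param
                // The topic parameter is deliberately not set, because it
                // would take precedence over topicAttribute.
                topicAttribute: destTopic;
                keyAttribute: orderId;
                messageAttribute: payload;
        }
}
```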
Number of tuples that could not be produced for all topics
Number of input tuples not yet produced (not yet acknowledged by Kafka)
Number of times the input tuple processing was paused due to a full queue of pending tuples.
The producer generation. When a new producer is created, a new generation is created.