MessageHubConsumer
The MessageHubConsumer operator is used to consume records from the IBM Event Streams cloud service. The standard use patterns for the MessageHubConsumer operator are described in the overview of the user documentation.
The operator has been designed to make connectivity to the service as simple as possible. This is achieved in a number of different ways, from having default values for the appConfigName parameter to allowing the user to copy/paste the raw service credentials JSON into either an application configuration property or a file.
The following table lists the default values that have been set by this operator for a couple of parameters:
| Parameter | Default Value | Description |
|---|---|---|
| appConfigName | eventstreams | Users can choose to place the raw Event Streams credentials JSON in a property called eventstreams.creds in an application configuration called eventstreams. The operator will extract the information needed to connect to Event Streams. |
| credentialsFile | etc/eventstreams.json | Users can paste the raw Event Streams credentials JSON into a file pointed to by this parameter. The operator will extract the information needed to connect to Event Streams. By default, the operator will look for a file called etc/eventstreams.json. |
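With these defaults in place, a minimal invocation can rely entirely on the default credentials lookup. A sketch (the topic name and output schema are examples):

```
// Minimal sketch: the operator finds the credentials via the default
// file etc/eventstreams.json or the "eventstreams" application configuration.
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        topic: "myTopic";   // example topic name
}
```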
This section outlines different options for enabling the MessageHubConsumer operator to connect to the IBM Event Streams cloud service. Any of the following options can be used to configure the operator for connecting to IBM Cloud.
1. Use the credentials operator parameter
This option allows you to use any SPL expression that returns an rstring to specify the service credentials. As an example, you can write and use an SPL function that retrieves the credentials JSON from a key-value-store.
Note: When the credentials parameter is specified, credentials which are stored in a file or application configuration are ignored. You can specify additional Kafka configs in a property file or application configuration, but then you must specify the name of the property file or application configuration with the propertiesFile or appConfigName parameter.
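As a sketch, the credentials JSON could be supplied from a submission time value (the submission time parameter name is an example):

```
// Sketch: pass the raw service credentials JSON at submission time.
// "eventstreams.creds" is an example submission time parameter name.
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        credentials: getSubmissionTimeValue("eventstreams.creds");
        topic: "myTopic";   // example topic name
}
```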
2. Save Credentials in a File
With this option, users can copy their credentials JSON from the Event Streams service and store it in a file called eventstreams.json. When the operator starts up, it reads the credentials from that file and extracts the information needed to connect.
NOTE: Users can use the credentialsFile parameter to specify a different file containing the Event Streams service credentials JSON.
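A sketch using a non-default credentials file (the file name is an example):

```
// Sketch: read the service credentials JSON from a custom file.
// A relative path is resolved against the application directory.
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        credentialsFile: "etc/my-credentials.json";   // example file name
        topic: "myTopic";
}
```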
3. Save Credentials in an Application Configuration Property
With this option, users can copy their service credentials JSON from the Event Streams service and store it in an application configuration property called eventstreams.creds. When the operator starts, it looks for this property and extracts the information needed to connect.
NOTE 1: Users can specify a different application configuration name by setting the appConfigName parameter. The operator will still look for a property called eventstreams.creds containing the Event Streams service credentials in JSON format.
NOTE 2: Users can add generic Kafka properties, for example metadata.max.age.ms or client.dns.lookup, to the same application configuration that contains the service credentials. To make the operator use these Kafka properties, the appConfigName parameter must be specified even if the default application configuration name eventstreams is used. Put the other way round: when the default application configuration name eventstreams is used but not specified as the appConfigName parameter value, only the service credentials are taken from this application configuration.
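A sketch illustrating NOTE 2, assuming the application configuration eventstreams carries both the credentials and generic Kafka properties:

```
// Sketch: the application configuration "eventstreams" holds the
// credentials in the property eventstreams.creds plus generic Kafka
// properties. appConfigName must be set explicitly so that the Kafka
// properties are picked up, even though it equals the default name.
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        appConfigName: "eventstreams";
        topic: "myTopic";
}
```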
Users only need to specify the topic(s) that they wish to consume messages from (set via the topic parameter).
The operator implements Kafka's Consumer API. As a result, it supports all Kafka properties that are supported by the underlying API. The consumer properties can be found in the Apache Kafka documentation. Properties can be specified in a file or in an application configuration. If specifying properties via a file, the propertiesFile parameter can be used. If specifying properties in an application configuration, the name of the application configuration must be specified using the appConfigName parameter.
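A sketch of supplying additional consumer configs via a properties file (the file name and property values are examples):

```
// Sketch: etc/consumer.properties might contain, for example:
//   metadata.max.age.ms=30000
//   client.dns.lookup=use_all_dns_ips
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        propertiesFile: "etc/consumer.properties";   // example file name
        topic: "myTopic";
}
```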
The operator derives the following Kafka properties from the service credentials and sets them automatically:

| Property Name | Value |
|---|---|
| bootstrap.servers | parsed from the service credentials |
| sasl.jaas.config | org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="kafka" username="username" password="password"; |
| security.protocol | SASL_SSL |
| sasl.mechanism | PLAIN |
| ssl.protocol | TLSv1.2 |
| ssl.truststore.type | JKS |
| ssl.enabled.protocols | TLSv1.2 |
| ssl.endpoint.identification.algorithm | HTTPS |
These properties cannot be overridden by Kafka properties provided via a properties file or application configuration; such settings are ignored. In addition to the properties above, the following properties are set by default or adjusted:
| Property Name | Default Value |
|---|---|
| auto.commit.enable | adjusted to false |
| client.id | Generated ID of the form C-J<JobId>-<operator name> when not provided by the user. When user-provided and used in a parallel region, the parallel channel number is appended. |
| group.id | constructed from hashes of instance-ID, job-ID, and operator name |
| group.instance.id | When the staticGroupMember parameter is true: hashes of instance-ID and operator name. When user-provided and used in a parallel region, the parallel channel number is appended. |
| isolation.level | read_committed |
| key.deserializer | See Automatic deserialization section below |
| value.deserializer | See Automatic deserialization section below |
| partition.assignment.strategy | When in a consistent region, the partition assignment strategy is adjusted to strategies that support only the EAGER protocol. When not set, or when no EAGER-only assignor remains, and in a consistent region, org.apache.kafka.clients.consumer.RangeAssignor or org.apache.kafka.clients.consumer.RoundRobinAssignor is chosen depending on the number of subscribed topics. When not in a consistent region and no strategy is given, org.apache.kafka.clients.consumer.RoundRobinAssignor is used when multiple topics or a pattern is subscribed; otherwise Kafka's built-in default applies. |
| max.poll.interval.ms | adjusted to a minimum of 3 * max(reset timeout, drain timeout) when in a consistent region, 300000 otherwise |
| metadata.max.age.ms | adjusted to a maximum of 2000 |
| session.timeout.ms | when unset, set to max(1.2 * reset timeout, 120000) when the operator is a static consumer group member, to 20000 otherwise |
| request.timeout.ms | adjusted to 25000 when the operator is a static consumer group member, to session.timeout.ms + 5000 otherwise |
| metric.reporters | com.ibm.streamsx.kafka.clients.consumer.ConsumerMetricsReporter is added to the provided reporters |
| metrics.sample.window.ms | adjusted to 10000 |
| client.dns.lookup | use_all_dns_ips |
| reconnect.backoff.max.ms | 10000 |
| reconnect.backoff.ms | 250 |
| retry.backoff.ms | 500 |
NOTE: Although properties are adjusted, users can override any of the above properties by explicitly setting the property value in either a properties file or in an application configuration.
The operator will automatically select the appropriate deserializers for the key and message based on their types. The following table outlines which deserializer will be used given a particular type:
| Deserializer | SPL Types |
|---|---|
| org.apache.kafka.common.serialization.StringDeserializer | rstring |
| org.apache.kafka.common.serialization.IntegerDeserializer | int32, uint32 |
| org.apache.kafka.common.serialization.LongDeserializer | int64, uint64 |
| org.apache.kafka.common.serialization.FloatDeserializer | float32 |
| org.apache.kafka.common.serialization.DoubleDeserializer | float64 |
| org.apache.kafka.common.serialization.ByteArrayDeserializer | blob |
These deserializers are wrapped by extensions that catch exceptions of type org.apache.kafka.common.errors.SerializationException, allowing the operator to skip over malformed messages. The extensions do not modify the actual deserialization logic of the base deserializers listed above.
Users can override this behavior and specify which deserializer to use by setting the key.deserializer and value.deserializer properties.
| Custom Metric | Description |
|---|---|
| connection-count | The current number of active connections. |
| incoming-byte-rate | The number of bytes read off all sockets per second. |
| topic-partition:records-lag | The latest lag of the specific partition. A value of -1 indicates that the metric is not applicable to the operator. |
| records-lag-max | The maximum lag in terms of number of records for any partition in this window. |
| fetch-size-avg | The average number of bytes fetched per request. |
| topic:fetch-size-avg | The average number of bytes fetched per request for a topic. |
| commit-rate | The number of commit calls per second. |
| commit-latency-avg | The average time taken for a commit request. |
Committing offsets is always controlled by the Streams operator. All auto-commit related settings via consumer properties are ignored by the operator.
a) The operator is not part of a consistent region
The consumer operator commits the offsets of those Kafka messages that have been submitted as tuples. Offsets are committed under the following conditions:
1. The partitions within a consumer group are rebalanced. Before new partitions are assigned, the offsets of the currently assigned partitions are committed. When the partitions are re-assigned, the operators start fetching from these committed offsets. The periodic commit controlled by the commitCount or commitPeriod parameter is reset after rebalance. A partition rebalance happens every time a subscription via control port is changed.
2. Offsets are committed periodically. The period can be a time period or a tuple count. If nothing is specified, offsets are committed every 5 seconds. The time period can be specified with the commitPeriod parameter. When the commitCount parameter is used with a value of N, offsets are committed every N submitted tuples (see the sketch after this list).
3. A partition assignment or subscription via the control port is removed. The offsets of the de-assigned partitions are committed.
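A sketch of condition 2, using tuple-count based committing (the value is an example):

```
// Sketch: commit offsets after every 2000 submitted tuples instead of
// the default 5-second period; commitCount and commitPeriod are
// mutually exclusive.
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        topic: "myTopic";
        commitCount: 2000;   // example value
}
```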
b) The operator is part of a consistent region
Offsets are always committed when the consistent region drains, i.e. when the region establishes a consistent state. On drain, the consumer operator commits the offsets of those Kafka messages that have been submitted as tuples. When the operator is in a consistent region, the parameters commitCount and commitPeriod are ignored because the commit frequency is given by the trigger period of the consistent region. In a consistent region, offsets are committed synchronously, i.e. the offsets are committed when the drain processing of the operator finishes. Commit failures result in a consistent region reset.
The operator is capable of taking advantage of Kafka's group management function.
As an example, consider the topic myTopic with three partitions, consumed by two consumer groups. In Group A, which has four consumers, one consumer is idle because there are only three partitions; each of the other consumers in the group consumes exactly one topic partition. Consumer group B has fewer consumers than partitions, so one consumer is assigned to two partitions. The assignment of consumers to partitions is fully controlled by Kafka.
In order for the operator to use this function, the following requirements must be met:
In a consistent region, a consumer group must not have consumers outside of the consistent region, for example in a different Streams job.
When not in a consistent region, and the startPosition parameter is used with a value other than Default, the consumer group must not have consumers outside of the Streams job.
When group management is used together with startPosition Beginning, End, or Time, the application graph must have a JobControlPlane operator to work correctly. When the JobControlPlane operator cannot be connected, the operator will fail to initialize.
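A sketch of a consumer group in a parallel region (width, group ID, and topic are examples); the JobControlPlane is required here because startPosition is Beginning:

```
// Sketch: three consumers form one consumer group; Kafka assigns the
// topic partitions to the parallel channels.
@parallel(width = 3)
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        topic: "myTopic";
        groupId: "myConsumerGroup";   // example group ID
        startPosition: Beginning;
}

// Required because startPosition is not Default.
() as JCP = spl.control::JobControlPlane() {}
```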
Metrics related to group management
| Metric Name | Description |
|---|---|
| isGroupManagementActive | 1 indicates that group management is active; 0 indicates that group management is inactive. |
| nPartitionRebalances | Number of partition assignment rebalances for each consumer operator. The metric is only visible when group management is active. |
Since version 2.3, Kafka supports static group membership, described in more detail in this Confluent blog post. To benefit from this feature, the Kafka server must be at minimum version 2.3.
Streams applications can benefit from this feature as it avoids unnecessary consumer group rebalances when PEs are re-launched. With dynamic group members (the traditional behavior), a consumer actively leaves the group when its PE is re-launched, leading to an immediate partition rebalance among the remaining consumers.
With static group membership, a consumer leaves the group only based on the session timeout, i.e. when the broker does not receive a heartbeat from the consumer for longer than the session timeout. When the session timeout is high enough, the remaining consumers in a consumer group within an autonomous region are not affected by the restart of another consumer. They continue consuming their assigned partitions without being re-assigned or reset to their last committed offsets. A consumer group within a consistent region also benefits when unnecessary rebalances are avoided, because a rebalance always triggers a reset of the consistent region. That is why this feature can help avoid unnecessary consistent region resets.
You can enable this feature by setting either the consumer config group.instance.id to a value that is unique within the consumer group, or by setting the staticGroupMember parameter to true, which creates a unique group.instance.id. When a group.instance.id is detected, the operator automatically increases the default session timeout if no user-provided session timeout is given.
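A sketch enabling static membership via the staticGroupMember parameter (topic and group ID are examples):

```
// Sketch: the operator generates a unique group.instance.id per
// consumer and raises the default session timeout; requires Kafka
// brokers at version 2.3 or later.
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        topic: "myTopic";
        groupId: "myConsumerGroup";   // example group ID
        staticGroupMember: true;
}
```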
Kafka 2.4 implemented incremental cooperative rebalancing of consumers, KIP-429. The rebalancing protocol is an embedded protocol between the consumer chosen as the group leader and the other group members to negotiate who owns which partitions. The Kafka server, in particular the group coordinator, is not aware of this embedded protocol, so the new rebalancing protocol is available with all Kafka server versions supported by this toolkit.
The new rebalancing strategy is beneficial for large consumer groups and happens in multiple rounds, in which only some topic partitions are moved (revoked/assigned) between consumers. Partitions that need not be revoked stay assigned and continue being consumed. A more detailed description of this feature can be found in this Confluent blog post.
Activation
Use the following consumer property for all consumers within the consumer group: partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
Note: When the consumer operator is used in a consistent region, an assignor for the EAGER protocol is configured automatically, because every rebalance round would trigger a reset of the consistent region. The reset of a consistent region is a stop-the-world event for the application, so consumers in a consistent region would not benefit from incremental rebalancing; on the contrary, it could even have a negative impact in a consistent region.
The operator can be configured for operator driven and periodic checkpointing. Checkpointing is in effect when the operator is configured with an input port. When the operator has no input port, checkpointing can be configured, but is silently ignored. The operator checkpoints the current partition assignment or topic subscription, which is modified via control tuples received by the input port. The current fetch positions are not saved.
On reset, the partition assignment or topic subscription is restored from the checkpoint. The fetch offsets will be the last committed offsets.
With config checkpoint: operatorDriven; the operator creates a checkpoint whenever the manual partition assignment or the topic subscription changes, i.e. after processing each input tuple that changed the assignment or subscription of the operator.
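A sketch of operator-driven checkpointing with a control port (the stream name and the use of an application configuration are examples):

```
// Sketch: checkpoint the assignment or subscription whenever a control
// tuple changes it; fetch positions are not checkpointed, so fetching
// resumes at the last committed offsets after a reset.
stream<rstring message, rstring key> Messages = MessageHubConsumer(ControlStream) {
    param
        appConfigName: "eventstreams";
    config
        checkpoint: operatorDriven;
}
```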
When in a consistent region
When the operator is part of a consistent region, it is reset to the initial state or to the last consistent state when the PE is re-launched. The operator, or the group of consumer operators, replays the tuples. There are no further specifics in this case.
When not in a consistent region
a) The consumer operator has no input port
With group management enabled, the operators of a consumer group need a JobControlPlane operator in the application graph to coordinate the initial fetch offsets when the startPosition parameter is set and not Default. When re-launched, the operator seeks a partition to the fetch offset given by startPosition when none of the operators within the group previously committed offsets for this partition. When offsets have been committed, the fetch offset for a partition at operator start will be its last committed offset. This behavior enables you to change the width of a parallel region with consumers being a consumer group. The JobControlPlane operator is not required when the startPosition parameter is not used or has the value Default.
When group management is not enabled for a consumer operator (no group identifier specified, or partitions specified), i.e. a single consumer is pinned to topic partitions, a JobControlPlane operator is required in the application graph when startPosition is not Default. The consumer operator stores in the JobControlPlane those partitions for which it has already committed offsets. These partitions are not sought to startPosition after re-launch of a PE.
Omitting the startPosition parameter or using Default as the value does not require a JobControlPlane operator.
b) The consumer operator is configured with a control input port
When the operator is configured with an input port, the partition assignments or subscriptions created via the control stream are lost when the PE restarts. It is therefore recommended to fuse the consumer operator with the source of the control stream to replay the control tuples after restart, or to use a config checkpoint clause, preferably operatorDriven, to restore the partition assignment and continue fetching records beginning with the last committed offsets.
The operator can be the start of a consistent region. Both operator driven and periodic triggering of the region are supported. If using operator driven, the triggerCount parameter must be set to indicate how often the operator should initiate a consistent region.
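A sketch of an operator-driven consistent region started by the consumer (the trigger count is an example):

```
// Sketch: the consumer starts the consistent region and triggers a
// drain after every 5000 submitted tuples.
@consistent(trigger = operatorDriven)
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        topic: "myTopic";
        triggerCount: 5000;   // example value
}
```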
Unless the operator is configured with an input port or the partition parameter is used, the operator participates automatically in a consumer group defined by a user provided or operator generated group ID. A consistent region can have multiple consumer groups.
Tuple replay after reset of the consistent region
After a reset of the consistent region, the operators that participate in a consumer group may replay tuples that were previously submitted by a different consumer. The reason for this is that the assignment of partitions to consumers can change. This property of a consumer group must be kept in mind when combining consumer groups with a consistent region.
When the partition parameter is used or the operator has an input port, the partition assignment is static (a consumer consumes all partitions, or those that are specified), so that after a consistent region reset the consumer operator replays the tuples it submitted before.
When the consumers of a consumer group rebalance the partition assignment, for example immediately after job submission, or when the broker node that is the group's coordinator is shut down, multiple resets of the consistent region can occur while the consumers start up. It is recommended to set the maxConsecutiveResetAttempts parameter of the @consistent annotation to a value higher than the default of 5.
On drain, the operator will commit offsets of the submitted tuples.
On checkpoint, the operator will save the last offset for each topic-partition that it is assigned to. In the event of a reset, the operator will seek to the saved offset for each topic-partition and begin consuming messages from that point.
Metrics related to consistent region
| Metric Name | Description |
|---|---|
| drainTimeMillis | Drain duration of the last drain, in milliseconds. |
| drainTimeMillisMax | Maximum drain duration, in milliseconds. |
These metrics are only present when the operator participates in a consistent region.
Many exceptions thrown by the underlying Kafka API are considered fatal. In the event that Kafka throws an exception, the operator will restart.
This port is used to specify the topics or topic partitions that the consumer should begin reading messages from. When this port is specified, the topic, partition and startPosition parameters cannot be used. The operator will only begin consuming messages once a tuple is received on this port, which specifies a partition assignment or a topic subscription.
When the MessageHubConsumer participates in a consistent region, only partition assignment via the control port is supported. The support of consistent region with the control port is deprecated and may be removed in the next major toolkit version.
When a topic subscription is specified, the operator benefits from Kafka's group management (Kafka assigns the partitions to consume). When an assignment is specified in a control tuple, the operator self-assigns to the given partition(s) of the given topic(s). Assignments and subscriptions via the control port cannot be mixed: it is not possible to use both, nor to subscribe after a previous assignment and unassignment, or vice versa. This input port must contain a single rstring attribute that takes a JSON formatted string.
Adding or removing a topic subscription
To add or remove a topic subscription, the single rstring attribute must contain a JSON string in the following format:
{ "action" : "ADD" or "REMOVE", "topics" : [ { "topic" : "topic-name" }, ... ] }
Types and convenience functions to aid in creating the JSON string are available in the toolkit.
Adding or removing a manual partition assignment
To add or remove a topic partition assignment, the single rstring attribute must contain a JSON string in the following format:
{ "action" : "ADD" or "REMOVE", "topicPartitionOffsets" : [ { "topic" : "topic-name" ,"partition" : <partition_number> ,"offset" : <offset_number> <--- the offset attribute is optional }, ... ] }
Types and convenience functions to aid in creating the JSON string are available in the toolkit.
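A sketch of feeding a manual partition assignment through the control port; the topic, partition, and stream names are examples, the JSON string is built by hand here, and the appConfigName value assumes credentials stored in an application configuration:

```
// Sketch: assign the consumer to partition 0 of "myTopic" at startup.
stream<rstring jsonString> Control = Custom() {
    logic
        onProcess: {
            submit({jsonString = "{\"action\" : \"ADD\", \"topicPartitionOffsets\" : [{\"topic\" : \"myTopic\", \"partition\" : 0}]}"}, Control);
        }
}

stream<rstring message, rstring key> Messages = MessageHubConsumer(Control) {
    param
        appConfigName: "eventstreams";
}
```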
Important Note: This input port must not receive a final punctuation. Final markers are automatically forwarded, causing downstream operators to close their input ports. When this input port receives a final marker, the operator will stop fetching Kafka messages and stop submitting tuples.
Port that produces tuples
Optional: appConfigName, clientId, commitCount, commitPeriod, credentials, credentialsFile, groupId, krb5Debug, outputKeyAttributeName, outputMessageAttributeName, outputOffsetAttributeName, outputPartitionAttributeName, outputTimestampAttributeName, outputTopicAttributeName, partition, pattern, propertiesFile, sslDebug, startOffset, startPosition, startPositionStr, startTime, staticGroupMember, topic, triggerCount, userLib
Specifies the name of the application configuration containing Kafka properties.
Specifies the client ID that should be used when connecting to the Kafka cluster. The value specified by this parameter will override the client.id Kafka property if specified.
Each operator must have a unique client ID. When operators are replicated by a parallel region, the channel-ID is automatically appended to the clientId to make the client-ID distinct for the parallel channels.
If this parameter is not specified and the client.id Kafka property is not specified, the operator will create an ID with the pattern C-J<job-ID>-<operator name> for a consumer operator, and P-J<job-ID>-<operator name> for a producer operator.
This parameter specifies the number of tuples that will be submitted to the output port before committing their offsets. Valid values are greater than zero. This parameter is optional and conflicts with the commitPeriod parameter.
This parameter is only used when the operator is not part of a consistent region. When the operator participates in a consistent region, offsets are always committed when the region drains.
This parameter specifies the period of time in seconds after which the offsets of submitted tuples are committed. This parameter is optional and has a default value of 5.0. Its minimum value is 0.1; smaller values are pinned to 0.1 seconds. This parameter cannot be used when the commitCount parameter is used.
This parameter is only used when the operator is not part of a consistent region. When the operator participates in a consistent region, offsets are always committed when the region drains.
Specifies the credentials of the Event Streams cloud service instance in JSON. This parameter takes priority over a credentials file and credentials specified as a property in an application configuration.
Specifies the name of the file that contains the complete Event Streams service credentials in JSON format. If not specified, the operator will attempt to load the credentials from the file etc/eventstreams.json, or etc/messagehub.json for backward compatibility. A relative path is always interpreted as relative to the application directory of the Streams application.
Credentials stored in a file take priority over credentials stored in an application configuration. This parameter deprecates the messageHubCredentialsFile parameter.
Specifies the group ID that should be used when connecting to the Kafka cluster. The value specified by this parameter will override the group.id Kafka property if specified. If this parameter is not specified and the group.id Kafka property is not specified, the operator will use a generated group ID, and be a single group member unless the partition parameter is used.
If Kerberos debugging is enabled, all Kerberos related protocol data and information is logged to the console. This setting is equivalent to vmArg: "-Dcom.ibm.security.krb5.Krb5Debug=all";. The default value for this parameter is false. The parameter is ignored when the com.ibm.security.krb5.Krb5Debug property is set via the vmArg parameter.
Specifies the output attribute name that should contain the key. If not specified, the operator will attempt to store the key in an attribute named 'key'.
Specifies the output attribute name that will contain the message. If not specified, the operator will attempt to store the message in an attribute named 'message'.
Specifies the output attribute name that should contain the offset. If not specified, the operator will attempt to store the offset in an attribute named 'offset'. The attribute must have the SPL type 'int64' or 'uint64'.
Specifies the output attribute name that should contain the partition number. If not specified, the operator will attempt to store the partition number in an attribute named 'partition'. The attribute must have the SPL type 'int32' or 'uint32'.
Specifies the output attribute name that should contain the record's timestamp. It is presented in milliseconds since Unix epoch. If not specified, the operator will attempt to store the timestamp in an attribute named 'messageTimestamp'. The attribute must have the SPL type 'int64' or 'uint64'.
Specifies the output attribute name that should contain the topic. If not specified, the operator will attempt to store the topic in an attribute named 'topic'.
Specifies the partitions that the consumer should be assigned to for each of the topics specified. When you specify multiple topics, the consumer reads from the given partitions of all given topics. For example, if the topic parameter has the values "topic1", "topic2", and the partition parameter has the values 0, 1, then the consumer will assign to {topic1, partition 0}, {topic1, partition 1}, {topic2, partition 0}, and {topic2, partition 1}.
Specifies a regular expression to dynamically subscribe to all matching topics. Pattern matching is performed periodically against the topics existing at the time of the check.
This parameter is incompatible with the topic and the partition parameters, and with the optional input port.
The regular expression syntax follows the Perl 5 regular expressions with some differences. For details see Regular Expressions in Java 8.
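A sketch of a pattern subscription (the expression and group ID are examples):

```
// Sketch: subscribe to all topics matching the regular expression;
// newly created matching topics are picked up at the periodic check.
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        pattern: "myTopic-.*";        // example pattern
        groupId: "myConsumerGroup";   // example group ID
}
```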
Specifies the name of the properties file containing Kafka properties. A relative path is always interpreted as relative to the application directory of the Streams application.
If SSL/TLS protocol debugging is enabled, all SSL protocol data and information is logged to the console. This setting is equivalent to vmArg: "-Djavax.net.debug=true";. The default value for this parameter is false. The parameter is ignored when the javax.net.debug property is set via the vmArg parameter.
This parameter indicates the start offset that the operator should begin consuming messages from. In order for this parameter's values to take effect, the startPosition parameter must be set to Offset. Furthermore, the specific partition(s) that the operator should consume from must be specified via the partition parameter.
If multiple partitions are specified via the partition parameter, then the same number of offset values must be specified. There is a one-to-one mapping between the position of the partition from the partition parameter and the position of the offset from the startOffset parameter. For example, if the partition parameter has the values 0, 1, and the startOffset parameter has the values 100l, 200l, then the operator will begin consuming messages from partition 0 at offset 100 and will consume messages from partition 1 at offset 200.
A limitation of this parameter is that only a single topic can be specified via the topic parameter.
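A sketch of the one-to-one partition-to-offset mapping described above (topic, partitions, and offsets are examples):

```
// Sketch: consume partition 0 starting at offset 100 and partition 1
// starting at offset 200 of a single topic.
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        topic: "myTopic";
        partition: 0, 1;
        startPosition: Offset;
        startOffset: 100l, 200l;   // example offsets
}
```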
If this parameter is not specified, the start position is Default. This parameter is incompatible with the optional input port.
Note that using a startPosition other than Default requires the application to always have a JobControlPlane operator in its graph. The startPosition is effective only when the consumer has not yet committed offsets during the Streams job's life cycle.
When you need to specify the start position as an rstring type expression, for example when you want to use a submission time value as start position, use the startPositionStr parameter instead. This parameter is incompatible with the startPositionStr parameter. Specify only one of them.
Specifies where the operator should start reading from topics. Use this parameter when you need to specify the start position as an rstring expression, for example when you want to use a submission time value as start position.
The supported values, conditions, and restrictions are the same as for the startPosition parameter.
This parameter is incompatible with the startPosition parameter. Specify only one of them.
This parameter is only used when the startPosition parameter is set to Time. The operator will then begin reading records from the earliest offset whose timestamp is greater than or equal to the timestamp specified by this parameter. If no offsets are found, the operator will begin reading messages from the position specified by the auto.offset.reset consumer property, whose default value is latest. The timestamp must be given as an 'int64' type in milliseconds since Unix epoch.
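A sketch of a time-based start position (the timestamp is an example, in milliseconds since Unix epoch):

```
// Sketch: start at the earliest offsets whose record timestamp is at
// or after 2021-01-01T00:00:00Z; startPosition Time requires a
// JobControlPlane operator in the graph.
stream<rstring message, rstring key> Messages = MessageHubConsumer() {
    param
        topic: "myTopic";
        startPosition: Time;
        startTime: 1609459200000l;   // example timestamp
}
```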
When set to true, enables static Kafka group membership (generates and sets a group.instance.id, overriding a potentially user-provided group instance identifier) and sets a higher default session timeout.
This parameter is ignored when group management is not active.
Note that the Kafka server version must be at minimum 2.3 to use static group membership. With a lower version, the operator will fail.
The default value of this parameter is false.
Specifies the topic or topics that the consumer should subscribe to. To assign the consumer to specific partitions, use the partition parameter in addition. To specify multiple topics from which the operator should consume, separate the topic names by comma, for example topic: "topic1", "topic2"; or topic: "topic1,topic2";. The second form can also be used to provide multiple topics via a submission time parameter. To subscribe to multiple topics that match a regular expression, use the pattern parameter.
This parameter is incompatible with the optional input port.
This parameter specifies the number of tuples that will be submitted to the output port before triggering a consistent region. This parameter is only used if the operator is the start of an operator driven consistent region and is ignored otherwise.
Allows the user to specify paths to JAR files that should be loaded into the operator's classpath. This is useful if the user wants to specify their own partitioners or assignors. The value of this parameter can either point to a specific JAR file or to a directory. In the case of a directory, the operator loads all files ending in .jar onto the classpath. By default, this parameter loads all JAR files found in <application_dir>/etc/libs.
Shows the Kafka group management state of the operator. When the metric shows 1, group management is active. When the metric is 0, group management is not in place.
Number of topic partitions assigned to the consumer.
Number of topics consumed by this consumer. This is the number of topics of the assigned partitions. Note that a consumer can be subscribed to topics, or to a pattern matching numerous topics, without having partitions of the subscribed topics assigned. This metric value is never higher than the metric nAssignedPartitions.
Number of dropped malformed messages.
Number of failed tuples received on the control port.
Number of times message fetching was paused due to low memory.
Number of pending messages to be submitted as tuples.
Number of times message fetching was paused due to a full queue.