Operator JMSSource

IBMStreams com.ibm.streamsx.jms Toolkit > com.ibm.streamsx.jms 2.0.0 > com.ibm.streamsx.jms > JMSSource

The JMSSource operator reads data from a WebSphere MQ or Apache ActiveMQ queue or topic and creates tuples from that data.

The JMSSource operator converts each WebSphere MQ or Apache ActiveMQ message into exactly one tuple and sends it to the output stream.

SSL Support

The JMSSource operator provides support for SSL via these parameters: sslConnection, keyStore, keyStorePassword and trustStore. When sslConnection is set to true, the keyStore, keyStorePassword and trustStore parameters must be set.

Note: The JMSSource operator configures SSL by setting JVM system properties via calls to System.setProperty(). Java operators that are fused into the same PE share the same JVM, so any other Java operator fused into the same PE as the JMSSource operator also has these SSL properties set. If this is undesirable, place the JMSSource operator into its own PE.
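
The SSL setup described above could look roughly like the following sketch. The composite name, output schema, store paths, and password are hypothetical placeholders, and the partitionIsolation placement is one possible way to keep the operator in its own PE so that the SSL system properties do not affect other Java operators.

```spl
use com.ibm.streamsx.jms::JMSSource;

composite SecureJmsReader
{
    graph
        // Output schema is illustrative; it must match the native schema
        // of the access specification.
        stream<rstring name, int32 age> SecureMessages = JMSSource()
        {
            param
                connection       : "amqConn";
                access           : "amqAccess";
                sslConnection    : true;
                // Hypothetical paths, relative to the application directory.
                keyStore         : "etc/keystore.jks";
                keyStorePassword : "changeit";
                trustStore       : "etc/truststore.jks";
            config
                // Keep this operator in its own PE (and therefore its own JVM).
                placement : partitionIsolation;
        }
}
```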

Behavior in a consistent region

The JMSSource operator can participate in a consistent region. The operator must be at the start of a consistent region.

The operator supports periodic and operator-driven consistent region policies. If the consistent region policy is set to operatorDriven, the triggerCount parameter must be specified. The operator initiates a checkpoint after the number of tuples specified by the triggerCount parameter has been processed. If the consistent region policy is set to periodic, the operator respects the period setting and establishes consistent states accordingly.
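
As an illustration of the operator-driven policy, a minimal sketch might look like this. The composite name, output schema, and the triggerCount value of 1000 are assumptions chosen for the example; the JobControlPlane operator is included because applications with consistent regions need one.

```spl
use com.ibm.streamsx.jms::JMSSource;
use spl.control::JobControlPlane;

composite ConsistentJmsReader
{
    graph
        // Required once per application that uses consistent regions.
        () as JCP = JobControlPlane() {}

        // JMSSource is the start operator of an operator-driven region.
        @consistent(trigger = operatorDriven)
        stream<rstring name, int32 age> People = JMSSource()
        {
            param
                connection   : "amqConn";
                access       : "amqAccess";
                // Establish a consistent state after every 1000 tuples (illustrative value).
                triggerCount : 1000;
        }
}
```

For a periodic region, the annotation would instead take the form @consistent(trigger = periodic, period = 5.0), and triggerCount would be omitted.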

When a message queue is consumed by multiple message consumers, i.e. multiple JMSSource instances are used to read messages from the same queue, deterministic routing is required. This requirement can be met through the messageSelector parameter. For example, if an SPL application has two JMSSource operator instances and messages carry a JMS property named "group" that can take the value of either 'g1' or 'g2', then each JMSSource operator instance can be assigned in the following manner:

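// The output schema is illustrative; it must match the native schema
// of the access specification.
stream<rstring name> MyPersonNamesStream1 = JMSSource()
{
	param
		connectionDocument : "/home/streamsuser/connections/JMSconnections.xml";
		connection         : "amqConn";
		access             : "amqAccess";
		messageSelector    : "group = 'g1'";  // receives only messages of group g1
}
stream<rstring name> MyPersonNamesStream2 = JMSSource()
{
	param
		connectionDocument : "/home/streamsuser/connections/JMSconnections.xml";
		connection         : "amqConn";
		access             : "amqAccess";
		messageSelector    : "group = 'g2'";  // receives only messages of group g2
}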

Exceptions

The following types of exceptions can occur:
  • Run time errors that halt the operator execution.
    • The JMSSource operator throws an exception and terminates in the following cases. For some exceptions, the trace and log information is logged in the console logs and also output to the optional output port if the application is configured to use the optional port.
      • During the initial connection attempt or during transient reconnection failures, if the reconnectionPolicy is set to NoRetry and the operator does not have a successful connection, or the reconnectionPolicy is set to BoundedRetry and the operator does not have a successful connection after the number of attempts that are specified in the reconnectionBound parameter. Successive data is lost.
      • The queue name is unknown.
      • The queue manager name is unknown.
      • The operator is unable to connect to the host or the port.
      • A MapMessage, StreamMessage, or BytesMessage does not contain the attributes that are specified in the native schema.
      • A MapMessage or StreamMessage contains attributes whose data type does not match the data type that is specified in the native schema and the conversion fails.
      • The message_class attribute in the access specification is bytes, stream, or map, and the name and type attributes of the <native_schema> element do not match the name and type attribute in the output stream schema.
      • The message_class attribute is bytes and, in the <native_schema> element, an attribute of type string or bytes has no length attribute.
      • An invalid value is specified for the message_class attribute of the access specification.
      • An attribute occurs more than once in the <native_schema> element.
      • The connectionDocument parameter refers to a file that does not exist.
      • The connectionDocument parameter is not specified and the connections.xml file is not present in the default location.
      • The jmsDestinationOutAttributeName parameter is specified but the attribute is not found in the output schema or the type of the attribute is not rstring.
      • The jmsDeliveryModeOutAttributeName parameter is specified but the attribute is not found in the output schema or the type of the attribute is not int32.
      • The jmsExpirationOutAttributeName parameter is specified but the attribute is not found in the output schema or the type of the attribute is not int64.
      • The jmsPriorityOutAttributeName parameter is specified but the attribute is not found in the output schema or the type of the attribute is not int32.
      • The jmsMessageIDOutAttributeName parameter is specified but the attribute is not found in the output schema or the type of the attribute is not rstring.
      • The jmsTimestampOutAttributeName parameter is specified but the attribute is not found in the output schema or the type of the attribute is not int64.
      • The jmsCorrelationIDOutAttributeName parameter is specified but the attribute is not found in the output schema or the type of the attribute is not rstring.
      • The jmsReplyToOutAttributeName parameter is specified but the attribute is not found in the output schema or the type of the attribute is not rstring.
      • The jmsTypeOutAttributeName parameter is specified but the attribute is not found in the output schema or the type of the attribute is not rstring.
      • The jmsRedeliveredOutAttributeName parameter is specified but the attribute is not found in the output schema or the type of the attribute is not boolean.
      • The jmsHeaderProperties parameter is specified but the contained configuration is erroneous.
      • The jmsHeaderPropertiesOutAttributeName parameter is specified but the attribute is not found in the output schema or the type of the attribute is not map<ustring,ustring>.
  • Run time errors that cause a message to be dropped and an error message to be logged.
    • The JMSSource operator throws an exception and discards the message in the following cases. The trace and log information for these exceptions is logged in the console logs and also output to the optional output port if the application is configured to use the optional port.
      • The JMSSource operator reads a message that does not match the message class in the <native_schema> element.
      • When a negative length (-2 or -4) is specified in the native schema for bytes and string data types and the message class is bytes, the operator expects the JMS message to start with a 2 or 4-byte length field. If there are insufficient bytes remaining in the JMS message, the operator discards the entire message and logs a run time error.
      • When a non-negative length is specified in the native schema for bytes and string, the operator attempts to read exactly that number of bytes from the BytesMessage. If there are insufficient bytes remaining in the JMS message, the operator discards the entire message and logs a run time error.
      • The reconnectionBound parameter is specified, but the reconnectionPolicy parameter is set to a value other than BoundedRetry.
  • Compile time errors.
    • The JMSSource operator throws a compile time error in the following cases. The trace and log information for these exceptions is logged in the console logs and also output to the optional output port if the application is configured to use the optional port.
      • The mandatory parameters connection and access are not specified.
      • The period parameter is specified but the reconnectionPolicy parameter is not specified.
      • The reconnectionBound parameter is specified, but the reconnectionPolicy parameter is not specified.
      • The environment variables STREAMS_MESSAGING_WMQ_HOME and STREAMS_MESSAGING_AMQ_HOME are not set to the locations where the WMQ and AMQ libraries are installed.
Examples

Summary

Ports
This operator has 0 input ports and 2 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 32 parameters.

Required: access, connection

Optional: appConfigName, classLibs, codepage, connectionDocument, initDelay, jmsCorrelationIDOutAttributeName, jmsDeliveryModeOutAttributeName, jmsDestinationOutAttributeName, jmsExpirationOutAttributeName, jmsHeaderProperties, jmsHeaderPropertiesOutAttributeName, jmsMessageIDOutAttributeName, jmsPriorityOutAttributeName, jmsRedeliveredOutAttributeName, jmsReplyToOutAttributeName, jmsTimestampOutAttributeName, jmsTypeOutAttributeName, keyStore, keyStorePassword, messageSelector, passwordPropName, period, reconnectionBound, reconnectionPolicy, sslConnection, sslDebug, triggerCount, trustStore, trustStorePassword, userPropName

Metrics
This operator reports 3 metrics.

Properties

Implementation
Java

Output Ports

Assignments
Java operators do not support output assignments.
Ports (0)

The JMSSource operator has one mandatory output and one optional output port. If only the mandatory output port is specified, the operator submits a tuple for each message that is successfully read from the messaging provider. The mandatory output port is mutating and its punctuation mode is Free.


Ports (1)

The JMSSource operator has one optional output port, which submits tuples when an error occurs.

If both optional and mandatory output ports are specified, the operator submits a tuple to the mandatory port for each message that is read successfully and a tuple to the optional port if an error occurs when reading a message. The tuple submitted to the optional port contains an error message for each message that could not be read successfully. The optional output port has a single attribute of type rstring that contains this error message. The optional output port is mutating and its punctuation mode is Free.
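
A minimal sketch of an invocation that uses both output ports is shown here. The composite name, stream names, data schema, and the attribute name errorMessage are hypothetical; only the single rstring attribute on the second port follows from the description above. The Custom operator that traces the errors is one possible consumer, not a requirement.

```spl
use com.ibm.streamsx.jms::JMSSource;

composite JmsReaderWithErrorPort
{
    graph
        // First port: tuples for messages read successfully.
        // Second port: one rstring attribute carrying the error message.
        (stream<rstring name, int32 age> People;
         stream<rstring errorMessage> ReadErrors) = JMSSource()
        {
            param
                connection : "amqConn";
                access     : "amqAccess";
        }

        // Trace every error tuple emitted on the optional port.
        () as ErrorLogger = Custom(ReadErrors)
        {
            logic
                onTuple ReadErrors : appTrc(Trace.error, errorMessage);
        }
}
```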


Parameters

access

This mandatory parameter identifies the access specification name.

appConfigName

This parameter specifies the name of the application configuration that stores client credential information. Credentials specified via the application configuration override those specified in the connections file.
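
As a sketch of how the credential-related parameters fit together, the invocation below reads the user name and password from an application configuration. The configuration name jmsCredentials and the property names user and password are hypothetical and must match whatever is defined in the Streams instance.

```spl
use com.ibm.streamsx.jms::JMSSource;

composite JmsReaderWithAppConfig
{
    graph
        stream<rstring name, int32 age> People = JMSSource()
        {
            param
                connection       : "amqConn";
                access           : "amqAccess";
                // Hypothetical application configuration and property names.
                appConfigName    : "jmsCredentials";
                userPropName     : "user";
                passwordPropName : "password";
        }
}
```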

classLibs

Allows the user to specify paths to JAR files that should be loaded into the operator's classpath. Each value of this parameter may point to a specific JAR file or to a directory. If a value points to a directory, the operator loads all files ending in .jar from that directory onto the classpath. If the parameter is set, only its values are used to identify class libraries. If the parameter is not set, the operator falls back to the old implementation, which refers to the environment variables STREAMS_MESSAGING_WMQ_HOME and STREAMS_MESSAGING_AMQ_HOME.
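
For illustration, the following sketch passes one directory and one specific JAR file to classLibs. Both paths are hypothetical and depend on where the messaging provider's client libraries are installed.

```spl
use com.ibm.streamsx.jms::JMSSource;

composite JmsReaderWithClassLibs
{
    graph
        stream<rstring name, int32 age> People = JMSSource()
        {
            param
                connection : "amqConn";
                access     : "amqAccess";
                // A directory (all *.jar files are loaded) and a single JAR file.
                classLibs  : "/opt/apache-activemq/lib",
                             "/opt/jms/extra/client.jar";
        }
}
```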

codepage

This optional parameter specifies the code page of the target system that is used to convert ustring for a Bytes message type. If this parameter is specified, it must have exactly one value, which is a String constant. If the parameter is not specified, the operator uses the default value of UTF8.

connection

This mandatory parameter identifies the name of the connection specification that contains a JMS element.

connectionDocument

This optional parameter specifies the path name of the file that contains the connection and access specifications, which are identified by the connection and access parameters. If this parameter is not specified, the operator uses the file that is in the default location ./etc/connections.xml.

initDelay

Delay in seconds before the operator starts producing tuples.

jmsCorrelationIDOutAttributeName

Output stream attribute to which the JMSCorrelationID value is assigned. The attribute must be of type rstring.

jmsDeliveryModeOutAttributeName

Output stream attribute to which the JMSDeliveryMode value is assigned. The attribute must be of type int32.

jmsDestinationOutAttributeName

Output stream attribute to which the JMSDestination value is assigned. The attribute must be of type rstring.

jmsExpirationOutAttributeName

Output stream attribute to which the JMSExpiration value is assigned. The attribute must be of type int64.

jmsHeaderProperties

Specifies the mapping between JMS Header property values and Streams tuple attributes. The format of the mapping is:

"propertyName1/streamsAttributeName1/typeSpecifier1","propertyName2/streamsAttributeName2/typeSpecifier2","..." ;

Leading and trailing spaces are trimmed from property and attribute names. The properties can be of the following types:

Type specifier    Java / Property type    SPL / Attribute type
bool              boolean                 boolean
byte              byte                    int8
short             short                   int16
int               int                     int32
long              long                    int64
float             float                   float32
double            double                  float64
string            String                  rstring
object            Object                  ustring, uint8, uint16, uint32, uint64

If the property type or the attribute type does not match the specified type, an error is logged during operator initialization.
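
The mapping format can be sketched as follows. The JMS property names group and sequence, the attribute names msgGroup and msgSeq, and the rest of the output schema are hypothetical; the type specifiers string and long are chosen to match the rstring and int64 attribute types from the table above.

```spl
use com.ibm.streamsx.jms::JMSSource;

composite JmsReaderWithHeaderProperties
{
    graph
        // msgGroup and msgSeq receive the values of the JMS properties
        // "group" and "sequence" (hypothetical names).
        stream<rstring name, int32 age, rstring msgGroup, int64 msgSeq> People = JMSSource()
        {
            param
                connection          : "amqConn";
                access              : "amqAccess";
                // Each entry has the form propertyName/streamsAttributeName/typeSpecifier.
                jmsHeaderProperties : "group/msgGroup/string",
                                      "sequence/msgSeq/long";
        }
}
```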

jmsHeaderPropertiesOutAttributeName

Output stream attribute to which the map containing all received properties is assigned. The attribute must be of type map<ustring,ustring>.

jmsMessageIDOutAttributeName

Output stream attribute to which the JMSMessageID value is assigned. The attribute must be of type rstring.

jmsPriorityOutAttributeName

Output stream attribute to which the JMSPriority value is assigned. The attribute must be of type int32.

jmsRedeliveredOutAttributeName

Output stream attribute to which the JMSRedelivered value is assigned. The attribute must be of type boolean.

jmsReplyToOutAttributeName

Output stream attribute to which the JMSReplyTo value is assigned. The attribute must be of type rstring.

jmsTimestampOutAttributeName

Output stream attribute to which the JMSTimestamp value is assigned. The attribute must be of type int64.

jmsTypeOutAttributeName

Output stream attribute to which the JMSType value is assigned. The attribute must be of type rstring.
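
The jms...OutAttributeName parameters can be combined in one invocation, as in the sketch below. The attribute names msgId, msgTimestamp, msgPriority, and redelivered are hypothetical; their types follow the requirements stated for each parameter.

```spl
use com.ibm.streamsx.jms::JMSSource;

composite JmsReaderWithHeaders
{
    graph
        // The last four attributes receive JMS header values.
        stream<rstring name, int32 age,
               rstring msgId, int64 msgTimestamp,
               int32 msgPriority, boolean redelivered> People = JMSSource()
        {
            param
                connection                     : "amqConn";
                access                         : "amqAccess";
                jmsMessageIDOutAttributeName   : "msgId";         // rstring
                jmsTimestampOutAttributeName   : "msgTimestamp";  // int64
                jmsPriorityOutAttributeName    : "msgPriority";   // int32
                jmsRedeliveredOutAttributeName : "redelivered";   // boolean
        }
}
```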

keyStore

This parameter specifies the path to the keyStore. If a relative path is specified, the path is relative to the application directory. The sslConnection parameter must be set to true for this parameter to have any effect.

keyStorePassword

This parameter specifies the password for the keyStore given by the keyStore parameter. The sslConnection parameter must be set to true for this parameter to have any effect.

messageSelector

This optional parameter specifies the JMS message selector, an expression that restricts which messages the operator receives.

passwordPropName

This parameter specifies the name of the property in the application configuration that holds the password. If the appConfigName parameter is specified and the passwordPropName parameter is not set, a compile time error occurs.

period

This optional parameter specifies the time period in seconds the operator waits before it tries to reconnect. You can use this parameter only when the reconnectionPolicy parameter is specified, otherwise a compile time error occurs. The default value for the period parameter is 60.

reconnectionBound

This optional parameter specifies the number of successive connection attempts that the operator makes. You can use this parameter only when the reconnectionPolicy parameter is specified and set to BoundedRetry, otherwise a run time error occurs. If the reconnectionBound parameter is specified and the reconnectionPolicy parameter is not set, a compile time error occurs. The default value for the reconnectionBound parameter is 5.

reconnectionPolicy

This is an optional parameter that specifies the reconnection policy. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. If the parameter is not specified, the reconnection policy is set to BoundedRetry with a reconnectionBound of 5 and a period of 60 seconds.
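
The three reconnection parameters might be combined as in the following sketch. The values 10 and 30.0 are illustrative, and the literal types used here (rstring for reconnectionPolicy, int32 for reconnectionBound, float64 for period) are assumptions that should be checked against the operator model.

```spl
use com.ibm.streamsx.jms::JMSSource;

composite JmsReaderWithRetry
{
    graph
        stream<rstring name, int32 age> People = JMSSource()
        {
            param
                connection         : "amqConn";
                access             : "amqAccess";
                // Try up to 10 times, waiting 30 seconds between attempts.
                reconnectionPolicy : "BoundedRetry";
                reconnectionBound  : 10;
                period             : 30.0;
        }
}
```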

sslConnection

This parameter specifies whether the operator should attempt to connect using SSL. If this parameter is specified, then the keyStore, keyStorePassword and trustStore parameters must also be specified. The default value is false.

sslDebug

This parameter enables SSL/TLS protocol debugging. When it is enabled, all protocol data and information is logged to the console. Use it to debug TLS connection problems. The default value is false.

triggerCount

This optional parameter specifies how many messages are submitted before the JMSSource operator starts to drain the pipeline and establish a consistent state. This parameter must be greater than zero and must be set if the JMSSource operator is the start operator of an operatorDriven consistent region.

trustStore

This parameter specifies the path to the trustStore. If a relative path is specified, the path is relative to the application directory. The sslConnection parameter must be set to true for this parameter to have any effect.

trustStorePassword

This parameter specifies the password for the trustStore given by the trustStore parameter. The sslConnection parameter must be set to true for this parameter to have any effect.

userPropName

This parameter specifies the name of the property in the application configuration that holds the user name. If the appConfigName parameter is specified and the userPropName parameter is not set, a compile time error occurs.


Metrics

nMessagesDropped - Counter

The number of messages that are dropped in the application.

nMessagesRead - Counter

The number of messages that are read successfully from a queue or topic.

nReconnectionAttempts - Counter

The number of reconnection attempts that are made before a successful connection occurs.

Libraries

Operator class library
Library Path: ../../impl/lib/com.ibm.streamsx.jms.jar, ../../opt/downloaded/*