Operator JMSSink


The JMSSink operator creates messages from InfoSphere Streams tuples and writes the messages to a WebSphere MQ or Apache ActiveMQ queue or topic.

The incoming tuple from InfoSphere Streams can contain one or more attributes, each of which can be of the following data types: int8, uint8, int16, uint16, int32, uint32, int64, uint64, float32, float64, boolean, blob, rstring, decimal32, decimal64, decimal128, ustring, or timestamp. The input tuple is serialized into a JMS message. For the JMSSink operator, the following message classes are supported: map, stream, bytes, xml, wbe, wbe22, and empty. The type of message is specified as the value of the message_class attribute in the connection specifications document.

SSL Support

The JMSSink operator provides support for SSL via these parameters: sslConnection, keyStore, keyStorePassword and trustStore. When sslConnection is set to true, the keyStore, keyStorePassword and trustStore parameters must be set.

Note: The JMSSink operator configures SSL by setting JVM system properties via calls to System.setProperty(). Java operators that are fused into the same PE share the same JVM, so any other Java operators fused into the same PE as the JMSSink operator will also have these SSL properties set. If this is undesirable, place the JMSSink operator into its own PE.
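
The JVM-wide effect described in the note can be illustrated with a minimal Java sketch. It assumes the standard JSSE property names (javax.net.ssl.keyStore and so on); the source does not list which properties the operator actually sets, and the class SslSystemProperties is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the toolkit.
final class SslSystemProperties {

    // Assumption: the standard JSSE property names are the ones being set.
    static Map<String, String> build(String keyStore, String keyStorePassword, String trustStore) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("javax.net.ssl.keyStore", keyStore);
        props.put("javax.net.ssl.keyStorePassword", keyStorePassword);
        props.put("javax.net.ssl.trustStore", trustStore);
        return props;
    }

    // System properties are global to the JVM, so every Java operator
    // fused into the same PE observes the values written here.
    static void apply(Map<String, String> props) {
        props.forEach(System::setProperty);
    }

    public static void main(String[] args) {
        apply(build("etc/key.jks", "changeit", "etc/trust.jks"));
        // Any other code in this JVM now reads the same value.
        System.out.println(System.getProperty("javax.net.ssl.trustStore"));
    }
}
```

Because the values live in the JVM rather than in the operator, isolating the operator in its own PE is the only way to keep them from affecting other Java operators.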

Behavior in a consistent region

The JMSSink operator can be an operator within the reachability graph of an operator-driven consistent region. It cannot be the start of a consistent region.

When this operator is participating in a consistent region, the parameter consistentRegionQueueName must be specified.

The JMSSink operator uses a transacted session to support consistent regions, so the parameters reconnectionPolicy, reconnectionBound, period, maxMessageSendRetries, and messageSendRetryDelay must not be specified. If a connection problem occurs, the messages sent since the last successful checkpoint are lost, and there is no point in continuing to process messages even after the connection is re-established. Instead, the operator must reset to the last checkpoint and re-send the messages, so the reconnection policy does not apply when the operator participates in a consistent region.
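
The checkpoint and reset behavior can be pictured with a small in-memory model. This is only a sketch: it assumes that a checkpoint commits the transacted session and that a reset rolls it back, which the text implies but does not spell out. The class TransactedSendModel is hypothetical and does not use the JMS API.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for a transacted session; not the operator's code.
final class TransactedSendModel {
    private final List<String> delivered = new ArrayList<>(); // committed, visible to consumers
    private final List<String> pending = new ArrayList<>();   // sent since the last checkpoint

    void send(String message) {
        pending.add(message);
    }

    // Assumed checkpoint behavior: commit everything sent since the last checkpoint.
    void checkpoint() {
        delivered.addAll(pending);
        pending.clear();
    }

    // Assumed reset behavior: discard uncommitted messages; upstream replays the tuples.
    void reset() {
        pending.clear();
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

In this model, a message sent after the last checkpoint and then lost to a reset is delivered exactly once after it is replayed and committed, which is why a separate reconnection policy adds nothing.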

Exceptions

The following list describes the common types of exceptions that can occur:
  • Run time errors that halt the operator execution.
    • The JMSSink operator throws an exception and terminates in the following cases. For some exceptions, the trace and log information is logged in the console logs and also output to the optional output port if the application is configured to use the optional port.
      • During the initial connection attempt or during transient reconnection failures, if the reconnectionPolicy is set to NoRetry and the operator does not have a successful connection, or the reconnectionPolicy is set to BoundedRetry and the operator does not have a successful connection after the number of attempts that are specified in the reconnectionBound parameter. Subsequent data is lost.
      • The queue name is unknown.
      • The queue manager name is unknown.
      • The operator is unable to connect to the host.
      • The operator is unable to connect to the port.
      • There is a mismatch between the data type of one or more attributes in the native schema and the data type of attributes in the input stream.
      • One or more native schema attributes do not have a matching attribute in the input stream schema.
      • The connectionDocument parameter refers to a file that does not exist.
      • The connectionDocument parameter is not specified and the connections.xml file is not present in the default location.
      • An invalid value is specified for the message_class attribute of the access specification.
      • A negative length is specified for a string or blob data type in the native schema for a map, stream, xml, wbe, or wbe22 message class.
      • A negative length other than -2 or -4 is specified for a string or blob data type in the native schema for a bytes message class.
      • The message_class attribute is empty, but the <native_schema> element is not empty.
      • The jmsHeaderProperties parameter is specified but the contained configuration is erroneous.
  • Run time errors that cause a message to be dropped and an error message to be logged.
    • The JMSSink operator throws an exception and discards the message in the following cases. The trace and log information for these exceptions is logged in the console logs and also output to the optional output port if the application is configured to use the optional port.
      • The data being written is longer than the maximum message length specified in the queue. The discarded message is not sent to the WebSphere MQ or Apache ActiveMQ queue or topic.
      • The reconnectionBound parameter is specified, but the reconnectionPolicy parameter is set to a value other than BoundedRetry.
  • Compile time errors.
    • The JMSSink operator throws a compile time error in the following cases. The trace and log information for these exceptions is logged in the console logs and also output to the optional output port if the application is configured to use the optional port.
      • The mandatory parameters connection and access are not specified.
      • The period parameter is specified, but the reconnectionPolicy parameter is not specified.
      • The reconnectionBound parameter is specified, but the reconnectionPolicy parameter is not specified.
      • The environment variables STREAMS_MESSAGING_WMQ_HOME and STREAMS_MESSAGING_AMQ_HOME are not set to the locations where the WMQ and AMQ libraries are installed.
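
The parameter rules in the list above can be summarized as a small Java check. This is a sketch only: the class ParameterRules and its message strings are hypothetical, it covers just the connection, access, period, reconnectionPolicy, and reconnectionBound rules, and it does not reproduce how the toolkit actually reports these errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical checker; parameter names are taken from the list above.
final class ParameterRules {

    static List<String> check(Map<String, String> params) {
        List<String> errors = new ArrayList<>();
        if (!params.containsKey("connection") || !params.containsKey("access")) {
            errors.add("compile time: connection and access are mandatory");
        }
        String policy = params.get("reconnectionPolicy");
        if (policy == null) {
            if (params.containsKey("period")) {
                errors.add("compile time: period requires reconnectionPolicy");
            }
            if (params.containsKey("reconnectionBound")) {
                errors.add("compile time: reconnectionBound requires reconnectionPolicy");
            }
        } else if (params.containsKey("reconnectionBound") && !policy.equals("BoundedRetry")) {
            errors.add("run time: reconnectionBound requires reconnectionPolicy BoundedRetry");
        }
        return errors;
    }
}
```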
Examples

Summary

Ports
This operator has 1 input port and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 21 parameters.

Required: access, connection

Optional: appConfigName, classLibs, codepage, connectionDocument, consistentRegionQueueName, jmsHeaderProperties, keyStore, keyStorePassword, maxMessageSendRetries, messageSendRetryDelay, passwordPropName, period, reconnectionBound, reconnectionPolicy, sslConnection, sslDebug, trustStore, trustStorePassword, userPropName

Metrics
This operator reports 3 metrics.

Properties

Implementation
Java

Input Ports

Ports (0)

The JMSSink operator is configurable with a single input port. This input port is a data port and is required. The input port is non-mutating and its punctuation mode is Oblivious.


Output Ports

Assignments
Java operators do not support output assignments.
Ports (0)

The JMSSink operator is configurable with an optional output port that submits the error message and, optionally, the tuple that caused the error. The optional output port is mutating and its punctuation mode is Free. This error output port has an optional first attribute that contains the input tuple that caused the error and a second attribute of type rstring that contains the error message. Only one error message is sent for each failed tuple.


Parameters

access

This mandatory parameter identifies the access specification name.

appConfigName

This parameter specifies the name of the application configuration that stores client credential information. Credentials specified via the application configuration override those specified in the connections file.

classLibs

This parameter specifies paths to JAR files that are loaded into the operator's classpath. Each value can point to a specific JAR file or to a directory. If a value points to a directory, the operator loads all files ending in .jar onto the classpath. If the parameter is set, only its values are used to identify class libraries. If the parameter is not set, the operator falls back to the old implementation, which refers to the environment variables STREAMS_MESSAGING_WMQ_HOME and STREAMS_MESSAGING_AMQ_HOME.
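
The file-or-directory rule can be sketched in Java as follows. The class ClassLibsResolver is hypothetical, and the sorted order of JAR files within a directory is an assumption made here for determinism; the source does not state a load order.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not the toolkit's class loader code.
final class ClassLibsResolver {

    // Expands each entry: a directory contributes its *.jar files, a file is used as given.
    static List<Path> resolve(List<Path> entries) {
        List<Path> jars = new ArrayList<>();
        for (Path entry : entries) {
            if (Files.isDirectory(entry)) {
                List<Path> found = new ArrayList<>();
                try (DirectoryStream<Path> dir = Files.newDirectoryStream(entry, "*.jar")) {
                    for (Path jar : dir) {
                        found.add(jar);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                Collections.sort(found); // assumption: sorted for a stable order
                jars.addAll(found);
            } else {
                jars.add(entry);
            }
        }
        return jars;
    }
}
```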

codepage

This optional parameter specifies the code page of the target system that is used to convert ustring for a Bytes message type. If this parameter is specified, it must have exactly one value, which is a String constant. If the parameter is not specified, the operator uses the default value of UTF8.
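
As an illustration of what a code page changes, the following Java sketch encodes a string with a named charset and falls back to UTF8 when none is given. The class CodepageEncoder is hypothetical; how the operator performs the conversion internally is not described in the source.

```java
import java.nio.charset.Charset;

// Hypothetical helper illustrating the effect of the codepage parameter.
final class CodepageEncoder {
    static final String DEFAULT_CODEPAGE = "UTF8"; // default named in the text above

    // Encodes the value with the given code page, or with the default if none is given.
    static byte[] encode(String value, String codepage) {
        Charset charset = Charset.forName(codepage == null ? DEFAULT_CODEPAGE : codepage);
        return value.getBytes(charset);
    }

    public static void main(String[] args) {
        // The same character can occupy a different number of bytes per code page.
        System.out.println(encode("\u00e9", null).length);
        System.out.println(encode("\u00e9", "ISO-8859-1").length);
    }
}
```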

connection

This mandatory parameter identifies the name of the connection specification that contains a JMS element.

connectionDocument

This optional parameter specifies the path name of the file that contains the connection and access specifications, which are identified by the connection and access parameters. If the parameter is not specified, the operator uses the file that is in the default location ../etc/connections.xml.

consistentRegionQueueName

This parameter is required if the operator participates in a consistent region. It specifies the queue that is used to store consistent region specific information. During initialization, the operator performs a JNDI lookup with the specified queue name. The queue must exist on the same messaging server to which the operator connects.

jmsHeaderProperties

Specifies the mapping between JMS Header property values and Streams tuple attributes. The format of the mapping is:

"propertyName1/streamsAttributeName1/typeSpecifier1","propertyName2/streamsAttributeName2/typeSpecifier2","..." ;

Leading and trailing spaces are trimmed from property and attribute names. The properties can be of the following types:

Type specifier   Java / Property type   SPL / Attribute type
bool             boolean                boolean
byte             byte                   int8
short            short                  int16
int              int                    int32
long             long                   int64
float            float                  float32
double           double                 float64
string           String                 rstring
object           Object                 ustring, uint8, uint16, uint32, uint64

If the property type or the attribute type does not match the specified type, an error is logged during operator initialization.
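
The mapping format can be illustrated with a short Java parser for a single "propertyName/streamsAttributeName/typeSpecifier" entry. This is a sketch: the class JmsHeaderMapping is hypothetical, and rejecting unknown type specifiers with an exception is an assumption about how an erroneous configuration could be detected.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical value class for one mapping entry.
final class JmsHeaderMapping {
    // Type specifiers listed in the table above.
    static final List<String> TYPE_SPECIFIERS = Arrays.asList(
            "bool", "byte", "short", "int", "long", "float", "double", "string", "object");

    final String propertyName;
    final String attributeName;
    final String typeSpecifier;

    private JmsHeaderMapping(String propertyName, String attributeName, String typeSpecifier) {
        this.propertyName = propertyName;
        this.attributeName = attributeName;
        this.typeSpecifier = typeSpecifier;
    }

    // Parses one entry such as "prio/priority/int".
    static JmsHeaderMapping parse(String entry) {
        String[] parts = entry.split("/", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected property/attribute/type: " + entry);
        }
        // Only the property and attribute names are documented as being trimmed.
        String type = parts[2];
        if (!TYPE_SPECIFIERS.contains(type)) {
            throw new IllegalArgumentException("Unknown type specifier: " + type);
        }
        return new JmsHeaderMapping(parts[0].trim(), parts[1].trim(), type);
    }
}
```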

keyStore

This parameter specifies the path to the keyStore. If a relative path is specified, the path is relative to the application directory. The sslConnection parameter must be set to true for this parameter to have any effect.

keyStorePassword

This parameter specifies the password for the keyStore given by the keyStore parameter. The sslConnection parameter must be set to true for this parameter to have any effect.

maxMessageSendRetries

This optional parameter specifies the number of successive retries that are attempted for a message if a failure occurs when the message is sent. The default value is zero; no retries are attempted.

messageSendRetryDelay

This optional parameter specifies the time in milliseconds to wait before the next delivery attempt. If the maxMessageSendRetries parameter is specified, you must also specify a value for this parameter.
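
How maxMessageSendRetries and messageSendRetryDelay interact can be sketched as a simple retry loop in Java. The class SendRetry and the Sender interface are hypothetical; the sketch assumes one initial attempt followed by at most maxMessageSendRetries further attempts, with the delay applied between attempts.

```java
// Hypothetical retry helper, not the operator's implementation.
final class SendRetry {

    interface Sender {
        void send() throws Exception;
    }

    // Returns true if the message was sent, false if it was given up.
    static boolean sendWithRetry(Sender sender, int maxMessageSendRetries, long messageSendRetryDelayMs) {
        for (int attempt = 0; attempt <= maxMessageSendRetries; attempt++) {
            try {
                sender.send();
                return true;
            } catch (Exception e) {
                if (attempt == maxMessageSendRetries) {
                    break; // no retries left
                }
                try {
                    Thread.sleep(messageSendRetryDelayMs); // wait before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

With the default of zero retries, the loop makes exactly one attempt and never sleeps.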

passwordPropName

This parameter specifies the property name of the password in the application configuration. If the appConfigName parameter is specified and the passwordPropName parameter is not set, a compile time error occurs.

period

This parameter specifies the time period in seconds the operator waits before it tries to reconnect. It is an optional parameter of type float64. You can use this parameter only when the reconnectionPolicy parameter is specified, otherwise a compile time error occurs. The default value for the period parameter is 60.

reconnectionBound

This optional parameter of type int32 specifies the number of successive connection attempts that the operator makes. You can use this parameter only when the reconnectionPolicy parameter is specified and set to BoundedRetry; otherwise a run time error occurs. If the reconnectionBound parameter is specified and the reconnectionPolicy parameter is not set, a compile time error occurs. The default value for the reconnectionBound parameter is 5.

reconnectionPolicy

This is an optional parameter that specifies the reconnection policy. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. If the parameter is not specified, the reconnection policy is set to BoundedRetry with a reconnectionBound of 5 and a period of 60 seconds.
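
The three policies can be compared with a minimal Java sketch. The class ReconnectionLoop and the Connector interface are hypothetical, and the sketch assumes that reconnectionBound counts the attempts made after the initial connection attempt; the source does not say whether the initial attempt is included.

```java
// Hypothetical model of the reconnection policies.
final class ReconnectionLoop {

    enum Policy { NoRetry, InfiniteRetry, BoundedRetry }

    interface Connector {
        boolean tryConnect();
    }

    // Returns true once a connection attempt succeeds, false when the policy gives up.
    static boolean connect(Connector connector, Policy policy, int reconnectionBound, double periodSeconds) {
        if (connector.tryConnect()) {
            return true; // initial attempt
        }
        if (policy == Policy.NoRetry) {
            return false;
        }
        int retries = 0;
        while (policy == Policy.InfiniteRetry || retries < reconnectionBound) {
            try {
                Thread.sleep((long) (periodSeconds * 1000)); // wait one period between attempts
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            retries++;
            if (connector.tryConnect()) {
                return true;
            }
        }
        return false;
    }
}
```

Under these assumptions, the default of BoundedRetry with a bound of 5 and a period of 60 seconds gives up after about five minutes without a connection.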

sslConnection

This parameter specifies whether the operator should attempt to connect using SSL. If this parameter is specified, then the keyStore, keyStorePassword and trustStore parameters must also be specified. The default value is false.

sslDebug

If SSL/TLS protocol debugging is enabled, all protocol data and information is logged to the console. Use this to debug TLS connection problems. The default is 'false'.

trustStore

This parameter specifies the path to the trustStore. If a relative path is specified, the path is relative to the application directory. The sslConnection parameter must be set to true for this parameter to have any effect.

trustStorePassword

This parameter specifies the password for the trustStore given by the trustStore parameter. The sslConnection parameter must be set to true for this parameter to have any effect.

userPropName

This parameter specifies the property name of the user name in the application configuration. If the appConfigName parameter is specified and the userPropName parameter is not set, a compile time error occurs.


Metrics

nFailedInserts - Counter

The number of failed inserts to WebSphere MQ or Apache ActiveMQ. Failed insertions can occur when a message is dropped because of a run time error.

nReconnectionAttempts - Counter

The number of reconnection attempts that are made before a successful connection.

nTruncatedInserts - Counter

The number of tuples that had truncated attributes when they were converted to a message.

Libraries

Operator class library
Library Path: ../../impl/lib/com.ibm.streamsx.jms.jar, ../../impl/lib/com.ibm.streamsx.jms.jar, ../../opt/downloaded/*