public class MqttStreams
extends java.lang.Object
A connector to an MQTT broker for publishing TStream<Message> tuples to MQTT topics, and
 for subscribing to MQTT topics and creating TStream<Message> streams.
 
 A connector is for a specific MQTT Broker as specified in
 the configuration. Any number of publish() and subscribe()
 connections may be created from a single MqttStreams connector.
 
Sample use:
 Topology t = new Topology("An MQTT application");
 // optionally, define submission properties for configuration information
 Supplier<String> serverID = t.createSubmissionParameter("mqtt.serverID", "tcp://localhost:1883");
 Supplier<String> userID = t.createSubmissionParameter("mqtt.userID", System.getProperty("user.name"));
 Supplier<String> password = t.createSubmissionParameter("mqtt.password", String.class);
 Supplier<String> pubTopic = t.createSubmissionParameter("mqtt.pubTopic", String.class);
 Supplier<String> subTopic = t.createSubmissionParameter("mqtt.subTopic", String.class);
 
 // create the connector's configuration property map
 Map<String,Object> config = new HashMap<>();
 config.put("serverURI", serverID);
 config.put("userID", userID);
 config.put("password", password);
 
 // create the connector
 MqttStreams mqtt = new MqttStreams(t, config);
 
 // publish to the submission parameter "pubTopic"
 TStream<Message> msgsToPublish = ...
 mqtt.publish(msgsToPublish, pubTopic);
 
 // publish to a compile time topic
 // with Java8 Lambda expression...
 mqtt.publish(msgsToPublish, ()->"anotherTopic");
 // without Java8...
 mqtt.publish(msgsToPublish, new Value<String>("anotherTopic"));
 
 // subscribe to the submission parameter "subTopic"
 TStream<Message> rcvdMsgs = mqtt.subscribe(subTopic);
 rcvdMsgs.print();
 
 // subscribe to a compile time topic
 // with Java8 Lambda expression...
 TStream<Message> rcvdMsgs2 = mqtt.subscribe(()->"anotherTopic");
 // without Java8...
 TStream<Message> rcvdMsgs3 = mqtt.subscribe(new Value<String>("anotherTopic"));
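
The sample passes a compile time topic either as a lambda or as a Value instance. The following is a minimal sketch of why the two forms are interchangeable: both are suppliers that always return the same string. ConstantSupplier is a hypothetical stand-in for Value, and java.util.function.Supplier is used here in place of the toolkit's own Supplier interface so that the sketch runs without the toolkit on the classpath.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the toolkit's Value class: a supplier
// that always returns the constant it was constructed with.
class ConstantSupplier<T> implements Supplier<T> {
    private final T value;

    ConstantSupplier(T value) {
        this.value = value;
    }

    @Override
    public T get() {
        return value;
    }

    public static void main(String[] args) {
        // Lambda form and object form of the same compile time topic.
        Supplier<String> lambdaTopic = () -> "anotherTopic";
        Supplier<String> objectTopic = new ConstantSupplier<>("anotherTopic");
        System.out.println(lambdaTopic.get().equals(objectTopic.get()));
    }
}
```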
 
 Configuration properties apply to publish and
 subscribe unless stated otherwise.
 
 All properties may be specified as submission parameters unless
 stated otherwise.  
 See Topology.createSubmissionParameter(String, Class).
 
| Property | Description | 
|---|---|
| serverURI | Required String. URI to the MQTT server, either tcp://<hostid>[:<port>] or ssl://<hostid>[:<port>]. The port defaults to 1883 for "tcp:" and 8883 for "ssl:" URIs. |
| clientID | Optional String. A unique identifier for a connection to the MQTT server. The MQTT broker only allows a single connection for a particular clientID. By default a unique client ID is automatically generated for each use of publish() and subscribe(). The specified clientID is used for the first use of publish() or subscribe(), and a suffix is added for each subsequent use. |
| keepAliveInterval | Optional Integer. Automatically generate an MQTT ping message to the server if a message or ping hasn't been sent or received in the last keepAliveInterval seconds. Enables the client to detect if the server is no longer available without having to wait for the TCP/IP timeout. A value of 0 disables keepalive processing. The default is 60. |
| commandTimeoutMsec | Optional Long. The maximum time in milliseconds to wait for a MQTT connect or publish action to complete. A value of 0 causes the client to wait indefinitely. The default is 0. | 
| reconnectDelayMsec | Optional Long. The time in milliseconds before attempting to reconnect to the server following a connection failure. The default is 60000. | 
| userID | Optional String. The identifier to use when authenticating with a server configured to require that form of authentication. | 
| password | Optional String. The password to use when authenticating with a server configured to require that form of authentication. |
| trustStore | Optional String. The pathname to a file containing the public certificate of trusted MQTT servers. If a relative path is specified, the path is relative to the application directory. Required when connecting to a MQTT server with an ssl:/... serverURI. | 
| trustStorePassword | Required String when trustStore is used. The password needed to access the encrypted trustStore file. |
| keyStore | Optional String. The pathname to a file containing the MQTT client's public and private key certificates. If a relative path is specified, the path is relative to the application directory. Required when an MQTT server is configured to use SSL client authentication. |
| keyStorePassword | Required String when keyStore is used. The password needed to access the encrypted keyStore file. |
| receiveBufferSize | [subscribe] Optional Integer. The size, in number of messages, of the subscriber's internal receive buffer. Received messages are added to the buffer prior to being converted to a stream tuple. The receiver blocks when the buffer is full. The default is 50. | 
| retain | [publish] Optional Boolean. Indicates if messages should be retained on the MQTT server. Default is false. | 
| defaultQOS | Optional Integer. The default MQTT quality of service used for message handling. The default is 0. | 
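
The rules in the table can be sketched as a small pre-flight check on the configuration map. This is an illustration only: MqttConfigCheck, effectivePort and problems are hypothetical names, not part of the connector, and the check assumes property values are plain strings rather than submission parameters. It covers only the "required" rules and the default ports stated in the table.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the connector API.
class MqttConfigCheck {

    // Port implied by a serverURI: the explicit port if present, otherwise
    // the documented default (1883 for tcp:, 8883 for ssl:).
    static int effectivePort(String serverURI) {
        URI uri = URI.create(serverURI);
        if (uri.getPort() != -1) {
            return uri.getPort();
        }
        return "ssl".equals(uri.getScheme()) ? 8883 : 1883;
    }

    // Collects violations of the "required" rules from the property table.
    static List<String> problems(Map<String, Object> config) {
        List<String> problems = new ArrayList<>();
        Object uri = config.get("serverURI");
        if (uri == null) {
            problems.add("serverURI is required");
        } else if (uri instanceof String && ((String) uri).startsWith("ssl:")
                && !config.containsKey("trustStore")) {
            problems.add("trustStore is required for an ssl: serverURI");
        }
        if (config.containsKey("trustStore") && !config.containsKey("trustStorePassword")) {
            problems.add("trustStorePassword is required when trustStore is used");
        }
        if (config.containsKey("keyStore") && !config.containsKey("keyStorePassword")) {
            problems.add("keyStorePassword is required when keyStore is used");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Example values are placeholders chosen for illustration.
        Map<String, Object> config = new HashMap<>();
        config.put("serverURI", "ssl://broker.example.com");
        config.put("trustStore", "etc/truststore.jks");
        config.put("trustStorePassword", "changeit");
        config.put("defaultQOS", 1);
        System.out.println(problems(config));
        System.out.println(effectivePort((String) config.get("serverURI")));
    }
}
```

A map that passes this check can then be handed to the MqttStreams constructor as in the sample above.
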

| Constructor and Description |
|---|
| MqttStreams(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config) Create an MQTT connector for publishing tuples to topics and subscribing to topics. |

| Modifier and Type | Method and Description |
|---|---|
| java.util.Map<java.lang.String,java.lang.Object> | getConfig() Get the connector's configuration information. |
| TSink | publish(TStream<? extends Message> stream, Supplier<java.lang.String> topic) Publish stream tuples to one or more MQTT topics. |
| TSink | publish(TStream<? extends Message> stream) Publish stream tuples to one or more MQTT topics. |
| TStream<Message> | subscribe(Supplier<java.lang.String> topic) Subscribe to an MQTT topic and create a stream of messages. |

public MqttStreams(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config)

Create an MQTT connector for publishing tuples to topics and subscribing to topics.

Parameters:
te - TopologyElement
config - configuration property information.

public java.util.Map<java.lang.String,java.lang.Object> getConfig()

Get the connector's configuration information.

public TSink publish(TStream<? extends Message> stream)

Publish stream tuples to one or more MQTT topics.
 
 Each stream tuple is sent to the topic specified by its
 Message.getTopic() value.
 The Message.getKey() field is ignored.
 
 The message is handled with the quality of service indicated
 by configuration property defaultQOS.
 
 Same as publish(stream, null).

Parameters:
stream - the stream to publish

public TSink publish(TStream<? extends Message> stream, Supplier<java.lang.String> topic)

Publish stream tuples to one or more MQTT topics.
 
 If topic is null, each tuple is published to the topic
 specified by its Message.getTopic().
 Otherwise, all tuples are published to topic.
 
 The messages added to MQTT include a topic and message.
 The Message.getKey() field is ignored.
 
 The message is handled with the quality of service
 indicated by configuration property defaultQOS.
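
The topic selection rule described above can be sketched in a few lines. This is an illustration of the documented behavior, not the connector's implementation: Msg is a hypothetical stand-in for the toolkit's Message type, resolveTopic is a made-up helper, and java.util.function.Supplier replaces the toolkit's Supplier so the sketch runs on its own.

```java
import java.util.function.Supplier;

// Illustrative sketch of the documented topic selection rule.
class TopicResolution {

    // Hypothetical stand-in for Message: only the fields this sketch needs.
    static final class Msg {
        final String topic;
        final String payload;

        Msg(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    // A null supplier means "use each tuple's own topic";
    // otherwise every tuple goes to the supplied topic.
    static String resolveTopic(Msg tuple, Supplier<String> topic) {
        return topic == null ? tuple.topic : topic.get();
    }

    public static void main(String[] args) {
        Msg m = new Msg("sensors/a", "21.5");
        System.out.println(resolveTopic(m, null));
        System.out.println(resolveTopic(m, () -> "all"));
    }
}
```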

Parameters:
stream - the stream to publish
topic - topic to publish to. May be a submission parameter. May be null.
See Also:
Value, Topology.createSubmissionParameter(String, Class)

public TStream<Message> subscribe(Supplier<java.lang.String> topic)

Subscribe to an MQTT topic and create a stream of messages.

 The quality of service for handling each topic is
 the value of configuration property defaultQOS.
 
 
 N.B. A topology that includes this connector does not support
 StreamsContext.Type.EMBEDDED.
 
 N.B. Due to com.ibm.streamsx.messaging issue #124,
 terminating a StreamsContext.Type.STANDALONE topology may result
 in ERROR messages and a stranded standalone process.

Parameters:
topic - the MQTT topic. May be a submission parameter.
Returns:
Message tuples have a non-null topic. The tuple's key will be null.
Throws:
java.lang.IllegalArgumentException - if topic is null.
See Also:
Value, Topology.createSubmissionParameter(String, Class)