public class MqttStreams
extends java.lang.Object
TStream<Message>
tuples to MQTT topics, and
subscribing to MQTT topics and creating TStream<Message>
streams.
A connector is for a specific MQTT Broker as specified in
the configuration. Any number of publish()
and subscribe()
connections may be created from a single MqttStreams connector.
Sample use:
Topology t = new Topology("An MQTT application");
// optionally, define submission properties for configuration information
Supplier<T> serverID = t.createSubmissionParameter("mqtt.serverID", "tcp://localhost:1883");
Supplier<T> userID = t.createSubmissionParameter("mqtt.userID", System.getProperty("user.name"));
Supplier<T> password = t.createSubmissionParameter("mqtt.password", String.class);
Supplier<T> pubTopic = t.createSubmissionParameter("mqtt.pubTopic", String.class);
Supplier<T> subTopic = t.createSubmissionParameter("mqtt.subTopic", String.class);
// create the connector's configuration property map
Map<String,Object> config = new HashMap<>();
config.put("serverID", serverID);
config.put("userID", userID);
config.put("password", password);
// create the connector
MqttStreams mqtt = new MqttStreams(t, config);
// publish to the submission parameter "pubTopic"
TStream<Message> msgsToPublish = ...
mqtt.publish(msgsToPublish, pubTopic);
// publish to a compile time topic
// with Java8 Lambda expression...
mqtt.publish(msgsToPublish, ()->"anotherTopic");
// without Java8...
mqtt.publish(msgsToPublish, new Value("anotherTopic"));
// subscribe to the submission parameter "subTopic"
TStream<Message> rcvdMsgs = mqtt.subscribe(subTopic);
rcvdMsgs.print();
// subscribe to a compile time topic
// with Java8 Lambda expression...
TStream<Message> rcvdMsgs2 = mqtt.subscribe(()->"anotherTopic");
// without Java8...
TStream<Message> rcvdMsgs2 = mqtt.subscribe(new Value("anotherTopic"));
Configuration properties apply to publish
and
subscribe
unless stated otherwise.
All properties may be specified as submission parameters unless
stated otherwise.
See Topology.createSubmissionParameter(String, Class)
.
Property | Description |
---|---|
serverURI | Required String. URI to the MQTT server, either
tcp://<hostid>[:<port>]
or ssl://<hostid>[:<port>] .
The port defaults to 1883 for "tcp:" and 8883 for "ssl:" URIs.
|
clientID | Optional String. A unique identifier for a connection
to the MQTT server.
The MQTT broker only allows a single
connection for a particular clientID .
By default a unique client ID is automatically
generated for each use of publish() and subscribe() .
The specified clientID is used for the first
use publish() or subscribe() use and
suffix is added for each subsequent uses.
|
keepAliveInterval | Optional Integer. Automatically generate a MQTT ping message to the server if a message or ping hasn't been sent or received in the last keelAliveInterval seconds. Enables the client to detect if the server is no longer available without having to wait for the TCP/IP timeout. A value of 0 disables keepalive processing. The default is 60. |
commandTimeoutMsec | Optional Long. The maximum time in milliseconds to wait for a MQTT connect or publish action to complete. A value of 0 causes the client to wait indefinitely. The default is 0. |
reconnectDelayMsec | Optional Long. The time in milliseconds before attempting to reconnect to the server following a connection failure. The default is 60000. |
userID | Optional String. The identifier to use when authenticating with a server configured to require that form of authentication. |
password | Optional String. The identifier to use when authenticating with server configured to require that form of authentication. |
trustStore | Optional String. The pathname to a file containing the public certificate of trusted MQTT servers. If a relative path is specified, the path is relative to the application directory. Required when connecting to a MQTT server with an ssl:/... serverURI. |
trustStorePassword | Required String when trustStore is used.
The password needed to access the encrypted trustStore file.
|
keyStore | Optional String. The pathname to a file containing the MQTT client's public private key certificates. If a relative path is specified, the path is relative to the application directory. Required when an MQTT server is configured to use SSL client authentication. |
keyStorePassword | Required String when keyStore is used.
The password needed to access the encrypted keyStore file.
|
receiveBufferSize | [subscribe] Optional Integer. The size, in number of messages, of the subscriber's internal receive buffer. Received messages are added to the buffer prior to being converted to a stream tuple. The receiver blocks when the buffer is full. The default is 50. |
retain | [publish] Optional Boolean. Indicates if messages should be retained on the MQTT server. Default is false. |
defaultQOS | Optional Integer. The default MQTT quality of service used for message handling. The default is 0. |
Constructor and Description |
---|
MqttStreams(TopologyElement te,
java.util.Map<java.lang.String,java.lang.Object> config)
Create a MQTT connector for publishing tuples to topics
subscribing to topics.
|
Modifier and Type | Method and Description |
---|---|
java.util.Map<java.lang.String,java.lang.Object> |
getConfig()
Get the connector's configuration information.
|
TSink |
publish(TStream<? extends Message> stream,
Supplier<java.lang.String> topic)
Publish
stream tuples to one or more MQTT topics. |
TSink |
publish(TStream<? extends Message> stream)
Publish
stream tuples to one or more MQTT topics. |
TStream<Message> |
subscribe(Supplier<java.lang.String> topic)
Subscribe to a MQTT topic and create a stream of messages.
|
public MqttStreams(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config)
te
- TopologyElement
config
- configuration property information.public java.util.Map<java.lang.String,java.lang.Object> getConfig()
public TSink publish(TStream<? extends Message> stream)
stream
tuples to one or more MQTT topics.
Each stream
tuple is sent to the topic specified by its
Message.getTopic()
value.
The Message.getKey()
field is ignored.
The message is handled with the quality of service indicated
by configuration property defaultQOS
.
Same as publish(stream, null)
.
stream
- the stream to publishpublic TSink publish(TStream<? extends Message> stream, Supplier<java.lang.String> topic)
stream
tuples to one or more MQTT topics.
If topic
is null, each tuple is published to the topic
specified by its Message.getTopic()
.
Otherwise, all tuples are published to topic
.
The messages added to MQTT include a topic and message.
The Message.getKey()
field is ignored.
The message is handled with the quality of service
indicated by configuration property defaultQOS
.
stream
- the stream to publishtopic
- topic to publish to. May be a submission parameter. May be null.Value
,
Topology.createSubmissionParameter(String, Class)
public TStream<Message> subscribe(Supplier<java.lang.String> topic)
The quality of service for handling each topic is
the value of configuration property defaultQOS
.
N.B., A topology that includes this will not support
StreamsContext.Type.EMBEDDED
.
N.B. due to com.ibm.streamsx.messaging
issue#124,
terminating a StreamsContext.Type.STANDALONE
topology may result
in ERROR messages and a stranded standalone process.
topic
- the MQTT topic. May be a submission parameter.Message
tuples have a non-null topic
.
The tuple's key
will be null.java.lang.IllegalArgumentException
- if topic is null.Value
,
Topology.createSubmissionParameter(String, Class)