com.ibm.streamsx.topology.messaging.mqtt

Class MqttStreams

  • java.lang.Object
    • com.ibm.streamsx.topology.messaging.mqtt.MqttStreams


  • public class MqttStreams
    extends java.lang.Object
    A simple connector to an MQTT broker for publishing TStream<Message> tuples to MQTT topics, and for subscribing to MQTT topics to create TStream<Message> streams.

    A connector is for a specific MQTT Broker as specified in the configuration. Any number of publish() and subscribe() connections may be created from a single MqttStreams connector.

    Sample use:

    
     Topology t = new Topology("An MQTT application");
     // optionally, define submission properties for configuration information
     Supplier<String> serverID = t.createSubmissionParameter("mqtt.serverID", "tcp://localhost:1883");
     Supplier<String> userID = t.createSubmissionParameter("mqtt.userID", System.getProperty("user.name"));
     Supplier<String> password = t.createSubmissionParameter("mqtt.password", String.class);
     Supplier<String> pubTopic = t.createSubmissionParameter("mqtt.pubTopic", String.class);
     Supplier<String> subTopic = t.createSubmissionParameter("mqtt.subTopic", String.class);
     
     // create the connector's configuration property map
     Map<String,Object> config = new HashMap<>();
     config.put("serverURI", serverID);
     config.put("userID", userID);
     config.put("password", password);
     
     // create the connector
     MqttStreams mqtt = new MqttStreams(t, config);
     
     // publish to the submission parameter "pubTopic"
     TStream<Message> msgsToPublish = ...
     mqtt.publish(msgsToPublish, pubTopic);
     
     // publish to a compile time topic
     // with Java8 Lambda expression...
     mqtt.publish(msgsToPublish, ()->"anotherTopic");
     // without Java8...
     mqtt.publish(msgsToPublish, new Value<String>("anotherTopic"));
     
     // subscribe to the submission parameter "subTopic"
     TStream<Message> rcvdMsgs = mqtt.subscribe(subTopic);
     rcvdMsgs.print();
     
     // subscribe to a compile time topic
     // with Java8 Lambda expression...
     TStream<Message> rcvdMsgs2 = mqtt.subscribe(()->"anotherTopic");
     // without Java8...
     TStream<Message> rcvdMsgs3 = mqtt.subscribe(new Value<String>("anotherTopic"));
     

    Configuration properties apply to publish and subscribe unless stated otherwise.
    All properties may be specified as submission parameters unless stated otherwise. See Topology.createSubmissionParameter(String, Class).

    Property - Description
    serverURI - Required String. URI to the MQTT server, either tcp://<hostid>[:<port>] or ssl://<hostid>[:<port>]. The port defaults to 1883 for "tcp:" and 8883 for "ssl:" URIs.
    clientID - Optional String. A unique identifier for a connection to the MQTT server. The MQTT broker only allows a single connection for a particular clientID. By default a unique client ID is automatically generated for each use of publish() and subscribe(). A specified clientID is used for the first use of publish() or subscribe(), and a suffix is added for each subsequent use.
    keepAliveInterval - Optional Integer. Automatically generate an MQTT ping message to the server if a message or ping hasn't been sent or received in the last keepAliveInterval seconds. Enables the client to detect if the server is no longer available without having to wait for the TCP/IP timeout. A value of 0 disables keepalive processing. The default is 60.
    commandTimeoutMsec - Optional Long. The maximum time in milliseconds to wait for an MQTT connect or publish action to complete. A value of 0 causes the client to wait indefinitely. The default is 0.
    reconnectDelayMsec - Optional Long. The time in milliseconds before attempting to reconnect to the server following a connection failure. The default is 60000.
    userID - Optional String. The identifier to use when authenticating with a server configured to require that form of authentication.
    password - Optional String. The password to use when authenticating with a server configured to require that form of authentication.
    trustStore - Optional String. The pathname to a file containing the public certificates of trusted MQTT servers. If a relative path is specified, the path is relative to the application directory. Required when connecting to an MQTT server with an ssl://... serverURI.
    trustStorePassword - Required String when trustStore is used. The password needed to access the encrypted trustStore file.
    keyStore - Optional String. The pathname to a file containing the MQTT client's public/private key certificates. If a relative path is specified, the path is relative to the application directory. Required when an MQTT server is configured to use SSL client authentication.
    keyStorePassword - Required String when keyStore is used. The password needed to access the encrypted keyStore file.
    receiveBufferSize - [subscribe] Optional Integer. The size, in number of messages, of the subscriber's internal receive buffer. Received messages are added to the buffer prior to being converted to a stream tuple. The receiver blocks when the buffer is full. The default is 50.
    retain - [publish] Optional Boolean. Indicates if messages should be retained on the MQTT server. The default is false.
    defaultQOS - Optional Integer. The default MQTT quality of service used for message handling. The default is 0.
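
    The requirements listed above (serverURI always required, trustStore required for ssl: URIs, a password required for each store) and the default ports can be checked before the connector is created. The following is a minimal sketch using only the JDK. MqttConfigCheck, validate and effectivePort are hypothetical names and are not part of the connector API. The checks only inspect plain String values, because a property supplied as a submission parameter has no value until the application is submitted.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of MqttStreams.
class MqttConfigCheck {

    // Port implied by a serverURI value, applying the documented defaults:
    // 1883 for "tcp:" URIs and 8883 for "ssl:" URIs.
    static int effectivePort(String serverURI) {
        URI uri = URI.create(serverURI);
        if (uri.getPort() != -1) {
            return uri.getPort();
        }
        return "ssl".equals(uri.getScheme()) ? 8883 : 1883;
    }

    // Throws IllegalArgumentException when a documented requirement is not met.
    static void validate(Map<String, Object> config) {
        Object serverURI = config.get("serverURI");
        if (serverURI == null) {
            throw new IllegalArgumentException("serverURI is required");
        }
        boolean ssl = serverURI instanceof String
                && ((String) serverURI).startsWith("ssl:");
        if (ssl && !config.containsKey("trustStore")) {
            throw new IllegalArgumentException("trustStore is required for ssl: URIs");
        }
        if (config.containsKey("trustStore") && !config.containsKey("trustStorePassword")) {
            throw new IllegalArgumentException("trustStorePassword is required with trustStore");
        }
        if (config.containsKey("keyStore") && !config.containsKey("keyStorePassword")) {
            throw new IllegalArgumentException("keyStorePassword is required with keyStore");
        }
    }

    public static void main(String[] args) {
        // Host name, file path and password below are placeholder values.
        Map<String, Object> config = new HashMap<>();
        config.put("serverURI", "ssl://broker.example.com");
        config.put("trustStore", "etc/truststore.jks");
        config.put("trustStorePassword", "changeit");
        config.put("keepAliveInterval", 30);
        config.put("defaultQOS", 1);

        validate(config);
        System.out.println(effectivePort((String) config.get("serverURI")));
        // The validated map would then be passed to: new MqttStreams(t, config)
    }
}
```

    Running the check up front reports a missing trustStore or password while the topology is being built, rather than when the connection is attempted at runtime.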
    See Also:
    http://mqtt.org, com.ibm.streamsx.messaging
    • Constructor Summary

      Constructors 
      Constructor and Description
      MqttStreams(TopologyElement te, java.util.Map<java.lang.String,java.lang.Object> config)
      Create an MQTT connector for publishing tuples to topics and subscribing to topics.
    • Method Summary

      Modifier and Type Method and Description
      java.util.Map<java.lang.String,java.lang.Object> getConfig()
      Get the connector's configuration information.
      TSink publish(TStream<? extends Message> stream, Supplier<java.lang.String> topic)
      Publish stream tuples to one or more MQTT topics.
      TSink publish(TStream<? extends Message> stream)
      Publish stream tuples to one or more MQTT topics.
      TStream<Message> subscribe(Supplier<java.lang.String> topic)
      Subscribe to a MQTT topic and create a stream of messages.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • MqttStreams

        public MqttStreams(TopologyElement te,
                           java.util.Map<java.lang.String,java.lang.Object> config)
        Create an MQTT connector for publishing tuples to topics and subscribing to topics.

        Parameters:
        te - TopologyElement
        config - configuration property information.
    • Method Detail

      • getConfig

        public java.util.Map<java.lang.String,java.lang.Object> getConfig()
        Get the connector's configuration information.
        Returns:
        the unmodifiable configuration
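
        What "unmodifiable" means for a caller can be sketched with the JDK alone. The example assumes the returned map behaves like one produced by Collections.unmodifiableMap; this page does not state how the connector builds it. UnmodifiableConfigDemo and rejectsWrites are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustration only: stands in for the map returned by getConfig().
class UnmodifiableConfigDemo {

    // Returns true if the map refuses a put(), as an unmodifiable view does.
    static boolean rejectsWrites(Map<String, Object> view) {
        try {
            view.put("retain", Boolean.TRUE);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("serverURI", "tcp://localhost:1883");

        // Assumption: getConfig() returns a view comparable to this one.
        Map<String, Object> view = Collections.unmodifiableMap(config);

        System.out.println(view.get("serverURI"));
        System.out.println(rejectsWrites(view));
    }
}
```

        Under that assumption, code that needs a different configuration would build a new map and create a second connector instead of changing the returned one.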
      • publish

        public TSink publish(TStream<? extends Message> stream)
        Publish stream tuples to one or more MQTT topics.

        Each stream tuple is sent to the topic specified by its Message.getTopic() value. The Message.getKey() field is ignored.

        The message is handled with the quality of service indicated by configuration property defaultQOS.

        Same as publish(stream, null).

        Parameters:
        stream - the stream to publish
        Returns:
        the sink element
      • publish

        public TSink publish(TStream<? extends Message> stream,
                             Supplier<java.lang.String> topic)
        Publish stream tuples to one or more MQTT topics.

        If topic is null, each tuple is published to the topic specified by its Message.getTopic(). Otherwise, all tuples are published to topic.

        The messages added to MQTT include a topic and message. The Message.getKey() field is ignored.

        The message is handled with the quality of service indicated by configuration property defaultQOS.

        Parameters:
        stream - the stream to publish
        topic - topic to publish to. May be a submission parameter. May be null.
        Returns:
        the sink element
        See Also:
        Value, Topology.createSubmissionParameter(String, Class)
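
        The topic selection rule described for publish(stream, topic) can be written out as a small function. This is a sketch of the documented rule only, not the connector's implementation. TopicRule and resolveTopic are hypothetical names, and java.util.function.Supplier stands in for the toolkit's own Supplier type so that the example runs without the toolkit.

```java
import java.util.function.Supplier;

// Illustration of the documented rule; not part of MqttStreams.
class TopicRule {

    // A null topic supplier means each tuple goes to its own Message.getTopic()
    // value; otherwise every tuple goes to the supplied topic.
    static String resolveTopic(Supplier<String> topic, String messageTopic) {
        return topic == null ? messageTopic : topic.get();
    }

    public static void main(String[] args) {
        // No explicit topic: the tuple's own topic is used.
        System.out.println(resolveTopic(null, "sensors/temp"));

        // Explicit topic: the tuple's own topic is ignored.
        System.out.println(resolveTopic(() -> "anotherTopic", "sensors/temp"));
    }
}
```

        With a null topic, a single publish() call can fan tuples out to many topics, which is why publish(stream) is described as publishing to one or more topics.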
      • subscribe

        public TStream<Message> subscribe(Supplier<java.lang.String> topic)
        Subscribe to a MQTT topic and create a stream of messages.

        The quality of service for handling each topic is the value of configuration property defaultQOS.

        N.B. A topology that includes a subscription does not support StreamsContext.Type.EMBEDDED.

        N.B. Due to com.ibm.streamsx.messaging issue#124, terminating a StreamsContext.Type.STANDALONE topology may result in ERROR messages and a stranded standalone process.

        Parameters:
        topic - the MQTT topic. May be a submission parameter.
        Returns:
        TStream<Message> The generated Message tuples have a non-null topic. The tuple's key will be null.
        Throws:
        java.lang.IllegalArgumentException - if topic is null.
        See Also:
        Value, Topology.createSubmissionParameter(String, Class)
streamsx.topology 2.1 @ IBMStreams GitHub