MQTT support

Toolkits > com.ibm.streamsx.topology 1.5.13.__dev__ > com.ibm.streamsx.topology.python > Python Application API > MQTT support

Publishing and subscribing to an MQTT broker.

A simple connector to a MQTT broker for publishing string tuples to MQTT topics, and subscribing to MQTT topics and creating streams.

  • MqttStreams(self, config) MQTT Connector

A connector is for a specific MQTT Broker as specified in the configuration object config. Any number of publish()and subscribe() connections may be created from a single MqttStreams connector.

  • publish(self, pub_stream, topic) function on MQTT Conector

    Publish this stream (pub_stream) on a topic to an MQTT server for applications to subscribe to. A Streams application may publish a stream to an MQTT server to allow other applications to subscribe to it. A subscriber matches a publisher if the topics and server URI match. The schema of the stream to publish must be tuple<rstring message>

    param: pub_stream: Stream to publish to MQTT server. param topic: Topic to publish this stream to.

    return: None

  • subscribe(self, topic) function on MQTT Conector

    Subscribe to a topic published to a MQTT server. A Streams application may subscribe to a stream published to a MQTT server. A subscriber matches a publisher if the topic matches. The schema of the stream returned from the MQTT server is tuple<rstring message>

    param topic: Topic to subscribe to. return: A Stream whose tuples have been published to the topic on a MQTT server.

Sample use:

topo = Topology("An MQTT application")
// optionally, define configuration information
config = {}
config['clientID'] = "test_MQTTpublishClient"
config['defaultQOS'] = 1  (needs to be int vs long)
config['qos'] = int("1") #(needs to be int vs long)
config['keepAliveInterval'] = int(20) (needs to be int vs long)
config['commandTimeoutMsec'] = 30000 (needs to be int vs long)
config['reconnectDelayMsec'] = 5000 (needs to be int vs long)
config['receiveBufferSize'] = 10 (needs to be int vs long)
config['reconnectionBound'] = int(20)
config['retain'] = True 
config['password'] = "mypw"
config['trustStore'] = "/tmp/no-such-trustStore"
config['trustStorePassword'] = "trustpw"
config['keyStore'] = "/tmp/no-such-keyStore"
config['keyStorePassword'] = "keypw"

// create the connector's configuration property map
config['serverURI'] = "tcp://localhost:1883"
config['userID'] = "user1id"
config[' password'] = "user1passwrd"

// create the connector
mqstream = MqttStreams(topo,config)

// publish a python source stream to the topic "python.topic1"
topic = "python.topic1"
src = topo.source(test_functions.mqtt_publish)
mqstream.publish(src, topic) 
streamsx.topology.context.submit("BUNDLE", topo.graph)

 # // subscribe to the topic "python.topic1"
topic = ["python.topic1", ]
mqs = mqstream.subscribe(topic) 
mqs.print()