MQTT support (com.ibm.streamsx.topology.python, toolkit com.ibm.streamsx.topology 1.5.13.__dev__)
Publishing and subscribing to an MQTT broker.
A simple connector to an MQTT broker for publishing string tuples to MQTT topics and for subscribing to MQTT topics to create streams.
A connector is tied to one specific MQTT broker, as specified in the configuration object config. Any number of publish() and subscribe() connections may be created from a single MqttStreams connector.
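Because the configuration object is a plain dictionary, mistakes such as a numeric option given as a string, or a key with a stray space, are easy to miss. The following is a minimal sketch of a pre-flight check, not part of the toolkit: the helper name `check_mqtt_config` is hypothetical, the option names are the ones used in the example on this page, and treating `serverURI` as mandatory is an assumption.

```python
# Hypothetical pre-flight check; not part of the streamsx toolkit.
# Option names are the ones used in the example on this page.
INT_OPTIONS = (
    "defaultQOS", "qos", "keepAliveInterval", "commandTimeoutMsec",
    "reconnectDelayMsec", "receiveBufferSize", "reconnectionBound",
)


def check_mqtt_config(config):
    """Return a list of problems found in an MQTT connector config dict."""
    problems = []
    # Assumption: the broker address is the one option a connector cannot do without.
    if not config.get("serverURI"):
        problems.append("serverURI is missing")
    for key in config:
        # A key with stray whitespace (' password' instead of 'password')
        # is a different dictionary key and would not be recognized.
        if key != key.strip():
            problems.append("key %r has surrounding whitespace" % key)
    for name in INT_OPTIONS:
        value = config.get(name)
        if value is None:
            continue
        # bool is a subclass of int, so exclude it explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            problems.append("%s must be an int, got %s" % (name, type(value).__name__))
    return problems
```

An empty list means the check found nothing to report; it says nothing about whether the broker accepts the settings.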
publish(self, pub_stream, topic) function on MQTT Connector
Publish a stream (pub_stream) on a topic to an MQTT server so that other applications can subscribe to it. A subscriber matches a publisher if both the topic and the server URI match. The schema of the stream to publish must be tuple<rstring message>.
param pub_stream: Stream to publish to the MQTT server.
param topic: Topic to publish this stream to.
return: None
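The matching rule for publishers and subscribers can be illustrated with a small model. This is only a sketch under a simplifying assumption: topics and server URIs are compared by exact string equality, and MQTT wildcard topic filters are not modeled. The `Endpoint` record and `matches` function are hypothetical names introduced for illustration.

```python
from collections import namedtuple

# Hypothetical record for illustration; the toolkit has no such type.
Endpoint = namedtuple("Endpoint", ["server_uri", "topic"])


def matches(publisher, subscriber):
    """True when both sides name the same server URI and the same topic."""
    return (publisher.server_uri == subscriber.server_uri
            and publisher.topic == subscriber.topic)


pub = Endpoint("tcp://localhost:1883", "python.topic1")
same = Endpoint("tcp://localhost:1883", "python.topic1")
other_topic = Endpoint("tcp://localhost:1883", "python.topic2")
other_server = Endpoint("tcp://otherhost:1883", "python.topic1")
```

Here `matches(pub, same)` holds, while a different topic or a different server URI breaks the match.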
subscribe(self, topic) function on MQTT Connector
Subscribe to a topic published to an MQTT server. A Streams application may subscribe to a stream that was published to an MQTT server. A subscriber matches a publisher if both the topic and the server URI match. The schema of the stream returned from the MQTT server is tuple<rstring message>.
param topic: Topic to subscribe to.
return: A Stream whose tuples have been published to the topic on an MQTT server.
from streamsx.topology.topology import Topology
from streamsx.topology.mqtt import MqttStreams
import streamsx.topology.context
import test_functions  # helper module that provides the mqtt_publish source function

topo = Topology("An MQTT application")

# optionally, define configuration information
# (numeric options need to be int, not long)
config = {}
config['clientID'] = "test_MQTTpublishClient"
config['defaultQOS'] = 1
config['qos'] = int("1")
config['keepAliveInterval'] = int(20)
config['commandTimeoutMsec'] = 30000
config['reconnectDelayMsec'] = 5000
config['receiveBufferSize'] = 10
config['reconnectionBound'] = int(20)
config['retain'] = True
config['password'] = "mypw"
config['trustStore'] = "/tmp/no-such-trustStore"
config['trustStorePassword'] = "trustpw"
config['keyStore'] = "/tmp/no-such-keyStore"
config['keyStorePassword'] = "keypw"

# create the connector's configuration property map
config['serverURI'] = "tcp://localhost:1883"
config['userID'] = "user1id"
config['password'] = "user1passwrd"  # replaces the value set above

# create the connector
mqstream = MqttStreams(topo, config)

# publish a python source stream to the topic "python.topic1"
topic = "python.topic1"
src = topo.source(test_functions.mqtt_publish)
mqstream.publish(src, topic)
streamsx.topology.context.submit("BUNDLE", topo.graph)

# subscribe to the topic "python.topic1"
# (anything added to the topology after submit() is not part of the bundle built above)
topic = ["python.topic1", ]
mqs = mqstream.subscribe(topic)
mqs.print()
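The example passes `test_functions.mqtt_publish` to `topo.source()` without showing it. A source callable takes no arguments and returns an iterable, and since the published schema is tuple<rstring message>, each item would be a string. The sketch below shows what such a function might look like; the name `mqtt_publish_sketch` and the message contents are hypothetical and not taken from the toolkit.

```python
def mqtt_publish_sketch(count=5):
    """Hypothetical stand-in for test_functions.mqtt_publish.

    Returns an iterable of strings; each string would become one
    published message when the function is passed to topo.source().
    """
    return ["message %d" % i for i in range(count)]
```

Called with no arguments, as `topo.source()` would call it, the function falls back to the default count.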