Operator control input port

Some operators in the Messaging Toolkit support an optional control input port. This input port lets you change the configuration of the operator at run time without restarting or recompiling the application.

The control port is non-mutating and its punctuation mode is Oblivious. The following list names the operators that support the optional control port and describes the port schema for each operator:
  • MQTTSink operator:
    • Schema for the control port: static ControlPortType = map<rstring, rstring> mqttConfig
      • This type is defined in the com.ibm.streamsx.messaging.mqtt::MQTTSinkUpdate composite in the MSGTypes.spl file, which is shipped as part of the toolkit.
    • Description:
      • Tuples received on this input port must contain an attribute of type map<rstring, rstring>. In each map entry, the key is the name of a configuration option and the value is the new setting for that option. The configuration options hold the connection information that the operator uses to connect to an MQTT server.
      • The MQTTSink operator reads the map<rstring, rstring> attribute and uses the values to update the connection information at run time. If an invalid value is specified, the operator ignores the value and continues to use the previous connection information.
      • For more information about the connection information that you can update at run time, see the list of configuration options for the input control port.
  • MQTTSource operator:
    • Schema for the control port: static ControlPortType = list<TopicsUpdateDesc> topic, map<rstring, rstring> mqttConfig
      • This type is defined in the com.ibm.streamsx.messaging.mqtt::MQTTSourceUpdate composite in the MSGTypes.spl file, which is shipped as part of the toolkit.
      • TopicsUpdateDesc is: type TopicsUpdateDesc = TopicsUpdateAction action, list<rstring> topics, uint32 qos;
      • action is of type TopicsUpdateAction and defines the following operations that you can perform on the topics:
        • ADD. Add the provided list of topics to the list of subscribed topics.
        • REMOVE. Remove the provided list of topics from the list of subscribed topics.
        • REPLACE. Replace the current list of subscribed topics with the list of topics provided.
        • UPDATE. Change the QoS values for the specified list of topics with the QoS values that are provided.
      • topics is the list of topics that you want to update.
      • qos is the quality of service (QoS) that you want to assign to the list of topics.
    • Description:
      • Tuples received on this input port must contain an attribute of type map<rstring, rstring> and an attribute of type list<TopicsUpdateDesc>.
      • The map attribute contains the connection information that you want to update at run time. In each map entry, the key is the name of a configuration option and the value is the new setting for that option. If an invalid value is specified, the operator ignores the value and continues to use the previous connection information. If you do not want to update the connection information, you can specify an empty map attribute.
      • For example, map<rstring, rstring> emptyCon = { "" : "" }; This placeholder form, a single entry with an empty key and an empty value, is the one that the examples in this section use.
      • For more information about the connection information that you can update at run time, see the list of configuration options for the input control port.
      • Each element of the list<TopicsUpdateDesc> attribute contains three attributes, which specify the operation that you want to perform on the topics, the list of topics, and the quality of service (QoS) to be provided for each of the topics.
      • The QoS applies only for ADD, REPLACE, and UPDATE operations. For ADD and REPLACE operations, the QoS is applied to the topics that are added or replaced.
      • Tip: To update the QoS for all the subscribed topics, use an asterisk (*) as a wildcard character for the topics element.
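
The REMOVE and REPLACE actions described above can be sketched in the same way as ADD and UPDATE. The following is a minimal sketch, not a definitive implementation: the composite name TopicControlSketch, the stream name TopicControl, and the topic names are hypothetical, and the use directive assumes that the types live in the com.ibm.streamsx.messaging.mqtt namespace. The attributes are listed in the order of the schemas given above.

```spl
// Assumption: namespace of the toolkit types; adjust it if your version differs.
use com.ibm.streamsx.messaging.mqtt::* ;

// Hypothetical composite that only produces control tuples.
composite TopicControlSketch {
  graph
    stream<MQTTSourceUpdate.ControlPortType> TopicControl = Custom() {
      logic
        onProcess : {
          // Placeholder map: the connection information is left unchanged.
          map<rstring, rstring> noConfigChange = { "" : "" } ;

          // REMOVE: unsubscribe from topicA. A qos value is still required to
          // build the tuple, but QoS applies only to ADD, REPLACE, and UPDATE.
          MQTTSourceUpdate.TopicsUpdateDesc dropA = {
            action = MQTTSourceUpdate.REMOVE, topics = [ "topicA" ], qos = 0u } ;
          tuple<MQTTSourceUpdate.ControlPortType> removeTuple = {
            topic = [ dropA ], mqttConfig = noConfigChange } ;
          submit(removeTuple, TopicControl) ;

          // REPLACE: the subscription list becomes exactly topicX and topicY,
          // both with QoS = 1.
          MQTTSourceUpdate.TopicsUpdateDesc onlyXY = {
            action = MQTTSourceUpdate.REPLACE, topics = [ "topicX", "topicY" ], qos = 1u } ;
          tuple<MQTTSourceUpdate.ControlPortType> replaceTuple = {
            topic = [ onlyXY ], mqttConfig = noConfigChange } ;
          submit(replaceTuple, TopicControl) ;
        }
    }
}
```

The two tuples are independent illustrations. In practice you would submit only the one that matches the change you need, because a REPLACE supersedes whatever an earlier REMOVE left in place.
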

The following list describes the configuration options for the control port. Updated configuration options take effect immediately; the operator does not wait for a connection failure before it applies the new values.
  • connection.trustStore
    • A new truststore file that is used to establish a new SSL connection with the MQTT server. The connection.serverURI option must also be provided and the protocol must be set to ssl. Otherwise, the tuple received on the control port is ignored and the operator continues to use its current connection configuration.
  • connection.keyStore
    • A new keystore that is used to establish a new SSL connection with the MQTT server. The connection.serverURI option must also be provided and the protocol must be set to ssl. Otherwise, the tuple received on the control port is ignored and the operator continues to use its current connection configuration.
  • connection.keyStorePassword
    • The password to decrypt the new keystore if the keystore is encrypted. The connection.serverURI and connection.keyStore options must also be provided and the protocol must be set to ssl. Otherwise, the tuple received on the control port is ignored and the operator continues to use its current connection configuration.
  • connection.serverURI
    • The URI of the new MQTT server that you want to connect to. If the connection is configured to use SSL, values for connection.trustStore, connection.keyStore, or both must be provided, depending on the type of SSL authentication.
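
As an illustration of how these options combine, the following sketch builds a control tuple that points an MQTTSink operator at a new server over SSL. It is a sketch under stated assumptions: the composite name, the stream name, the server URI, and the truststore path are all hypothetical, the ssl:// URI form is assumed to be how the ssl protocol is expressed, and the use directive assumes the com.ibm.streamsx.messaging.mqtt namespace. Only the option names come from the list above.

```spl
// Assumption: namespace of the toolkit types; adjust it if your version differs.
use com.ibm.streamsx.messaging.mqtt::* ;

// Hypothetical composite that only produces one control tuple.
composite SinkControlSketch {
  graph
    stream<MQTTSinkUpdate.ControlPortType> SinkControl = Custom() {
      logic
        onProcess : {
          // connection.trustStore is honored only together with
          // connection.serverURI, so both options travel in the same map.
          map<rstring, rstring> sslConfig = {
            "connection.serverURI" : "ssl://broker.example.com:8883", // hypothetical server
            "connection.trustStore" : "/opt/certs/truststore.jks"     // hypothetical path
          } ;

          tuple<MQTTSinkUpdate.ControlPortType> update = { mqttConfig = sslConfig } ;
          submit(update, SinkControl) ;
        }
    }
}
```

If the server also requires client authentication, the same map would carry connection.keyStore and, for an encrypted keystore, connection.keyStorePassword.
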

The following example shows how to add topics A, B, C, and D with different QoS values to the list of subscribed topics.

stream<MQTTSourceUpdate.ControlPortType> ControlOp_out1 = Custom() {
  logic
    onProcess : {
      // Placeholder MQTT config because we are not changing connection information.
      map<rstring, rstring> emptyMqttConfig = { "" : "" } ;

      // Topic A and Topic B need QoS = 1
      MQTTSourceUpdate.TopicsUpdateDesc set1 = {
        action = MQTTSourceUpdate.ADD, topics = [ "topicA", "topicB" ], qos = 1u } ;

      // Topic C and Topic D need QoS = 0
      MQTTSourceUpdate.TopicsUpdateDesc set2 = {
        action = MQTTSourceUpdate.ADD, topics = [ "topicC", "topicD" ], qos = 0u } ;

      // Construct the tuple to submit
      tuple<MQTTSourceUpdate.ControlPortType> tupleToSubmit = {
        mqttConfig = emptyMqttConfig, topic = [ set1, set2 ] } ;
      submit(tupleToSubmit, ControlOp_out1) ;
    }
}

The following example shows how to update QoS for all the existing topics by using the wildcard character (*):

stream<MQTTSourceUpdate.ControlPortType> ControlOp_out2 = Custom() {
  logic
    onProcess : {
      // Placeholder MQTT config because we are not changing connection information.
      map<rstring, rstring> emptyMqttConfig = { "" : "" } ;

      // Update all topics to QoS = 2
      MQTTSourceUpdate.TopicsUpdateDesc set1 = {
        action = MQTTSourceUpdate.UPDATE, topics = [ "*" ], qos = 2u } ;

      // Construct the tuple to submit
      tuple<MQTTSourceUpdate.ControlPortType> tupleToSubmit = {
        mqttConfig = emptyMqttConfig, topic = [ set1 ] } ;
      submit(tupleToSubmit, ControlOp_out2) ;
    }
}
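
The examples above only produce control tuples. To take effect, the control stream must be connected to the optional input port of the operator. The following sketch shows one way this wiring might look for an MQTTSource operator. Treat it as an illustration under assumptions rather than a reference: the serverURI, topics, and qos parameter names, the rstring data output attribute, the server address, and the composite and stream names are not taken from this page, so check them against the MQTTSource operator reference for your toolkit version.

```spl
// Assumption: namespace of the toolkit operators and types.
use com.ibm.streamsx.messaging.mqtt::* ;

// Hypothetical main composite that wires a control stream into MQTTSource.
composite ControlWiringSketch {
  graph
    // Produces one control tuple that adds topicB with QoS = 1.
    stream<MQTTSourceUpdate.ControlPortType> Control = Custom() {
      logic
        onProcess : {
          map<rstring, rstring> noConfigChange = { "" : "" } ;
          MQTTSourceUpdate.TopicsUpdateDesc addB = {
            action = MQTTSourceUpdate.ADD, topics = [ "topicB" ], qos = 1u } ;
          tuple<MQTTSourceUpdate.ControlPortType> update = {
            topic = [ addB ], mqttConfig = noConfigChange } ;
          submit(update, Control) ;
        }
    }

    // The control stream is passed as the optional input port of MQTTSource.
    // Parameter names, values, and the output schema are assumptions.
    stream<rstring data> Messages = MQTTSource(Control) {
      param
        serverURI : "tcp://localhost:1883" ; // hypothetical server
        topics : "topicA" ;                  // initial subscription
        qos : 0 ;
    }

    // Prints each received message tuple.
    () as Printer = Custom(Messages) {
      logic
        onTuple Messages : {
          println(Messages) ;
        }
    }
}
```

In this sketch the operator starts with a subscription to topicA and gains topicB once the control tuple arrives. A real application would more likely drive the control stream from an external trigger than from a one-shot Custom operator.
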