SPL File topicsfilter.spl

Toolkits > com.ibm.streamsx.topology 2.1.0 > com.ibm.streamsx.topology.topic > topicsfilter.spl

Subscribe functionality with data filters. FilteredSubscribe subscribes to streams and filters the data, optimizing the filter where possible by pushing it to the publisher to reduce network traffic.

Content

Operators
Functions

Composites

composite FilteredSubscribe(output FilteredTopic)

Filtered subscribe to a topic. Generates a stream that is subscribed, through IBM Streams dynamic connections, to all streams published to the same topic and streamType.

The tuples on FilteredTopic are a subset of all tuples on streams published to topic.

For any published stream that allows filters (see Publish), remoteFilter is pushed to the publishing application. This reduces the number of tuples sent over the network between applications, because only tuples that match the filter are sent.

Tuples that match the remote filter, and tuples from published streams that do not allow filters, are then filtered locally using the function localFilterFunction. For each tuple that arrives at this operator invocation from published streams, localFilterFunction(tuple, localFilterParameter) is called. If localFilterFunction returns true then the tuple is present on FilteredTopic.

The benefit of this operator over a Subscribe followed by a Filter is that remote filtering reduces network use and local CPU processing cost by filtering tuples at the publishing side. The operator is also designed for microservice architectures: an application can use this operator without any knowledge of how a topic is being published, and the output stream always contains the correctly filtered data.

Typically the remote filter and local filter are equivalent. For example, consider a parameterized composite that filters sensor readings by sensor identifier. Here the filter expression is id == $id, where $id is the sensor identifier being filtered for:

use com.ibm.streamsx.topology.topic::*;

type Sensor = tuple<rstring id, float64 reading>;

// Function that acts as the local filter
public boolean filterSensor(Sensor t, rstring id) {
    return t.id == id;
}

public composite SensorById(output stream<Sensor> Readings) {
    param expression<rstring> $id;

    graph

    stream<Sensor> Readings = FilteredSubscribe() {
       param
         topic: "sensors/readings";
         streamType: Sensor;

         // Remote filter uses a utility method to
         // create the filter
         remoteFilter: equalsFilter("id", $id);

         // local filtering uses a function
         localFilterFunction: filterSensor;
         localFilterParameter: $id;
    }
}

The local filter can differ from the remote filter to take advantage of the richer programming model in SPL compared to filter expressions. To maintain expected behavior the filters should be consistent with each other: if localFilterFunction returns true for a tuple then the remote filter should also evaluate to true for it. The local filter thus passes through a subset of the tuples that would pass the remote filter.
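As an illustration of a local filter that is stricter than the remote filter, the following is a minimal sketch that reuses the Sensor type from the example above. The names filterHotSensor and HotSensorById, and the 80.0 threshold, are hypothetical and chosen only for this example. The remote filter matches on id alone, while the local function repeats the id test (needed for publishers that do not allow filters) and adds a reading test, so every tuple it accepts would also pass the remote filter.

```spl
use com.ibm.streamsx.topology.topic::*;

type Sensor = tuple<rstring id, float64 reading>;

// Hypothetical local filter: stricter than the remote filter.
// It repeats the id test and additionally requires a high reading.
public boolean filterHotSensor(Sensor t, rstring id) {
    return t.id == id && t.reading > 80.0;
}

// Hypothetical composite, modeled on SensorById above.
public composite HotSensorById(output stream<Sensor> Readings) {
    param expression<rstring> $id;

    graph

    stream<Sensor> Readings = FilteredSubscribe() {
       param
         topic: "sensors/readings";
         streamType: Sensor;

         // Remote filter only narrows by id
         remoteFilter: equalsFilter("id", $id);

         // Local filter applies the full condition
         localFilterFunction: filterHotSensor;
         localFilterParameter: $id;
    }
}
```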

The remote filter is a filter expression represented as an rstring using the format specified by the Import operator. This namespace provides a number of utility functions to build an expression.

The local filter is an SPL function that takes two arguments, the tuple and the parameter. The type of the first argument, the tuple, must match the output stream type streamType. The type of the second argument must match the type of localFilterParameter, which is specific to the invocation of this operator. If local filtering does not need a parameter because the expression only tests tuple attributes, e.g. t.reading > 80.0, then localFilterParameter need not be set. In this case the int32 value 0 is passed, so the function must still declare a second int32 argument:

public boolean filterByReading(Sensor t, int32 unused) {
   return t.reading > 80.0;
}
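A sketch of an invocation that uses such a function without setting localFilterParameter might look as follows. The composite name HighReadings is hypothetical, and the hand-written remote filter string is an assumption about what an equivalent Import filter expression would look like for this stream type.

```spl
use com.ibm.streamsx.topology.topic::*;

type Sensor = tuple<rstring id, float64 reading>;

// Local filter that ignores its second argument
public boolean filterByReading(Sensor t, int32 unused) {
   return t.reading > 80.0;
}

// Hypothetical composite for illustration only
public composite HighReadings(output stream<Sensor> Readings) {
    graph

    stream<Sensor> Readings = FilteredSubscribe() {
       param
         topic: "sensors/readings";
         streamType: Sensor;

         // Assumed hand-written filter expression in Import format
         remoteFilter: "reading > 80.0";

         // localFilterParameter is not set, so int32 0 is
         // passed as the second argument
         localFilterFunction: filterByReading;
    }
}
```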

Subscribe behavior in parallel regions matches the behavior described by Subscribe.

See namespace:com.ibm.streamsx.topology.topic for details.

Parameters

Output Ports

Functions

rstring getTopicSubscription(rstring topic, boolean allowFilter)

Get the subscription that matches a topic for topic-based subscription, with explicit matching on whether the publisher allows filters.

This is a low-level function that allows applications to build custom composites with functionality similar to FilteredSubscribe. The preference should be to use FilteredSubscribe where possible.

See getTopicSubscription(rstring).

Parameters

Returns

rstring andFilter(rstring cond1, rstring cond2)

Return a string representation of cond1 && cond2.

Suitable for use as a remote filter for FilteredSubscribe.

Parameters

Returns

rstring orFilter(rstring cond1, rstring cond2)

Return a string representation of cond1 || cond2.

Suitable for use as a remote filter for FilteredSubscribe.
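The builder functions can be nested to form compound remote filters. The following is a minimal sketch with hypothetical helper names (eitherSensor, hotSensor) and an assumed stream type with rstring id and float64 reading attributes. The exact text of the returned strings is not shown because it depends on the toolkit implementation.

```spl
use com.ibm.streamsx.topology.topic::*;

// Hypothetical helper: filter expression for id == idA || id == idB
public rstring eitherSensor(rstring idA, rstring idB) {
    return orFilter(equalsFilter("id", idA), equalsFilter("id", idB));
}

// Hypothetical helper: combines a built condition with a hand-written
// one (assumed to be valid Import filter syntax) using andFilter
public rstring hotSensor(rstring id) {
    return andFilter(equalsFilter("id", id), "reading > 80.0");
}
```

Either result could then be supplied as remoteFilter, with a local filter function that implements the same condition.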

Parameters

Returns

rstring equalsFilter(rstring name, int64 value)

Return a string representation of name == value for an int64 attribute.

Suitable for use as a remote filter for FilteredSubscribe.

Parameters

Returns

rstring equalsFilter(rstring name, rstring value)

Return a string representation of name == value for an rstring attribute.

Suitable for use as a remote filter for FilteredSubscribe.

Parameters

Returns

rstring equalsFilter(rstring name, float64 value)

Return a string representation of name == value for a float64 attribute.

Suitable for use as a remote filter for FilteredSubscribe.

Parameters

Returns

rstring inFilter(rstring name, list<rstring> values)

Return a string representation of the filter name in values for an rstring attribute.

If values is empty then the filter expression is equivalent to name != "", with the intention of passing all tuples.

Suitable for use as a remote filter for FilteredSubscribe.
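A sketch of how inFilter might be paired with a consistent local filter is shown below. The names filterSensorIn and SensorsByIds are hypothetical, and passing a list as localFilterParameter is an assumption based on the parameter being specific to the invocation. The local function mirrors the documented empty-list intention by passing every tuple when no identifiers are given.

```spl
use com.ibm.streamsx.topology.topic::*;

type Sensor = tuple<rstring id, float64 reading>;

// Hypothetical local filter: an empty list passes all tuples,
// otherwise the tuple's id must be one of the listed values.
public boolean filterSensorIn(Sensor t, list<rstring> ids) {
    return size(ids) == 0 || (t.id in ids);
}

// Hypothetical composite for illustration only
public composite SensorsByIds(output stream<Sensor> Readings) {
    param expression<list<rstring>> $ids;

    graph

    stream<Sensor> Readings = FilteredSubscribe() {
       param
         topic: "sensors/readings";
         streamType: Sensor;
         remoteFilter: inFilter("id", $ids);
         localFilterFunction: filterSensorIn;
         localFilterParameter: $ids;
    }
}
```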

Parameters

Returns

int32 setSubscribeFilter(rstring filter)

Set the filter for input port zero. Calls setInputPortImportFilterExpression, logging an error if the filter could not be set.

Parameters

Returns