streamsx.spl.op module

Invoking SPL Operators

IBM Streams supports Stream Processing Language (SPL), a domain-specific language for streaming analytics. SPL creates an application by building a graph of operator invocations. These operators are declared in an SPL toolkit.

SPL streams have a structured schema, such as tuple<rstring id, timestamp ts, float64 value> for a sensor reading with a sensor identifier, timestamp and value. A schema is defined using StreamSchema.

A Python topology application can take advantage of SPL operators by using streams with structured schemas. A stream of Python objects can be converted to a structured stream using map() with the schema parameter set:

# s is stream of Python objects representing a sensor
s = ...

# map s to a structured stream using a lambda function
# for each sensor reading r a Python tuple is created
# with the required values matching the order of the
# structured schema.
s2 = s.map(lambda r: (r.sensor_id, r.reading_time, r.reading),
     schema='tuple<rstring id, timestamp ts, float64 value>')
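The mapping step can be tried without a Streams installation. The following is a minimal sketch in plain Python, in which SensorReading and to_schema_tuple are hypothetical names introduced only for illustration. It shows that the values in the returned tuple follow the attribute order of the schema (id, ts, value). The datetime value is only a placeholder; which Python types are accepted for an SPL timestamp is not covered here.

```python
from collections import namedtuple
from datetime import datetime, timezone

# Hypothetical sensor object; attribute names mirror the lambda above
SensorReading = namedtuple('SensorReading',
                           ['sensor_id', 'reading_time', 'reading'])

def to_schema_tuple(r):
    """Return the values in schema order: (id, ts, value)."""
    return (r.sensor_id, r.reading_time, r.reading)

reading = SensorReading('sensor-7',
                        datetime(2020, 1, 1, tzinfo=timezone.utc),
                        21.5)
row = to_schema_tuple(reading)
```

A function such as to_schema_tuple could be passed to map() in place of the lambda, since both take one reading and return one tuple.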

An SPL operator is invoked in an application by creating an instance of:

  • Invoke - Invocation of an arbitrary SPL operator.
  • Source - Invocation of an SPL source operator with one output port.
  • Map - Invocation of an SPL map operator with one input port and one output port.
  • Sink - Invocation of an SPL sink operator with one input port.

SPL operator invocations support a number of clauses, of which the following are supported from Python.

Param clause

Operator parameterization is through operator parameters that configure and modify the operator for the specific application.

Parameters are passed as a dict containing the parameter names and their values. A parameter value may be a constant, an input attribute or an arbitrary SPL expression.

For example, a Beacon operator from the SPL standard toolkit producing 100 tuples at the rate of two per second:

schema = StreamSchema('tuple<uint64 seq>')
beacon = op.Source(topology, 'spl.utility::Beacon', schema,
    params = {'iterations':100, 'period':0.5})
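The relationship between the period parameter and the stated rate can be checked with a little arithmetic. This is a plain Python sketch. It assumes that period is the interval in seconds between tuples, which is what the "two per second" description implies, and the total duration is only approximate.

```python
# Same parameter values as in the Beacon example above
params = {'iterations': 100, 'period': 0.5}

# A period of 0.5 seconds between tuples gives two tuples per second
tuples_per_second = 1.0 / params['period']

# Rough time needed to emit all tuples, ignoring startup effects
approx_duration_seconds = params['iterations'] * params['period']
```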

SPL is strictly typed, so a constant passed as a parameter value may need an explicit SPL type. Python booleans, integers, floats and strings map automatically to SPL boolean, int32, float64 and rstring respectively. The module streamsx.spl.types provides functions to create typed SPL values.

For example, to create a count parameter of type uint64 for the SPL DeDuplicate operator:

params['count'] = streamsx.spl.types.uint64(20)
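The automatic mapping described above can be written down as a small lookup. The helper default_spl_type below is hypothetical and is not part of streamsx; it only restates the documented defaults. It also shows why the count parameter needs an explicit type: a bare Python 20 would default to int32 rather than uint64.

```python
def default_spl_type(value):
    """Name of the SPL type a plain Python constant maps to by default."""
    # bool is tested first because bool is a subclass of int in Python
    if isinstance(value, bool):
        return 'boolean'
    if isinstance(value, int):
        return 'int32'
    if isinstance(value, float):
        return 'float64'
    if isinstance(value, str):
        return 'rstring'
    raise TypeError('no automatic SPL mapping for ' + type(value).__name__)

# Default types for the Beacon parameters used earlier
beacon_types = {name: default_spl_type(v)
                for name, v in {'iterations': 100, 'period': 0.5}.items()}
```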

After the instance representing the operator invocation has been created, additional parameters may be added through the params attribute. If the value is an expression that is only valid in the context of the operator invocation, then the parameter must be added after the instance has been created.

For example, the Filter operator uses an expression that is usually dependent on the context, filtering tuples based upon their attribute values:

fs = op.Map('spl.relational::Filter', beacon)
fs.params['filter'] = fs.expression('seq % 2ul == 0ul')
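To make the effect of the filter expression concrete, here is a plain Python counterpart of the predicate. The function keep is a hypothetical name. In SPL, the ul suffix marks the literals as uint64, matching the type of seq; Python integers need no such suffix.

```python
def keep(seq):
    # Python counterpart of the SPL expression 'seq % 2ul == 0ul'
    return seq % 2 == 0

# Sequence numbers that would pass the filter, for a sample range
passed = [seq for seq in range(10) if keep(seq)]
```

Only tuples with an even seq value pass; the rest are dropped by the Filter operator.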

Output clause

The operator output clause defines the values of attributes on outgoing tuples on the operator invocation’s output ports.

When a tuple is submitted by an operator invocation its attributes are set one of three ways:

  • By the operator based upon its state and input tuples. For example, a US ZIP code operator would set the zipcode attribute based upon its lookup of the ZIP code from the address details in the input tuple.
  • By the operator implicitly setting output attributes from matching input attributes. Many streaming operators implicitly set output attributes to allow attributes to flow through the operator without any explicit coding. This only occurs when an output attribute is not explicitly set by the operator or the output clause and the input tuple has an attribute that matches the name and type of the output attribute. For example, in the US ZIP code operator, if the output tuple included attributes of rstring city, rstring state matching input attributes, then they would be implicitly copied from input tuple to output tuple.
  • By an output clause in the operator invocation. In this case the application invoking the operator is explicitly setting attributes using SPL expressions. An operator may provide output functions that return values based upon the operator’s state and input tuples. For example, the US ZIP code operator might provide a ZIPCode() output function rather than explicitly setting an output attribute. Then the application is free to use any attribute name to represent ZIP code in its output tuple.
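The three cases above can be sketched as a small simulation in plain Python. Everything here is illustrative: build_output_tuple, the schemas and the sample values are hypothetical, and schemas are modeled as dictionaries from attribute name to SPL type name. Attributes that are neither explicitly set nor matched on the input are left as None, since what SPL does in that case is not modeled.

```python
def build_output_tuple(output_schema, input_schema, input_tuple, explicit):
    """Sketch of how output attributes get their values.

    output_schema and input_schema map attribute names to SPL type names.
    explicit holds values set by the operator or by an output clause.
    """
    out = {}
    for name, spl_type in output_schema.items():
        if name in explicit:
            # Set by the operator itself or by an output clause
            out[name] = explicit[name]
        elif input_schema.get(name) == spl_type:
            # Implicit copy: same name and same type on the input tuple
            out[name] = input_tuple[name]
        else:
            # Not modeled here; this sketch leaves the attribute empty
            out[name] = None
    return out

input_schema = {'street': 'rstring', 'city': 'rstring', 'state': 'rstring'}
output_schema = {'city': 'rstring', 'state': 'rstring', 'zipcode': 'rstring'}
input_tuple = {'street': '1 Main St', 'city': 'Springfield', 'state': 'IL'}

# The zipcode value stands in for whatever the operator's lookup would produce
result = build_output_tuple(output_schema, input_schema, input_tuple,
                            explicit={'zipcode': '00000'})
```

In this run, city and state flow through implicitly because their names and types match, while zipcode comes from the explicit assignment.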

In Python, an output tuple attribute is set by assigning the value returned by the output method to an attribute of the operator invocation instance that has the same name as the output attribute.

For example, invoking an SPL Beacon operator using an output function to set the sequence number of a tuple and an SPL expression to set the timestamp:

schema = StreamSchema('tuple<uint64 seq, timestamp ts>')
beacon = op.Source(topology, 'spl.utility::Beacon', schema, params = {'period':0.1})

# Set the seq attribute using an output function provided by Beacon
beacon.seq = beacon.output('IterationCount()')

# Set the ts attribute using an SPL function that returns the current time
beacon.ts = beacon.output('getTimestamp()')

See also

Streams Processing Language (SPL) Reference
    Reference documentation.
Developing Streams applications
    Developing Streams applications.
Operator invocations
    Operator invocations from the SPL reference documentation.
class streamsx.spl.op.Expression(_type, _value)

Bases: object

An SPL expression.

static expression(value)

Create an SPL expression.

Parameters:value – Expression as a string or another Expression. If value is an instance of Expression then a new instance is returned containing the same type and value.
Returns:SPL expression from value.
Return type:Expression
spl_json()

Private method. May be removed at any time.

class streamsx.spl.op.Invoke(topology, kind, inputs=None, schemas=None, params=None, name=None)

Bases: streamsx.topology.exop.ExtensionOperator

Declaration of an invocation of an SPL operator in a Topology.

An SPL operator has an arbitrary number of input ports and an arbitrary number of output ports. The kind of the operator places constraints on how many input and output ports it supports, and potentially the schemas for those ports. For example, spl.relational::Filter has a single input port and one or two output ports; in addition, the schemas of the ports must be identical.

When the operator has output ports, an instance of Invoke has an outputs attribute, which is a list of Stream instances.

Parameters:
  • topology (Topology) – Topology that will invoke the operator.
  • kind (str) – SPL operator kind, e.g. spl.utility::Beacon.
  • inputs – Streams to connect to the operator. If not set or set to None or an empty collection then the operator has no input ports. Otherwise a list or tuple of Stream instances where the number of items is the number of input ports.
  • schemas – Schemas of the output ports. If not set or set to None or an empty collection then the operator has no output ports. Otherwise a list or tuple of schemas where the number of items is the number of output ports.
  • params – Operator parameters.
  • name – Name of the operator. When None defaults to a name derived from the operator kind.
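The rule that ties the inputs and schemas arguments to port counts can be stated in a few lines of plain Python. The helper port_count is hypothetical and only mirrors the description above; it is not how streamsx is implemented. The two-port example reflects the Filter case, where the output schemas must be identical.

```python
def port_count(items):
    """Number of ports implied by an inputs or schemas argument."""
    # Not set, None, or an empty collection all mean no ports
    if items is None:
        return 0
    return len(items)

# A Filter with two output ports needs two identical schemas
filter_schemas = ['tuple<uint64 seq>'] * 2

no_ports = port_count(None)
two_ports = port_count(filter_schemas)
```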
attribute(stream, name)

Expression for an input attribute.

An input attribute is an attribute on one of the input ports of the operator invocation. stream must have been used to declare this invocation.

Parameters:
  • stream (Stream) – Stream the attribute is from.
  • name (str) – Name of the attribute.
Returns:

Expression representing the input attribute.

Return type:

Expression

expression(value)

SPL expression.

An arbitrary expression that is valid in the context of this operator.

Parameters:value (str) – Arbitrary SPL expression.
Returns:Expression that is valid in the context of this operator.
Return type:Expression
output(stream, value)

SPL output port assignment expression.

Parameters:
  • stream (Stream) – Output stream the assignment is for.
  • value (str) – SPL expression used for an output assignment.
Returns:

Output assignment expression that is valid in the context of this operator.

Return type:

Expression

params

Parameters for the operator invocation.

class streamsx.spl.op.Map(kind, stream, schema=None, params=None, name=None)

Bases: streamsx.spl.op.Invoke

Declaration of an invocation of an SPL map operator.

Map operators have a single input port and single output port.

An instance of Map has an attribute stream that is the Stream produced by the operator.

This is a utility class that allows simple invocation of the common case of an operator with a single input stream and a single output stream.

Parameters:
  • kind (str) – SPL operator kind, e.g. spl.relational::Filter.
  • stream – Stream to connect to the operator.
  • schema – Schema of the output stream. If set to None then the output schema is the same as the schema of stream.
  • params – Operator parameters.
  • name – Name of the operator. When None defaults to a generated name.
attribute(name)

Expression for an input attribute.

An input attribute is an attribute on the input port of the operator invocation.

Parameters:name (str) – Name of the attribute.
Returns:Expression representing the input attribute.
Return type:Expression
expression(value)

SPL expression.

An arbitrary expression that is valid in the context of this operator.

Parameters:value (str) – Arbitrary SPL expression.
Returns:Expression that is valid in the context of this operator.
Return type:Expression
output(value)

SPL output port assignment expression.

Parameters:value (str) – SPL expression used for an output assignment.
Returns:Output assignment expression that is valid in the context of this operator.
Return type:Expression
params

Parameters for the operator invocation.

stream

Stream produced by the operator invocation.

Returns:Stream produced by the operator invocation.
Return type:Stream
class streamsx.spl.op.Sink(kind, stream, params=None, name=None)

Bases: streamsx.spl.op.Invoke

Declaration of an invocation of an SPL sink operator.

Sink operators typically send data on a stream to an external system. A sink operator has a single input port and no output ports.

This is a utility class that allows simple invocation of the common case of an operator with a single input port.

Parameters:
  • kind (str) – SPL operator kind, e.g. spl.adapter::FileSink.
  • stream – Stream to connect to the operator.
  • params – Operator parameters.
  • name – Name of the operator. When None defaults to a generated name.
attribute(stream, name)

Expression for an input attribute.

An input attribute is an attribute on one of the input ports of the operator invocation. stream must have been used to declare this invocation.

Parameters:
  • stream (Stream) – Stream the attribute is from.
  • name (str) – Name of the attribute.
Returns:

Expression representing the input attribute.

Return type:

Expression

expression(value)

SPL expression.

An arbitrary expression that is valid in the context of this operator.

Parameters:value (str) – Arbitrary SPL expression.
Returns:Expression that is valid in the context of this operator.
Return type:Expression
output(stream, value)

SPL output port assignment expression.

Parameters:
  • stream (Stream) – Output stream the assignment is for.
  • value (str) – SPL expression used for an output assignment.
Returns:

Output assignment expression that is valid in the context of this operator.

Return type:

Expression

params

Parameters for the operator invocation.

class streamsx.spl.op.Source(topology, kind, schema, params=None, name=None)

Bases: streamsx.spl.op.Invoke

Declaration of an invocation of an SPL source operator.

Source operators typically bring external data into a Streams application as a stream. A source operator has no input ports and a single output port.

An instance of Source has an attribute stream that is the Stream produced by the operator.

This is a utility class that allows simple invocation of the common case of an operator with a single output port.

Parameters:
  • topology (Topology) – Topology that will invoke the operator.
  • kind (str) – SPL operator kind, e.g. spl.utility::Beacon.
  • schema – Schema of the output port.
  • params – Operator parameters.
  • name – Name of the operator. When None defaults to a generated name.
attribute(stream, name)

Expression for an input attribute.

An input attribute is an attribute on one of the input ports of the operator invocation. stream must have been used to declare this invocation.

Parameters:
  • stream (Stream) – Stream the attribute is from.
  • name (str) – Name of the attribute.
Returns:

Expression representing the input attribute.

Return type:

Expression

expression(value)

SPL expression.

An arbitrary expression that is valid in the context of this operator.

Parameters:value (str) – Arbitrary SPL expression.
Returns:Expression that is valid in the context of this operator.
Return type:Expression
output(value)

SPL output port assignment expression.

Parameters:value (str) – SPL expression used for an output assignment.
Returns:Output assignment expression that is valid in the context of this operator.
Return type:Expression
params

Parameters for the operator invocation.

stream

Stream produced by the operator invocation.

Returns:Stream produced by the operator invocation.
Return type:Stream