streamsx.spl.op
Integration of SPL operators.
Invoking SPL Operators
IBM Streams supports Stream Processing Language (SPL), a domain-specific language for streaming analytics. SPL creates an application by building a graph of operator invocations. These operators are declared in an SPL toolkit.
SPL streams have a structured schema, such as tuple<rstring id, timestamp ts, float64 value> for a sensor reading with a sensor identifier, timestamp and value.
A schema is defined using StreamSchema.
A Python topology application can take advantage of SPL operators by using streams with structured schemas. A stream of Python objects can be converted to a structured stream using map() with the schema parameter set:
# s is stream of Python objects representing a sensor
s = ...
# map s to a structured stream using a lambda function
# for each sensor reading r a Python tuple is created
# with the required values matching the order of the
# structured schema.
s2 = s.map(lambda r : (r.sensor_id, r.reading_time, r.reading),
     schema='tuple<rstring id, timestamp ts, float64 value>')
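The conversion relies on position: the first element of the Python tuple becomes the first schema attribute, and so on. The standalone sketch below (plain Python, no streamsx) makes that pairing explicit. SensorReading, schema_attribute_names and to_structured are hypothetical names for illustration only; this is not how streamsx performs the conversion.

```python
import re
from collections import namedtuple

# Hypothetical sensor object type; real objects in s are application-specific.
SensorReading = namedtuple('SensorReading', ['sensor_id', 'reading_time', 'reading'])

def schema_attribute_names(schema):
    """Return the attribute names, in declaration order, of a flat SPL tuple schema.

    Hypothetical helper: it only handles schemas such as
    'tuple<rstring id, float64 value>', not nested or parameterized types.
    """
    match = re.fullmatch(r'\s*tuple<(.*)>\s*', schema)
    if match is None:
        raise ValueError('not a tuple schema: %r' % schema)
    # Each entry is '<type> <name>'; the name is the last token.
    return [attr.split()[-1] for attr in match.group(1).split(',')]

def to_structured(r):
    # Same shape as the lambda above: values in schema attribute order.
    return (r.sensor_id, r.reading_time, r.reading)

schema = 'tuple<rstring id, timestamp ts, float64 value>'
names = schema_attribute_names(schema)
# The timestamp is a plain float placeholder here, not an SPL timestamp value.
row = to_structured(SensorReading('s1', 1700000000.0, 21.5))
print(dict(zip(names, row)))  # {'id': 's1', 'ts': 1700000000.0, 'value': 21.5}
```

Reordering the lambda's tuple would silently assign values to the wrong attributes, which is why the order must follow the schema.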
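An SPL operator is invoked in an application by creating an instance of one of these classes:
- Invoke: invocation of an arbitrary SPL operator.
- Source: invocation of a source operator (no input ports, single output port).
- Map: invocation of a map operator (single input port, single output port).
- Sink: invocation of a sink operator (single input port, no output ports).
SPL operator invocations support a number of clauses; the param clause and the output clause are supported in Python.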
Values for operator clauses
When an operator clause requires a value, the value may be passed as:
- a constant,
- an input attribute (passed using the attribute method of the invocation), or
- an arbitrary SPL expression (passed as a string or an Expression).
Because a string is interpreted as an SPL expression, a string constant should be passed by enclosing the quoted string in outer quotes (for example, '"a string constant"').
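A minimal sketch of producing such a quoted constant from an arbitrary Python string, assuming SPL string literals use C-style backslash escapes for backslashes and double quotes. spl_string_literal is a hypothetical helper, not part of streamsx.

```python
def spl_string_literal(text):
    """Hypothetical helper: turn Python text into an SPL rstring literal.

    Assumes SPL string literals use C-style escapes, so backslashes and
    double quotes inside text are escaped before wrapping in double quotes.
    """
    escaped = text.replace('\\', '\\\\').replace('"', '\\"')
    return '"' + escaped + '"'

# Without the outer quotes, 'a string constant' would be parsed as SPL
# expression text; with them it is a string literal.
print(spl_string_literal('a string constant'))  # "a string constant"
print(spl_string_literal('say "hi"'))           # "say \"hi\""
```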
SPL is strictly typed, so a constant passed as a value may need to be explicitly typed.
bool, int, float and str values map automatically to SPL boolean, int32, float64 and rstring respectively.
Enum values map to an operator custom literal using the symbolic name of the value. For custom literals only the symbolic name needs to match a value expected by the operator; the class name and other values are arbitrary.
The module streamsx.spl.types provides functions to create typed SPL expressions from values.
An optional type may be set to SPL null by passing either Python None or
the value returned from null().
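The mapping rules above can be restated as a small lookup. describe_constant is a hypothetical helper that only reports how a value would be interpreted; it does not build anything streamsx consumes. One design point worth noting: bool is a subclass of int in Python, and IntEnum members are ints too, so the order of the checks matters.

```python
from enum import Enum, IntEnum

# Default SPL types for plain Python constants, as listed above.
# bool must be tested before int because bool is a subclass of int.
_DEFAULT_SPL_TYPES = [
    (bool, 'boolean'),
    (int, 'int32'),
    (float, 'float64'),
    (str, 'rstring'),
]

def describe_constant(value):
    """Hypothetical helper: report how a Python constant would be interpreted.

    Returns a (kind, detail) pair. It only restates the mapping rules.
    """
    if value is None:
        return ('null', None)  # SPL null for an optional type
    if isinstance(value, Enum):
        # Tested before int so that IntEnum members are treated as custom
        # literals; only the symbolic name matters.
        return ('custom literal', value.name)
    for py_type, spl_type in _DEFAULT_SPL_TYPES:
        if isinstance(value, py_type):
            return ('constant', spl_type)
    raise TypeError('no default SPL type for %r' % (value,))

class DataFormats(IntEnum):
    csv = 0
    txt = 1

print(describe_constant(True))             # ('constant', 'boolean')
print(describe_constant(100))              # ('constant', 'int32')
print(describe_constant(DataFormats.csv))  # ('custom literal', 'csv')
print(describe_constant(None))             # ('null', None)
```

Any other SPL type, such as uint64, needs an explicitly typed value from streamsx.spl.types.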
Param clause
Operator parameterization is through operator parameters that configure and modify the operator for the specific application.
Parameters are passed as a dict containing the parameter names and their values (see Values for operator clauses).
Examples
To invoke a Beacon operator from the SPL standard toolkit producing 100 tuples at the rate of two per second:
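from streamsx.topology.schema import StreamSchema
import streamsx.spl.op as op
# topology is an existing streamsx.topology.topology.Topology instance
schema = StreamSchema('tuple<uint64 seq>')
beacon = op.Source(topology, 'spl.utility::Beacon', schema,
    params = {'iterations':100, 'period':0.5})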
To use an IntEnum to pass a custom literal to the Parse operator:
from enum import IntEnum
class DataFormats(IntEnum):
    csv = 0
    txt = 1
...
params['format'] = DataFormats.csv
To create a count parameter of type uint64 for the SPL DeDuplicate operator:
import streamsx.spl.types
params['count'] = streamsx.spl.types.uint64(20)
After the instance representing the operator invocation has been created, additional parameters may be added through the params attribute. If the value is an expression that is only valid in the context of the operator invocation then the parameter must be added after the operator invocation has been created.
For example, the Filter operator uses an expression that is usually dependent on the context, filtering tuples based upon their attribute values:
fs = op.Map('spl.relational::Filter', beacon.stream)
fs.params['filter'] = fs.expression('seq % 2ul == 0ul')
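For intuition only, the filter predicate behaves like the plain Python below. This is a sketch under an assumption: seq is taken to hold the values 0 through 99 for the 100 tuples. The code runs no SPL and is not what the Filter operator executes.

```python
# Plain-Python analogue of the SPL filter expression 'seq % 2ul == 0ul'.
# Assumes (hypothetically) that seq takes the values 0..99.
seqs = range(100)

def keep(seq):
    # In SPL, 2ul and 0ul are uint64 literals, matching the uint64 seq attribute.
    return seq % 2 == 0

passed = [seq for seq in seqs if keep(seq)]
print(len(passed))  # 50
```

The literals carry a uint64 suffix because SPL is strictly typed: the arithmetic and comparison must use the same type as the seq attribute.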
Output clause
The operator output clause defines the values of attributes on outgoing tuples on the operator invocation’s output ports.
When a tuple is submitted by an operator invocation each of its attributes is set in one of three ways:
- By the operator based upon its state and input tuples. For example, a US ZIP code operator might set the zipcode attribute based upon its lookup of the ZIP code from the address details in the input tuple.
- By the operator implicitly setting output attributes from matching input attributes when those attributes have not been explicitly set elsewhere. Many streaming operators implicitly set output attributes to allow attributes to flow through the operator without any explicit coding. This only occurs when an output attribute is not explicitly set by the operator or the output clause, and the input tuple has an attribute that matches the output attribute (same name and type, or same name and same type as the underlying type of an output attribute with an optional type). For example, in the US ZIP code operator, if the output tuple included attributes of rstring city, rstring state that matched input attributes, then they would be implicitly copied from the input tuple to the output tuple.
- By an output clause in the operator invocation. In this case the application invoking the operator is explicitly setting attributes using SPL expressions. An operator may provide output functions that return values based upon the operator's state and input tuples. For example, the US ZIP code operator might provide a ZIPCode() output function rather than explicitly setting an output attribute. Then the application is free to use any attribute name to represent the ZIP code in its output tuple.
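A minimal model of the precedence described above, assuming schemas are given as dicts of attribute name to SPL type string. resolve_output is hypothetical and is not how the SPL compiler implements the rule; it only shows that an explicit assignment wins, and otherwise a same-name input attribute is copied when its type equals the output type or, for optional<T>, the underlying T. The ZIP code values are invented example data.

```python
def resolve_output(out_schema, in_schema, in_tuple, explicit):
    """Hypothetical model of how each output attribute receives its value.

    out_schema, in_schema: dict of attribute name -> SPL type string.
    in_tuple: dict of input attribute name -> value.
    explicit: dict of attribute name -> value set by the operator or by an
              output clause; these take precedence over implicit copies.
    Attributes set by neither route are left out of the result.
    """
    result = {}
    for name, out_type in out_schema.items():
        if name in explicit:
            result[name] = explicit[name]
            continue
        # An optional<T> output attribute also matches an input attribute of type T.
        underlying = out_type
        if out_type.startswith('optional<') and out_type.endswith('>'):
            underlying = out_type[len('optional<'):-1]
        in_type = in_schema.get(name)
        if in_type is not None and in_type in (out_type, underlying):
            result[name] = in_tuple[name]  # implicit copy from the input tuple
    return result

# Invented example data for the US ZIP code operator described above.
in_schema = {'street': 'rstring', 'city': 'rstring', 'state': 'rstring'}
out_schema = {'city': 'rstring', 'state': 'optional<rstring>', 'zipcode': 'rstring'}
in_tuple = {'street': '1 Main St', 'city': 'Springfield', 'state': 'IL'}
result = resolve_output(out_schema, in_schema, in_tuple, explicit={'zipcode': '62701'})
print(result)  # {'city': 'Springfield', 'state': 'IL', 'zipcode': '62701'}
```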
In Python an output tuple attribute is set by creating an attribute in the operator invocation instance whose value is the return value of the output method. The value passed to the output method is passed as described in Values for operator clauses.
For example, invoking an SPL Beacon operator using an output function to set the sequence number of a tuple and an SPL expression to set the timestamp:
from streamsx.topology.schema import StreamSchema
import streamsx.spl.op as op
# topology is an existing streamsx.topology.topology.Topology instance
schema = StreamSchema('tuple<uint64 seq, timestamp ts>')
beacon = op.Source(topology, 'spl.utility::Beacon', schema, params = {'period':0.1})
# Set the seq attribute using an output function provided by Beacon
beacon.seq = beacon.output('IterationCount()')
# Set the ts attribute using an SPL function that returns the current time
beacon.ts = beacon.output('getTimestamp()')
See also
- Streams Processing Language (SPL) Reference
 Reference documentation.
- Developing Streams applications
 Developing Streams applications.
- Operator invocations
 Operator invocations from the SPL reference documentation.
Module contents
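Functions
main_composite(kind[, toolkits, name]) – Wrap a main composite invocation as a Topology.
Classes
Expression(_type, _value) – An SPL expression.
Invoke(topology, kind[, inputs, schemas, params, name]) – Declaration of an invocation of an SPL operator in a Topology.
Map(kind, stream[, schema, params, name]) – Declaration of an invocation of an SPL map operator.
Sink(kind, stream[, params, name]) – Declaration of an invocation of an SPL sink operator.
Source(topology, kind, schema[, params, name]) – Declaration of an invocation of an SPL source operator.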
class streamsx.spl.op.Invoke(topology, kind, inputs=None, schemas=None, params=None, name=None)
Bases: streamsx._streams._placement._Placement, streamsx.topology.exop.ExtensionOperator
Declaration of an invocation of an SPL operator in a Topology.
An SPL operator has an arbitrary number of input ports and an arbitrary number of output ports. The kind of the operator places constraints on how many input and output ports it supports, and potentially the schemas for those ports. For example, spl.relational::Filter has a single input port and one or two output ports; in addition, the schemas of the ports must be identical.
When the operator has output ports, an Invoke instance has an outputs attribute which is a list of Stream instances.
- Parameters
 topology (Topology) – Topology that will invoke the operator.
kind (str) – SPL operator kind, e.g. spl.utility::Beacon.
inputs – Streams to connect to the operator. If not set or set to None or an empty collection then the operator has no input ports. Otherwise a list or tuple of Stream instances where the number of items is the number of input ports.
schemas – Schemas of the output ports. If not set or set to None or an empty collection then the operator has no output ports. Otherwise a list or tuple of schemas where the number of items is the number of output ports.
params – Operator parameters.
name – Name of the operator. When None defaults to a name derived from the operator kind.
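The port rules above can be modeled in a few lines of plain Python. port_counts and check_filter_ports are hypothetical helpers, not part of streamsx; they only restate the documented constraints (None or an empty collection means no ports; Filter needs one input port, one or two output ports, and identical schemas). Schema strings stand in for the schemas of real Stream and StreamSchema objects.

```python
def port_counts(inputs=None, schemas=None):
    """Hypothetical helper restating the Invoke rule: None or an empty
    collection means no ports, otherwise one port per item."""
    n_in = len(inputs) if inputs else 0
    n_out = len(schemas) if schemas else 0
    return n_in, n_out

def check_filter_ports(input_schemas, output_schemas):
    """Hypothetical check of the spl.relational::Filter constraints:
    one input port, one or two output ports, identical schemas."""
    n_in, n_out = port_counts(input_schemas, output_schemas)
    if n_in != 1 or n_out not in (1, 2):
        raise ValueError('Filter needs 1 input port and 1 or 2 output ports')
    if any(s != input_schemas[0] for s in output_schemas):
        raise ValueError('Filter port schemas must be identical')

print(port_counts())                            # (0, 0): no ports at all
print(port_counts(['in'], ['out-a', 'out-b']))  # (1, 2)
check_filter_ports(['tuple<uint64 seq>'], ['tuple<uint64 seq>'] * 2)  # passes
```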
attribute(stream, name)
Expression for an input attribute.
An input attribute is an attribute on one of the input ports of the operator invocation. stream must have been used to declare this invocation.
- Parameters
 stream (Stream) – Stream the attribute is from.
name (str) – Name of the attribute.
- Returns
 Expression representing the input attribute.
- Return type
 Expression
property category
Category for this processing logic.
An arbitrary application label allowing grouping of application elements by category.
Assign categories based on common function. For example, database is a common category that you can use to group all database sinks in an application.
A category is not required and defaults to None, meaning no assigned category. Streams console supports visualization based upon categories.
- Raises
 TypeError – No directly associated processing logic.
Note
A category has no effect on the execution of the application.
New in version 1.9.
colocate(others)
Colocate this processing logic with others.
Colocating processing logic requires execution in the same Streams processing element (operating system process).
When a job is submitted Streams may colocate (fuse) processing logic into the same processing element based upon flow analysis and current resource usage. This call instructs that this logic and others must be executed in the same processing element.
expression(value)
SPL expression.
An arbitrary expression that is valid in the context of this operator.
- Parameters
 value (str) – Arbitrary SPL expression.
- Returns
 Expression that is valid in the context of this operator.
- Return type
 Expression
output(stream, value)
SPL output port assignment expression.
- Parameters
 stream (Stream) – Output stream the assignment is for.
value (str) – SPL expression used for an output assignment. This can be a string, a constant, or an Expression.
- Returns
 Output assignment expression that is valid in the context of this operator.
- Return type
 Expression
property params
Parameters for the operator invocation.
property resource_tags
Resource tags for this processing logic.
Tags are a mechanism for differentiating and identifying resources that have different physical characteristics or logical uses. For example a resource (host) that has external connectivity for public data sources may be tagged ingest.
Processing logic can be associated with one or more tags to require running on suitably tagged resources. For example adding tags ingest and db requires that the processing element containing the callable that created the stream runs on a host tagged with both ingest and db.
A Stream that was not created directly with a Python callable cannot have tags associated with it. For example a stream that is a union() of multiple streams cannot be tagged. In this case this method returns an empty frozenset which cannot be modified.
See https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.2.1/com.ibm.streams.admin.doc/doc/tags.html for more details of tags within IBM Streams.
- Returns
 Set of resource tags, initially empty.
- Return type
 set
Warning
If no resources exist with the required tags then job submission will fail.
New in version 1.7.
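The tag requirement is a subset test: processing logic can run on a host only if the host carries every required tag. A minimal sketch with hypothetical host names follows; eligible_hosts is not a streamsx function, and actual placement is decided by the Streams instance, not by application code.

```python
def eligible_hosts(required_tags, hosts):
    """Hypothetical model of tag-based placement.

    hosts maps host name -> set of tags. A host qualifies only if it carries
    every required tag. With no required tags, every host qualifies.
    """
    required = set(required_tags)
    matches = sorted(name for name, tags in hosts.items() if required <= tags)
    if required and not matches:
        # Mirrors the warning above: submission fails when no resource
        # has all the required tags.
        raise RuntimeError('no host has tags %s' % sorted(required))
    return matches

hosts = {
    'host-a': {'ingest'},
    'host-b': {'ingest', 'db'},
    'host-c': {'db'},
}
print(eligible_hosts({'ingest', 'db'}, hosts))  # ['host-b']
print(eligible_hosts({'ingest'}, hosts))        # ['host-a', 'host-b']
```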
class streamsx.spl.op.Source(topology, kind, schema, params=None, name=None)
Bases: streamsx.spl.op.Invoke
Declaration of an invocation of an SPL source operator.
Source operators typically bring external data into a Streams application as a stream. A source operator has no input ports and a single output port.
An instance of Source has an attribute stream that is the Stream produced by the operator.
This is a utility class that allows simple invocation of the common case of an operator with a single output port.
- Parameters
 topology (Topology) – Topology that will invoke the operator.
kind (str) – SPL operator kind, e.g. spl.utility::Beacon.
schema – Schema of the output port.
params – Operator parameters.
name – Name of the operator. When None defaults to a generated name.
attribute(stream, name)
Expression for an input attribute.
An input attribute is an attribute on one of the input ports of the operator invocation. stream must have been used to declare this invocation.
- Parameters
 stream (Stream) – Stream the attribute is from.
name (str) – Name of the attribute.
- Returns
 Expression representing the input attribute.
- Return type
 Expression
property category
Category for this processing logic.
An arbitrary application label allowing grouping of application elements by category.
Assign categories based on common function. For example, database is a common category that you can use to group all database sinks in an application.
A category is not required and defaults to None, meaning no assigned category. Streams console supports visualization based upon categories.
- Raises
 TypeError – No directly associated processing logic.
Note
A category has no effect on the execution of the application.
New in version 1.9.
colocate(others)
Colocate this processing logic with others.
Colocating processing logic requires execution in the same Streams processing element (operating system process).
When a job is submitted Streams may colocate (fuse) processing logic into the same processing element based upon flow analysis and current resource usage. This call instructs that this logic and others must be executed in the same processing element.
expression(value)
SPL expression.
An arbitrary expression that is valid in the context of this operator.
- Parameters
 value (str) – Arbitrary SPL expression.
- Returns
 Expression that is valid in the context of this operator.
- Return type
 Expression
output(value)
SPL output port assignment expression.
- Parameters
 value (str) – SPL expression used for an output assignment. This can be a string, a constant, or an Expression.
- Returns
 Output assignment expression that is valid in the context of this operator.
- Return type
 Expression
property params
Parameters for the operator invocation.
property resource_tags
Resource tags for this processing logic.
Tags are a mechanism for differentiating and identifying resources that have different physical characteristics or logical uses. For example a resource (host) that has external connectivity for public data sources may be tagged ingest.
Processing logic can be associated with one or more tags to require running on suitably tagged resources. For example adding tags ingest and db requires that the processing element containing the callable that created the stream runs on a host tagged with both ingest and db.
A Stream that was not created directly with a Python callable cannot have tags associated with it. For example a stream that is a union() of multiple streams cannot be tagged. In this case this method returns an empty frozenset which cannot be modified.
See https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.2.1/com.ibm.streams.admin.doc/doc/tags.html for more details of tags within IBM Streams.
- Returns
 Set of resource tags, initially empty.
- Return type
 set
Warning
If no resources exist with the required tags then job submission will fail.
New in version 1.7.
class streamsx.spl.op.Map(kind, stream, schema=None, params=None, name=None)
Bases: streamsx.spl.op.Invoke
Declaration of an invocation of an SPL map operator.
Map operators have a single input port and single output port.
An instance of Map has an attribute stream that is the Stream produced by the operator.
This is a utility class that allows simple invocation of the common case of an operator with a single input stream and single output stream.
- Parameters
 kind (str) – SPL operator kind, e.g. spl.relational::Filter.
stream – Stream to connect to the operator.
schema – Schema of the output stream. If set to None then the output schema is the same as the schema of stream.
params – Operator parameters.
name – Name of the operator. When None defaults to a generated name.
attribute(name)
Expression for an input attribute.
An input attribute is an attribute on the input port of the operator invocation.
- Parameters
 name (str) – Name of the attribute.
- Returns
 Expression representing the input attribute.
- Return type
 Expression
property category
Category for this processing logic.
An arbitrary application label allowing grouping of application elements by category.
Assign categories based on common function. For example, database is a common category that you can use to group all database sinks in an application.
A category is not required and defaults to None, meaning no assigned category. Streams console supports visualization based upon categories.
- Raises
 TypeError – No directly associated processing logic.
Note
A category has no effect on the execution of the application.
New in version 1.9.
colocate(others)
Colocate this processing logic with others.
Colocating processing logic requires execution in the same Streams processing element (operating system process).
When a job is submitted Streams may colocate (fuse) processing logic into the same processing element based upon flow analysis and current resource usage. This call instructs that this logic and others must be executed in the same processing element.
expression(value)
SPL expression.
An arbitrary expression that is valid in the context of this operator.
- Parameters
 value (str) – Arbitrary SPL expression.
- Returns
 Expression that is valid in the context of this operator.
- Return type
 Expression
output(value)
SPL output port assignment expression.
- Parameters
 value (str) – SPL expression used for an output assignment. This can be a string, a constant, or an Expression.
- Returns
 Output assignment expression that is valid in the context of this operator.
- Return type
 Expression
property params
Parameters for the operator invocation.
property resource_tags
Resource tags for this processing logic.
Tags are a mechanism for differentiating and identifying resources that have different physical characteristics or logical uses. For example a resource (host) that has external connectivity for public data sources may be tagged ingest.
Processing logic can be associated with one or more tags to require running on suitably tagged resources. For example adding tags ingest and db requires that the processing element containing the callable that created the stream runs on a host tagged with both ingest and db.
A Stream that was not created directly with a Python callable cannot have tags associated with it. For example a stream that is a union() of multiple streams cannot be tagged. In this case this method returns an empty frozenset which cannot be modified.
See https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.2.1/com.ibm.streams.admin.doc/doc/tags.html for more details of tags within IBM Streams.
- Returns
 Set of resource tags, initially empty.
- Return type
 set
Warning
If no resources exist with the required tags then job submission will fail.
New in version 1.7.
class streamsx.spl.op.Sink(kind, stream, params=None, name=None)
Bases: streamsx.spl.op.Invoke
Declaration of an invocation of an SPL sink operator.
Sink operators typically send data on a stream to an external system. A sink operator has a single input port and no output ports.
This is a utility class that allows simple invocation of the common case of an operator with a single input port.
- Parameters
 kind (str) – SPL operator kind, e.g. spl.adapter::FileSink.
stream – Stream to connect to the operator.
params – Operator parameters.
name – Name of the operator. When None defaults to a generated name.
attribute(stream, name)
Expression for an input attribute.
An input attribute is an attribute on one of the input ports of the operator invocation. stream must have been used to declare this invocation.
- Parameters
 stream (Stream) – Stream the attribute is from.
name (str) – Name of the attribute.
- Returns
 Expression representing the input attribute.
- Return type
 Expression
property category
Category for this processing logic.
An arbitrary application label allowing grouping of application elements by category.
Assign categories based on common function. For example, database is a common category that you can use to group all database sinks in an application.
A category is not required and defaults to None, meaning no assigned category. Streams console supports visualization based upon categories.
- Raises
 TypeError – No directly associated processing logic.
Note
A category has no effect on the execution of the application.
New in version 1.9.
colocate(others)
Colocate this processing logic with others.
Colocating processing logic requires execution in the same Streams processing element (operating system process).
When a job is submitted Streams may colocate (fuse) processing logic into the same processing element based upon flow analysis and current resource usage. This call instructs that this logic and others must be executed in the same processing element.
expression(value)
SPL expression.
An arbitrary expression that is valid in the context of this operator.
- Parameters
 value (str) – Arbitrary SPL expression.
- Returns
 Expression that is valid in the context of this operator.
- Return type
 Expression
output(stream, value)
SPL output port assignment expression.
- Parameters
 stream (Stream) – Output stream the assignment is for.
value (str) – SPL expression used for an output assignment. This can be a string, a constant, or an Expression.
- Returns
 Output assignment expression that is valid in the context of this operator.
- Return type
 Expression
property params
Parameters for the operator invocation.
property resource_tags
Resource tags for this processing logic.
Tags are a mechanism for differentiating and identifying resources that have different physical characteristics or logical uses. For example a resource (host) that has external connectivity for public data sources may be tagged ingest.
Processing logic can be associated with one or more tags to require running on suitably tagged resources. For example adding tags ingest and db requires that the processing element containing the callable that created the stream runs on a host tagged with both ingest and db.
A Stream that was not created directly with a Python callable cannot have tags associated with it. For example a stream that is a union() of multiple streams cannot be tagged. In this case this method returns an empty frozenset which cannot be modified.
See https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.2.1/com.ibm.streams.admin.doc/doc/tags.html for more details of tags within IBM Streams.
- Returns
 Set of resource tags, initially empty.
- Return type
 set
Warning
If no resources exist with the required tags then job submission will fail.
New in version 1.7.
class streamsx.spl.op.Expression(_type, _value)
Bases: object
An SPL expression.
static expression(value)
Create an SPL expression.
- Parameters
 value – Expression as a string or another Expression. If value is an instance of Expression then a new instance is returned containing the same type and value.
- Returns
 SPL expression from value.
- Return type
 Expression
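The copy behavior of Expression.expression can be pictured with a hypothetical stand-in class. SketchExpression is not the streamsx class, and the 'splexpr' type tag it uses for plain strings is an assumption made only for this sketch.

```python
class SketchExpression:
    """Hypothetical stand-in for streamsx.spl.op.Expression (not the real class)."""

    def __init__(self, _type, _value):
        self._type = _type
        self._value = _value

    @staticmethod
    def expression(value):
        if isinstance(value, SketchExpression):
            # New instance carrying the same type and value.
            return SketchExpression(value._type, value._value)
        # Otherwise the value is SPL expression text; 'splexpr' is an
        # assumed type tag used only in this sketch.
        return SketchExpression('splexpr', value)

original = SketchExpression.expression('seq % 2ul == 0ul')
duplicate = SketchExpression.expression(original)
print(duplicate is original, duplicate._type == original._type, duplicate._value == original._value)
# False True True
```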
streamsx.spl.op.main_composite(kind, toolkits=None, name=None)
Wrap a main composite invocation as a Topology.
Provides a bridge between an SPL application (main composite) and a Topology. Create a Topology that contains just the invocation of the main composite defined by kind.
The returned Topology may be used like any other topology instance including job configuration, tester or even addition of SPL operator invocations or functional transformations.
Note
Since a main composite by definition has no input or output ports any functionality added to the topology cannot interact directly with its invocation.
When name is None and no additions or tests are made to the topology, SPL compilation uses kind directly. Otherwise the main composite invocation is invoked within a generated main composite.
- Parameters
 kind (str) – Kind of the main composite operator invocation. Must be a namespace qualified name.
toolkits (list[str]) – Optional list of toolkits the main composite depends on.
name (str) – Invocation name for the main composite.
- Returns
 tuple containing:
Topology: Topology with main composite invocation.
Invoke: Invocation of the main composite
- Return type
 tuple
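The rules for kind and for how the main composite is compiled can be summarized in plain Python. split_qualified_kind and main_composite_strategy are hypothetical helpers that only restate the note and the kind parameter description above; com.example::Main is a made-up kind, and topology_modified stands for "additions or tests were made to the topology".

```python
def split_qualified_kind(kind):
    """Split a namespace-qualified SPL name such as 'com.example::Main'.

    Hypothetical helper; raises ValueError when kind has no namespace.
    """
    namespace, sep, composite = kind.rpartition('::')
    if not sep or not namespace or not composite:
        raise ValueError('kind must be namespace qualified: %r' % kind)
    return namespace, composite

def main_composite_strategy(kind, name=None, topology_modified=False):
    """Model of the note above: which main composite SPL compilation uses."""
    split_qualified_kind(kind)
    if name is None and not topology_modified:
        return 'direct'    # kind itself is compiled as the main composite
    return 'wrapped'       # kind is invoked within a generated main composite

print(split_qualified_kind('com.example::Main'))                # ('com.example', 'Main')
print(main_composite_strategy('com.example::Main'))             # direct
print(main_composite_strategy('com.example::Main', name='app')) # wrapped
```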