streamsx.spl.spl¶
SPL Python primitive operators.
Overview¶
SPL primitive operators that call a Python function or class methods are created by decorators provided by this module.
The name of the function or callable class becomes the name of the operator.
A decorated function is a stateless operator while a decorated class is an optionally stateful operator.
These are the supported decorators that create an SPL operator:
@spl.source
- Creates a source operator that produces tuples.
@spl.filter
- Creates a operator that filters tuples.
@spl.map
- Creates a operator that maps input tuples to output tuples.
@spl.for_each
- Creates a operator that terminates a stream processing each tuple.
@spl.primitive_operator
- Creates an SPL primitive operator that has an arbitrary number of input and output ports.
Decorated functions and classes must be located in the directory
opt/python/streams
in the SPL toolkit. Each module in that directory
will be inspected for operators during extraction. Each module defines
the SPL namespace for its operators by the function spl_namespace
,
for example:
from streamsx.spl import spl
def spl_namespace():
return 'com.example.ops'
@spl.map()
def Pass(*tuple_):
return tuple_
creates a pass-through operator com.example.ops::Pass
.
SPL primitive operators are created by executing the extraction script spl-python-extract against the SPL toolkit. Once created the operators become part of the toolkit and may be used like any other SPL operator.
Python classes as SPL operators¶
Overview¶
Decorating a Python class creates a stateful SPL operator where the instance fields of the class are the operator’s state. An instance of the class is created when the SPL operator invocation is initialized at SPL runtime. The instance of the Python class is private to the SPL operator and is maintained for the lifetime of the operator.
If the class has instance fields then they are the state of the operator and are private to each invocation of the operator.
If the __init__ method has parameters beyond the first self parameter then they are mapped to operator parameters. Any parameter that has a default value becomes an optional parameter to the SPL operator. Parameters of the form *args and **kwargs are not supported.
Warning
Parameter names must be valid SPL identifers, SPL identifiers start with an ASCII letter or underscore, followed by ASCII letters, digits, or underscores. The name also must not be an SPL keyword.
Parameter names suppress
and include
are reserved.
The value of the operator parameters at SPL operator invocation are passed to the __init__ method. This is equivalent to creating an instance of the class passing the operator parameters into the constructor.
For example, with this decorated class producing an SPL source operator:
@spl.source()
class Range(object):
def __init__(self, stop, start=0):
self.start = start
self.stop = stop
def __iter__(self):
return zip(range(self.start, self.stop))
The SPL operator Range has two parameters, stop is mandatory and start is optional, defaulting to zero. Thus the SPL operator may be invoked as:
// Produces the sequence of values from 0 to 99
//
// Creates an instance of the Python class
// Range using Range(100)
//
stream<int32 seq> R = Range() {
param
stop: 100;
}
or both operator parameters can be set:
// Produces the sequence of values from 50 to 74
//
// Creates an instance of the Python class
// Range using Range(75, 50)
//
stream<int32 seq> R = Range() {
param
start: 50;
stop: 75;
}
Operator state¶
Use of a class allows the operator to be stateful by maintaining state in instance attributes across invocations (tuple processing).
When the operator is in a consistent region or checkpointing then it is serialized using dill. The default serialization may be modified by using the standard Python pickle mechanism of __getstate__
and __setstate__
. This is required if the state includes objects that cannot be serialized, for example file descriptors. For details see See https://docs.python.org/3.5/library/pickle.html#handling-stateful-objects .
If the class has __enter__
and __exit__
context manager methods then __enter__
is called after the instance has been deserialized by dill. Thus __enter__
is used to recreate runtime objects that cannot be serialized such as open files or sockets.
Operator initialization & shutdown¶
Execution of an instance for an operator effectively run in a context manager so that an instance’s __enter__
method is called when the processing element containing the operator is initialized
and its __exit__
method called when the processing element is stopped. To take advantage of this
the class must define both __enter__
and __exit__
methods.
Note
Initialization such as opening files should be in __enter__
in order to support stateful operator restart & checkpointing.
Example of using __enter__
and __exit__
to open and close a file:
import streamsx.ec as ec
@spl.map()
class Sentiment(object):
def __init__(self, name):
self.name = name
self.file = None
def __enter__(self):
self.file = open(self.name, 'r')
def __exit__(self, exc_type, exc_value, traceback):
if self.file is not None:
self.file.close()
def __call__(self):
pass
When an instance defines a valid __exit__
method then it will be called with an exception when:
the instance raises an exception during processing of a tuple
a data conversion exception is raised converting a Python value to an SPL tuple or attribute
If __exit__
returns a true value then the exception is suppressed and processing continues, otherwise the enclosing processing element will be terminated.
Application log and trace¶
IBM Streams provides application trace and log services which are accesible through standard Python loggers from the logging module.
Python functions as SPL operators¶
Decorating a Python function creates a stateless SPL operator. In SPL terms this is similar to an SPL Custom operator, where the code in the Python function is the custom code. For operators with input ports the function is called for each input tuple, passing a Python representation of the SPL input tuple. For an SPL source operator the function is called to obtain an iterable whose contents will be submitted to the output stream as SPL tuples.
Operator parameters are not supported.
An example SPL sink operator that prints each input SPL tuple after its conversion to a Python tuple:
@spl.for_each()
def PrintTuple(*tuple_):
"Print each tuple to standard out."
print(tuple_, flush=True)
Processing SPL tuples in Python¶
SPL tuples are converted to Python objects and passed to a decorated callable.
Overview¶
For each SPL tuple arriving at an input port a Python function is called with the SPL tuple converted to Python values suitable for the function call. How the tuple is passed is defined by the tuple passing style.
Tuple Passing Styles¶
- An input tuple can be passed to Python function using a number of different styles:
dictionary
tuple
attributes by name not yet implemented
attributes by position
Dictionary¶
Passing the SPL tuple as a Python dictionary is flexible
and makes the operator independent of any schema.
A disadvantage is the reduction in code readability
for Python function by not having formal parameters,
though getters such as tuple['id']
mitigate that to some extent.
If the function is general purpose and can derive meaning
from the keys that are the attribute names then **kwargs
can be useful.
When the only function parameter is **kwargs
(e.g. def myfunc(**tuple_):
) then the passing style is dictionary.
All of the attributes are passed in the dictionary using the SPL schema attribute name as the key.
Tuple¶
Passing the SPL tuple as a Python tuple is flexible
and makes the operator independent of any schema
but is brittle to changes in the SPL schema.
Another disadvantage is the reduction in code readability
for Python function by not having formal parameters.
However if the function is general purpose and independent
of the tuple contents *args
can be useful.
When the only function parameter is *args
(e.g. def myfunc(*tuple_):
) then the passing style is tuple.
All of the attributes are passed as a Python tuple with the order of values matching the order of the SPL schema.
Attributes by name¶
(not yet implemented)
Passing attributes by name can be robust against changes in the SPL scheme, e.g. additional attributes being added in the middle of the schema, but does require that the SPL schema has matching attribute names.
When attributes by name is used then SPL tuple attributes
are passed to the function by name for formal parameters.
Order of the attributes and parameters need not match.
This is supported for function parameters of
kind POSITIONAL_OR_KEYWORD
and KEYWORD_ONLY
.
If the function signature also contains a parameter of the form
**kwargs
(VAR_KEYWORD
) then any attributes not bound to
formal parameters are passed in its dictionary using the
SPL schema attribute name as the key.
If the function signature also contains an arbitrary argument
list *args
then any attributes not bound to formal parameters
or to **kwargs
are passed in order of the SPL schema.
If there are only formal parameters any non-bound attributes are not passed into the function.
Attributes by position¶
Passing attributes by position allows the SPL operator to be independent of the SPL schema but is brittle to changes in the SPL schema. For example a function expecting an identifier and a sensor reading as the first two attributes would break if an attribute representing region was added as the first SPL attribute.
When attributes by position is used then SPL tuple attributes are passed to the function by position for formal parameters. The first SPL attribute in the tuple is passed as the first parameter. This is supported for function parameters of kind POSITIONAL_OR_KEYWORD.
If the function signature also contains an arbitrary argument list *args (VAR_POSITIONAL) then any attributes not bound to formal parameters are passed in order of the SPL schema.
The function signature must not contain a parameter of the form
**kwargs
(VAR_KEYWORD).
If there are only formal parameters any non-bound attributes are not passed into the function.
The SPL schema must have at least the number of positional arguments the function requires.
Selecting the style¶
For signatures only containing a parameter of the form
*args
or **kwargs
the style is implicitly defined:
def f(**tuple_)
- dictionary -tuple_
will contain a dictionary of all of the SPL tuple attribute’s values with the keys being the attribute names.
def f(*tuple_)
- tuple -tuple_
will contain all of the SPL tuple attribute’s values in order of the SPL schema definition.
Otherwise the style is set by the style
parameter to the decorator,
defaulting to attributes by name. The style value can be set to:
'name'
- attributes by name (the default)
'position'
- attributes by position
Examples
These examples show how an SPL tuple with the schema and value:
tuple<rstring id, float64 temp, boolean increase>
{id='battery', temp=23.7, increase=true}
is passed into a variety of functions by showing the effective Python call and the resulting values of the function’s parameters.
Dictionary consuming all attributes by **kwargs
:
@spl.map()
def f(**tuple_)
pass
# f({'id':'battery', 'temp':23.7, 'increase': True})
# tuple_={'id':'battery', 'temp':23.7, 'increase':True}
Tuple consuming all attributes by *args
:
@spl.map()
def f(*tuple_)
pass
# f('battery', 23.7, True)
# tuple_=('battery',23.7, True)
Attributes by name consuming all attributes:
@spl.map()
def f(id, temp, increase)
pass
# f(id='battery', temp=23.7, increase=True)
# id='battery'
# temp=23.7
# increase=True
Attributes by name consuming a subset of attributes:
@spl.map()
def f(id, temp)
pass
# f(id='battery', temp=23.7)
# id='battery'
# temp=23.7
Attributes by name consuming a subset of attributes in a different order:
@spl.map()
def f(increase, temp)
pass
# f(temp=23.7, increase=True)
# increase=True
# temp=23.7
Attributes by name consuming id by name and remaining attributes by **kwargs
:
@spl.map()
def f(id, **tuple_)
pass
# f(id='battery', {'temp':23.7, 'increase':True})
# id='battery'
# tuple_={'temp':23.7, 'increase':True}
Attributes by name consuming id by name and remaining attributes by *args
:
@spl.map()
def f(id, *tuple_)
pass
# f(id='battery', 23.7, True)
# id='battery'
# tuple_=(23.7, True)
Attributes by position consuming all attributes:
@spl.map(style='position')
def f(key, value, up)
pass
# f('battery', 23.7, True)
# key='battery'
# value=23.7
# up=True
Attributes by position consuming a subset of attributes:
@spl.map(style='position')
def f(a, b)
pass
# f('battery', 23.7)
# a='battery'
# b=23.7
Attributes by position consuming id by position and remaining attributes by *args
:
@spl.map(style='position')
def f(key, *tuple_)
pass
# f('battery', 23.7, True)
# key='battery'
# tuple_=(23.7, True)
In all cases the SPL tuple must be able to provide all parameters required by the function. If the SPL schema is insufficient then an error will result, typically an SPL compile time error.
The SPL schema can provide a subset of the formal parameters if the remaining attributes are optional (having a default).
Attributes by name consuming a subset of attributes with an optional parameter not matched by the schema:
@spl.map()
def f(id, temp, pressure=None)
pass
# f(id='battery', temp=23.7)
# id='battery'
# temp=23.7
# pressure=None
Submission of SPL tuples from Python¶
The return from a decorated callable results in submission of SPL tuples on the associated outut port.
- A Python function must return:
None
a Python tuple
a Python dictionary
a list containing any of the above.
None¶
When None
is return then no tuple will be submitted to
the operator output port.
Python tuple¶
When a Python tuple is returned it is converted to an SPL tuple and submitted to the output port.
The values of a Python tuple are assigned to an output SPL tuple by position, so the first value in the Python tuple is assigned to the first attribute in the SPL tuple:
# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a,b):
return (a,b,a+b)
# The SPL output will be:
# All values explictly set by returned Python tuple
# based on the x,y values from the input tuple
# x is set to: x
# y is set to: y
# z is set to: x+y
The returned tuple may be sparse, any attribute value in the tuple
that is None
will be set to their SPL default or copied from
a matching attribute in the input tuple
(same name and type,
or same name and same type as the underlying type of an output attribute
with an optional type),
depending on the operator kind:
# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a,b):
return (a,None,a+b)
# The SPL output will be:
# x is set to: x (explictly set by returned Python tuple)
# y is set to: y (set by matching input SPL attribute)
# z is set to: x+y
When a returned tuple has fewer values than attributes in the SPL output schema the attributes not set by the Python function will be set to their SPL default or copied from a matching attribute in the input tuple (same name and type, or same name and same type as the underlying type of an output attribute with an optional type), depending on the operator kind:
# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a,b):
return a,
# The SPL output will be:
# x is set to: x (explictly set by returned Python tuple)
# y is set to: y (set by matching input SPL attribute)
# z is set to: 0 (default int32 value)
When a returned tuple has more values than attributes in the SPL output schema then the additional values are ignored:
# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a,b):
return (a,b,a+b,a/b)
# The SPL output will be:
# All values explictly set by returned Python tuple
# based on the x,y values from the input tuple
# x is set to: x
# y is set to: y
# z is set to: x+y
#
# The fourth value in the tuple a/b = x/y is ignored.
Python dictionary¶
A Python dictionary is converted to an SPL tuple for submission to
the associated output port. An SPL attribute is set from the
dictionary if the dictionary contains a key equal to the attribute
name. The value is used to set the attribute, unless the value is
None
.
If the value in the dictionary is None
, or no matching key exists,
then the attribute value is set to its SPL default or copied from
a matching attribute in the input tuple (same name and type,
or same name and same type as the underlying type of an output attribute
with an optional type), depending on the operator kind.
Any keys in the dictionary that do not map to SPL attribute names are ignored.
Python list¶
When a list is returned, each value is converted to an SPL tuple and submitted to the output port, in order of the list starting with the first element (position 0). If the list contains None at an index then no SPL tuple is submitted for that index.
The list must only contain Python tuples, dictionaries or None. The list can contain a mix of valid values.
The list may be empty resulting in no tuples being submitted.
Module contents¶
Functions
Is a module being loaded by |
|
Decorator to ignore a Python function. |
Classes
Primitive operator super class. |
|
Decorator that creates a filter SPL operator from a callable class or function. |
|
Creates an SPL operator with a single input port. |
|
Declare an input port and its processor method. |
|
Decorator to create a map SPL operator from a callable class or function. |
|
Creates an SPL primitive operator with an arbitrary number of input ports and output ports. |
|
Create a source SPL operator from an iterable. |
-
class
streamsx.spl.spl.
source
(docpy=True)¶ Bases:
object
Create a source SPL operator from an iterable. The resulting SPL operator has a single output port.
When decorating a class the class must be iterable having an
__iter__
function. When the SPL operator is invoked an instance of the class is created and an iteration is created usingiter(instance)
.When decoratiing a function the function must have no parameters and must return an iterable or iteration. When the SPL operator is invoked the function is called and an iteration is created using
iter(value)
wherevalue
is the return of the function.For each value in the iteration SPL zero or more tuples are submitted to the output port, derived from the value, see Submission of SPL tuples from Python.
If the iteration completes then no more tuples are submitted and a window punctuation mark followed by final punctuation mark are submitted to the output port.
Example definition:
@spl.source() class Range(object): def __init__(self, stop, start=0): self.start = start self.stop = stop def __iter__(self): return zip(range(self.start, self.stop))
Example SPL invocation:
stream<int32 seq> R = Range() { param stop: 100; }
If
__iter__
or__next__
block then shutdown, checkpointing or consistent region processing may be delayed. Having__next__
returnNone
(no available tuples) or tuples to submit will allow such processing to proceed.A shutdown
threading.Event
is available throughstreamsx.ec.shutdown()
which becomes set when a shutdown of the processing element has been requested. This event my be waited on to perform a sleep that will terminate upon shutdown.- Parameters
docpy – Copy Python docstrings into SPL operator model for SPLDOC.
Exceptions raised by
__iter__
and__next__
can be suppressed when this decorator wraps a class with context manager__enter__
and__exit__
methods.If
__exit__
returns a true value when called with an exception then the exception is suppressed.Suppressing an exception raised by
__iter__
results in the source producing an empty iteration. No tuples will be submitted.Suppressing an exception raised by
__next__
results in the source not producing any tuples for that invocation. Processing continues with a call to__next__
.Data conversion errors of the value returned by
__next__
can also be suppressed by__exit__
. If__exit__
returns a true value when called with the exception then the exception is suppressed and the value that caused the exception is not submitted as an SPL tuple.
-
class
streamsx.spl.spl.
map
(style=None, docpy=True)¶ Bases:
object
Decorator to create a map SPL operator from a callable class or function.
Creates an SPL operator with a single input port and a single output port. For each tuple on the input port the callable is called passing the contents of the tuple.
The value returned from the callable results in zero or more tuples being submitted to the operator output port, see Submission of SPL tuples from Python.
Example definition:
@spl.map() class AddSeq(object): """Add a sequence number as the last attribute.""" def __init__(self): self.seq = 0 def __call__(self, *tuple_): id = self.seq self.seq += 1 return tuple_ + (id,)
Example SPL invocation:
stream<In, tuple<uint64 seq>> InWithSeq = AddSeq(In) { }
- Parameters
style – How the SPL tuple is passed into Python callable or function, see Processing SPL tuples in Python.
docpy – Copy Python docstrings into SPL operator model for SPLDOC.
Exceptions raised by
__call__
can be suppressed when this decorator wraps a class with context manager__enter__
and__exit__
methods. If__exit__
returns a true value when called with the exception then the exception is suppressed and the tuple that caused the exception is dropped.Data conversion errors of the value returned by
__call__
can also be suppressed by__exit__
. If__exit__
returns a true value when called with the exception then the exception is suppressed and the value that caused the exception is not submitted as an SPL tuple.
-
class
streamsx.spl.spl.
filter
(style=None, docpy=True)¶ Bases:
object
Decorator that creates a filter SPL operator from a callable class or function.
A filter SPL operator has a single input port and one mandatory and one optional output port. The schema of each output port must match the input port. For each tuple on the input port the callable is called passing the contents of the tuple. if the function returns a value that evaluates to True then it is submitted to mandatory output port 0. Otherwise it it submitted to the second optional output port (1) or discarded if the port is not specified in the SPL invocation.
- Parameters
style – How the SPL tuple is passed into Python callable or function, see Processing SPL tuples in Python.
docpy – Copy Python docstrings into SPL operator model for SPLDOC.
Example definition:
@spl.filter() class AttribThreshold(object): """ Filter based upon a single attribute being above a threshold. """ def __init__(self, attr, threshold): self.attr = attr self.threshold = threshold def __call__(self, **tuple_): return tuple_[self.attr] > self.threshold:
Example SPL invocation:
stream<rstring id, float64 voltage> Sensors = ... stream<Sensors> InterestingSensors = AttribThreshold(Sensors) { param attr: "voltage"; threshold: 225.0; }
Exceptions raised by
__call__
can be suppressed when this decorator wraps a class with context manager__enter__
and__exit__
methods. If__exit__
returns a true value when called with the exception then the expression is suppressed and the tuple that caused the exception is dropped.
-
class
streamsx.spl.spl.
for_each
(style=None, docpy=True)¶ Bases:
object
Creates an SPL operator with a single input port.
A SPL operator with a single input port and no output ports. For each tuple on the input port the decorated callable is called passing the contents of the tuple.
Example definition:
@spl.for_each() def PrintTuple(*tuple_): """Print each tuple to standard out.""" print(tuple_, flush=True)
Example SPL invocation:
() as PT = PrintTuple(SensorReadings) { }
Example definition with handling window punctuations:
@spl.for_each(style='position') class PrintPunct(object): def __init__(self): pass def __call__(self, value): assert value > 0 def on_punct(self): print('window marker received')
Note
Punctuation marks are in-band signals that are inserted between tuples in a stream. Window punctuations are inserted into a stream that are related to the semantics of the operator. One example is the
aggregate()
, which inserts a window marker into the output stream after each aggregation.- Parameters
style – How the SPL tuple is passed into Python callable, see Processing SPL tuples in Python.
docpy – Copy Python docstrings into SPL operator model for SPLDOC.
Exceptions raised by
__call__
can be suppressed when this decorator wraps a class with context manager__enter__
and__exit__
methods. If__exit__
returns a true value when called with the exception then the expression is suppressed and the tuple that caused the exception is ignored.Supports handling window punctuation markers in the Sink operator in
on_punct
method (new in version 1.16).
-
class
streamsx.spl.spl.
PrimitiveOperator
¶ Bases:
object
Primitive operator super class. Classes decorated with @spl.primitive_operator must extend this class if they have one or more output ports. This class provides the submit method to submit tuples to specified otuput port.
New in version 1.8.
-
all_ports_ready
()¶ Notifcation that the operator can submit tuples.
Called when the primitive operator can submit tuples using
submit()
. An operator must not submit tuples until this method is called or until a port processing method is called.Any implementation must not block. A typical use is to start threads that submit tuples.
An implementation must return a value that allows the SPL runtime to determine when an operator completes. An operator completes, and finalizes its output ports when:
All input ports (if any) have been finalized.
All background processing is complete.
The return from
all_ports_ready
defines when background processing, such as threads started byall_ports_ready
, is complete. The value is one of:A value that evaluates to False - No background processing exists.
A value that evaluates to True - Background processing exists and never completes. E.g. a source operator that processes real time events.
A callable - Background processing is complete when the callable returns. The SPL runtime invokes the callable once (passing no arguments) when the method returns background processing is assumed to be complete.
For example if an implementation starts a single thread then Thread.join is returned to complete the operator when the thread completes:
def all_ports_ready(self): submitter = threading.Thread(target=self._find_and_submit_data) submitter.start() return submitter.join def _find_and_submit_data(self): ...
- Returns
Value indicating active background processing.
This method implementation does nothing and returns
None
.
-
submit
(port_id, tuple_)¶ Submit a tuple to the output port.
The value to be submitted (
tuple_
) can be aNone
(nothing will be submitted),tuple
,dict` or ``list
of those types. For details on how thetuple_
is mapped to an SPL tuple see Submission of SPL tuples from Python.- Parameters
port_id – Identifier of the port specified in the
output_ports
parameter of the@spl.primitive_operator
decorator.tuple_ – Tuple (or tuples) to be submitted to the output port.
-
submit_punct
(port_id)¶ Submit a window punctuation marker to the output port.
Note
Punctuation marks are in-band signals that are inserted between tuples in a stream. Window punctuations are inserted into a stream that are related to the semantics of the operator. One example is the
aggregate()
, which inserts a window marker into the output stream after each aggregation.- Parameters
port_id – Identifier of the port specified in the
output_ports
parameter of the@spl.primitive_operator
decorator.
New in version 1.16.
-
-
class
streamsx.spl.spl.
input_port
(style=None)¶ Bases:
object
Declare an input port and its processor method.
Instance methods within a class decorated by
spl.primitive_operator
declare input ports by decorating methods with this decorator.Each tuple arriving on the input port will result in a call to the processor method passing the stream tuple converted to a Python representation depending on the style. The style is determined by the method signature or the style parameter, see Processing SPL tuples in Python.
The order of the methods within the class define the order of the ports, so the first port is the first method decorated with input_port.
- Parameters
style – How the SPL tuple is passed into the method, see Processing SPL tuples in Python.
New in version 1.8.
-
class
streamsx.spl.spl.
primitive_operator
(output_ports=None, docpy=True)¶ Bases:
object
Creates an SPL primitive operator with an arbitrary number of input ports and output ports.
Input ports are declared by decorating an instance method with
input_port()
. The method is the process method for the input port and is called for each tuple that arrives at the port. The order of the decorated process methods defines the order of the ports in the SPL operator, with the first process method being the first port at index zero.Output ports are declared by the
output_ports
parameter which is set to alist
of port identifiers. The port identifiers are arbitrary but must be hashable. Port identifiers allow the ability to submit tuples “logically’ rather than through a port index. Typically a port identifier will be a str or an enum. The size of the list defines the number of output ports with the first identifier in the list coresponding to the first output port of the operator at index zero. If the list is empty or not set then the operator has no output ports.Tuples are submitted to an output port using
submit()
.When an operator has output ports it must be a sub-class of
PrimitiveOperator
which provides thesubmit()
method and the ports ready notification mechanismall_ports_ready()
.Example definition of an operator with a single input port and two output ports:
@spl.primitive_operator(output_ports=['MATCH', 'NEAR_MATCH']) class SelectCustomers(spl.PrimitiveOperator): """ Score customers using a model. Customers that are a good match are submitted to port 0 ('MATCH') while customers that are a near match are submitted to port 1 ('NEAR_MATCH'). Customers that are not a good or near match are not submitted to any port. """ def __init__(self, match, near_match): self.match = match self.near_match = near_match @spl.input_port() def customers(self, **tuple_): customer_score = self.score(tuple_) if customer_score >= self.match: self.submit('MATCH', tuple_) elif customer_score >= self.near_match: self.submit('NEAR_MATCH', tuple_) def score(self, **customer): # Actual model scoring omitted score = ... return score
Example SPL invocation:
(stream<Customers> MakeOffer; stream<Customers> ImproveOffer>) = SelectCustomers(Customers) { param match: 0.9; near_match: 0.8; }
Example definition of an operator with punctuation handling:
@spl.primitive_operator(output_ports=['A']) class SimpleForwarder(spl.PrimitiveOperator): def __init__(self): pass @spl.input_port() def port0(self, *t): self.submit('A', t) def on_punct(self): self.submit_punct('A')
Supports handling window punctuation markers in the primitive operator in
on_punct
method (new in version 1.16).Note
Punctuation marks are in-band signals that are inserted between tuples in a stream. Window punctuations are inserted into a stream that are related to the semantics of the operator. One example is the
aggregate()
, which inserts a window marker into the output stream after each aggregation.- Parameters
output_ports (list) – List of identifiers for output ports.
docpy – Copy Python docstrings into SPL operator model for SPLDOC.
New in version 1.8.
-
streamsx.spl.spl.
extracting
()¶ Is a module being loaded by
spl-python-extract
.This can be used by modules defining SPL primitive operators using decorators such as
@spl.map
, to avoid runtime behavior. Typically not importing modules that are not available locally. The extraction script loads the module to determine method signatures and thus does not invoke any methods.For example if an SPL toolkit with primitive operators requires a package
extras
and is usingopt/python/streams/requirements.txt
to include it, then loading it at extraction time can be avoided by:from streamsx.spl import spl def spl_namespace(): return 'myns.extras' if not spl.extracting(): import extras @spl.map(): def myextras(*tuple_): return extras.process(tuple_)
New in version 1.11.
-
streamsx.spl.spl.
ignore
(wrapped)¶ Decorator to ignore a Python function.
If a Python callable is decorated with
@spl.ignore
then function is ignored byspl-python-extract.py
.- Parameters
wrapped – Function that will be ignored.