streamsx.topology.topology¶
Streaming application definition.
Overview¶
IBM Streams is an advanced analytic platform that allows user-developed applications to quickly ingest, analyze and correlate information as it arrives from thousands of real-time sources. Streams can handle very high data throughput rates, millions of events or messages per second.
With this API, Python developers can build streaming applications that are executed by IBM Streams, with processing distributed across multiple computing resources (hosts or machines) for scalability.
Topology¶
A Topology declares a graph of streams and operations against tuples (data items) on those streams.
After being declared, a Topology is submitted to be compiled into a Streams application bundle (sab file) and then executed. The sab file is a self-contained bundle that can be executed in a distributed Streams instance, either using the Streaming Analytics service on IBM Cloud or an on-premises IBM Streams installation.
The compilation step invokes the Streams compiler to produce a bundle. This effectively, from a Python point of view, produces a runnable version of the Python topology that includes application-specific Python C extensions to optimize performance.
The bundle also includes any required Python packages or modules that were used in the declaration of the application, excluding ones that are in a directory path containing site-packages.
The Python standard package tool pip uses a directory structure including site-packages when installing packages. Packages installed with pip can be included in the bundle with add_pip_package() when using a build service. This avoids the requirement to have packages preinstalled in cloud environments.
Local Python packages and modules containing callables used in transformations such as map() are copied into the bundle from their local location. The addition of local packages to the bundle can be controlled with Topology.include_packages and Topology.exclude_packages.
The Streams runtime distributes the application’s operations across the resources available in the instance.
Note
Topology represents a declaration of a streaming application that will be executed by a Streams instance as a job, either using the Streaming Analytics service on IBM Cloud or an on-premises distributed instance. Topology does not represent a running application, so an instance of Stream class does not contain the tuples, it is only a declaration of a stream.
Stream¶
A Stream can be an infinite sequence of tuples, such as a stream for a traffic flow sensor.
Alternatively, a stream can be finite, such as a stream that is created from the contents of a file.
When a streams processing application contains infinite streams, the application runs continuously without ending.
A stream has a schema that defines the type of each tuple on the stream. The schema for a stream is either:
Python - A tuple may be any Python object. This is the default when the schema is not explicitly or implicitly set.
String - Each tuple is a Unicode string.
Binary - Each tuple is a blob.
Json - Each tuple is a Python dict that can be expressed as a JSON object.
Structured - A stream that has a structured schema of an ordered list of attributes, with each attribute having a fixed type (e.g. float64 or int32) and a name. The schema of a structured stream is defined using a typed named tuple or StreamSchema.
A stream’s schema is implicitly derived from type hints declared for the callable of the transform that produces it. For example readings defined as follows would have a structured schema matching SensorReading:
class SensorReading(typing.NamedTuple):
    sensor_id: str
    ts: int
    reading: float

def reading_from_json(value: dict) -> SensorReading:
    return SensorReading(value['id'], value['timestamp'], value['reading'])
topo = Topology()
json_readings = topo.source(HttpReadings()).as_json()
readings = json_readings.map(reading_from_json)
Deriving schemas from type hints can be disabled by setting the topology’s type_checking attribute to false. For example, this would change readings in the previous example to have the generic Python object schema Python:
topo = Topology()
topo.type_checking = False
Stream processing¶
Callables¶
A stream is processed to produce zero or more transformed streams, such as filtering a stream to drop unwanted tuples, producing a stream that contains only the required tuples.
Stream processing is per-tuple based: as each tuple is submitted to a stream, consuming operators have their processing logic invoked for that tuple.
A functional operator is declared by methods on Stream such as map(), which maps the tuples on its input stream to tuples on its output stream. Stream uses a functional model where each stream processing operator is defined in terms of a Python callable that is invoked with input tuples and whose return value defines what output tuples are submitted for downstream processing.
The Python callable used for functional processing in this API may be:
A Python lambda function.
A Python function.
An instance of a Python callable class.
For example a stream words containing only string objects can be processed by a filter() using a lambda function:
# Filter the stream so it only contains words starting with py
pywords = words.filter(lambda word : word.startswith('py'))
When a callable has type hints they are used to:
define the schema of the resulting transformation, see Stream.
check the correctness of the transformation at topology declaration time.
For example, if the callable defining the source had type hints that indicated it was an iterator of str objects, then the schema of the resultant stream would be String. If this source stream then underwent a Stream.map() transform with a callable that had a type hint for its argument, a check is made to ensure that the type of the argument is compatible with str.
Type hints are maintained through transforms regardless of the resultant schema. For example, a transform that has a return type hint of int defines the schema as Python, but the type hint is retained even though the schema is generic. Thus an error is raised at topology declaration time if a downstream transformation uses a callable with a type hint that is incompatible with being passed an int.
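As an illustration, a hypothetical sketch of how a retained type hint can surface an error at declaration time (the names are illustrative, not from the API):

def to_length(s: str) -> int:
    return len(s)

def shout(w: str) -> str:
    return w.upper()

topo = Topology()
words = topo.source(['py', 'topology'])
# Schema of lengths is the generic Python schema, but the int return hint is retained
lengths = words.map(to_length)
# Raises an error at declaration time: shout expects str, upstream produces int
bad = lengths.map(shout)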
How type hints are used is specific to each transformation, such as source(), map(), filter() etc.
Type checking can be disabled by setting the topology’s type_checking
attribute to false.
When a callable is a lambda or defined inline (defined in the main Python script, a notebook or an interactive session) then a serialized copy of its definition becomes part of the topology. The supported types of captured globals for these callables are limited to avoid increasing the size of the application and serialization failures due to non-serializable objects directly or indirectly referenced from captured globals. The supported types of captured globals are constants (int, str, float, bool, bytes, complex), modules, module attributes (e.g. classes, functions and variables defined in a module), and inline classes and functions. If a lambda or inline callable causes an exception due to unsupported global capture, then moving it to its own module is a solution.
Due to Python bug 36697 a lambda or inline callable can incorrectly capture a global variable. For example an inline class using an attribute self.model will incorrectly capture the global model even if the global variable model is never used within the class. To work around this bug use attribute or variable names that do not shadow global variables (e.g. self._model).
Due to issue 2336 an inline class using super() will cause an AttributeError at runtime. The workaround is to call the super class’s method directly; for example, replace this code:
class A(X):
    def __init__(self):
        super().__init__()
with:
class A(X):
    def __init__(self):
        X.__init__(self)
or move the class to a module.
Stateful operations¶
Use of a class instance allows the operation to be stateful by maintaining state in instance attributes across invocations.
Note
For support with consistent region or checkpointing instances should ensure that the object’s state can be pickled. See https://docs.python.org/3.5/library/pickle.html#handling-stateful-objects
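A minimal sketch of a stateful callable class (names are hypothetical); the counter lives in an instance attribute and, being picklable, is compatible with checkpointing:

class RunningCount(object):
    def __init__(self):
        self.count = 0   # state maintained across invocations

    def __call__(self, tuple_):
        self.count += 1
        return {'value': tuple_, 'seen': self.count}

counted = s.map(RunningCount())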
Initialization and shutdown¶
Execution of a class instance effectively runs in a context manager, so that an instance’s __enter__ method is called when the processing element containing the instance is initialized and its __exit__ method is called when the processing element is stopped. To take advantage of this the class must define both __enter__ and __exit__ methods.
Note
Since an instance of a class is passed to methods such as map(), __init__ is only called when the topology is declared, not at runtime. Initialization at runtime, such as opening connections, occurs through the __enter__ method.
Example of using __enter__ to create custom metrics:
import streamsx.ec as ec

class Sentiment(object):
    def __init__(self):
        pass

    def __enter__(self):
        self.positive_metric = ec.CustomMetric(self, "positiveSentiment")
        self.negative_metric = ec.CustomMetric(self, "negativeSentiment")

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def __call__(self):
        pass
When an instance defines a valid __exit__ method then it will be called with an exception when:
the instance raises an exception during processing of a tuple
a data conversion exception is raised converting a value to a structured schema tuple or attribute
If __exit__ returns a true value then the exception is suppressed and processing continues, otherwise the enclosing processing element will be terminated.
Note
The __exit__ method requires four parameters; the last three parameters are set only when an exception is raised:

def __exit__(self, exc_type, exc_value, traceback):
    if exc_type:
        print(str(exc_type.__name__))
    ...
Tuple semantics¶
Python objects on a stream may be passed by reference between callables (e.g. the value returned by a map callable may be passed by reference to a following filter callable). This can only occur when the functions are executing in the same PE (process). If an object is not passed by reference a deep-copy is passed. Streams that cross PE (process) boundaries are always passed by deep-copy.
Thus if a stream is consumed by two map and one filter callables in the same PE they may receive the same object reference that was sent by the upstream callable. If one (or more) callable modifies the passed in reference those changes may be seen by the upstream callable or the other callables. The order of execution of the downstream callables is not defined. One can prevent such potential non-deterministic behavior by one or more of these techniques:
Passing immutable objects
Not retaining a reference to an object that will be submitted on a stream
Not modifying input tuples in a callable
Using copy/deepcopy when returning a value that will be submitted to a stream.
Applications cannot rely on pass-by-reference; it is a performance optimization that can be made in some situations when stream connections are within a PE.
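For example, a hypothetical map callable applying the copy technique listed above before modifying a tuple:

import copy

def enrich(reading):
    # Copy first so a reference shared with peer callables in the
    # same PE is never mutated.
    out = copy.deepcopy(reading)
    out['tagged'] = True
    return out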
Application log and trace¶
IBM Streams provides application trace and log services which are accessible through standard Python loggers from the logging module.
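For example, a callable can emit messages through a standard logger; a minimal sketch with hypothetical names:

import logging

trace = logging.getLogger(__name__)

def keep_positive(v):
    if v < 0:
        trace.warning('dropping negative value %d', v)
        return False
    return True

filtered = s.filter(keep_positive)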
SPL operators¶
In addition an application declared by Topology can include stream processing defined by SPL primitive or composite operators. This allows reuse of adapters and analytics provided by IBM Streams, open source and third-party SPL toolkits.
See streamsx.spl.op
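For instance, a sketch of invoking the standard toolkit’s spl.utility::Beacon operator as a source (parameter values are illustrative):

import streamsx.spl.op as op

topo = Topology()
# Invoke an SPL primitive operator producing a structured stream.
beacon = op.Source(topo, 'spl.utility::Beacon', 'tuple<uint64 seq>',
                   params={'period': 0.5})
beacon.seq = beacon.output('IterationCount()')
s = beacon.stream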
Module contents¶
Classes

PendingStream - Pending stream connection.
Routing - Defines how tuples are routed to channels in a parallel region.
Sink - Termination of a Stream.
Stream - The Stream class is the primary abstraction within a streaming application.
SubscribeConnection - Connection mode between a subscriber and matching publishers.
Topology - The Topology class is used to define data sources, and is passed as a parameter when submitting an application.
View - The View class provides access to a continuously updated sampling of data items on a stream.
Window - Declaration of a window of tuples on a Stream.
class streamsx.topology.topology.Routing¶
Bases: enum.Enum
Defines how tuples are routed to channels in a parallel region.
A parallel region is started by parallel() and ended with end_parallel() or for_each().

BROADCAST = 0¶
Tuples are routed to every channel in the parallel region.
HASH_PARTITIONED = 3¶
Tuples are routed based upon a hash value so that tuples with the same hash, and thus the same value, are always routed to the same channel. When a hash function is specified it is passed the tuple and the return value is the hash. When no hash function is specified then hash(tuple) is used.
Each tuple is only sent to a single channel.
Warning
A consistent hash function is required to guarantee that a tuple with the same value is always routed to the same channel. hash() is not consistent: for the types str, bytes and datetime, objects are “salted” with an unpredictable random value (Python 3.5). Thus if the processing element is restarted, channel routing for a hash based upon a str, bytes or datetime value will change. In addition, code executing in the channels can see a different hash value than other channels and than the execution that routed the tuple, due to being in different processing elements.
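For instance, a consistent hash can be computed from a stable byte representation instead of hash(); a hypothetical sketch for a stream of strings:

import zlib

def consistent_hash(word):
    # crc32 over the encoded value is stable across processes and
    # restarts, unlike hash() on str which is salted per process.
    return zlib.crc32(word.encode('utf-8'))

p = words.parallel(3, routing=Routing.HASH_PARTITIONED, func=consistent_hash)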
KEY_PARTITIONED = 2¶
Tuples are routed based upon specified partitioning keys. The splitter routes tuples that have the same values for these keys (a list of attributes) to the same parallel channel. The keys must exist in the tuple type that is specified for the input stream. Requires a structured stream StreamSchema or named tuple as the input stream.

Each tuple is only sent to a single channel.
ROUND_ROBIN = 1¶
Tuples are routed to maintain an even distribution of tuples to the channels.
Each tuple is only sent to a single channel.
class streamsx.topology.topology.SubscribeConnection¶
Bases: enum.Enum
Connection mode between a subscriber and matching publishers.
New in version 1.9.
See also
Buffered = 1¶
Buffered connection between a subscriber and matching publishers.
With a buffered connection tuples from publishers are placed in a single queue owned by the subscriber. This allows a slower subscriber to handle brief spikes in tuples from publishers.
A subscriber can fully isolate itself from matching publishers by adding a CongestionPolicy that drops tuples when the queue is full. In this case, when the subscriber is not able to keep up with the tuple rate from all matching publishers, it will have a minimal effect on matching publishers.
Direct = 0¶
Direct connection between a subscriber and matching publishers.
When connected directly, a slow subscriber will cause back-pressure against the publishers, forcing them to slow tuple processing to match the slowest subscriber.
class streamsx.topology.topology.Topology(name=None, namespace=None, files=None)¶
Bases: object
The Topology class is used to define data sources, and is passed as a parameter when submitting an application. Topology keeps track of all sources, sinks, and transformations within your application.
Submission of a Topology results in a Streams application that has the name namespace::name.
- Parameters
name (str) – Name of the topology. Defaults to a name derived from the calling environment if it can be determined, otherwise a random name.
namespace (str) – Namespace of the topology. Defaults to a name derived from the calling environment if it can be determined, otherwise a random name.
include_packages¶
Python package names to be included in the built application. Any package in this list is copied into the bundle and made available at runtime to the Python callables used in the application. By default a Topology will automatically discover which packages and modules are required to be copied; this field may be used to add additional packages that were not automatically discovered. See also add_pip_package(). Package names in include_packages take precedence over package names in exclude_packages.
- Type
set[str]
exclude_packages¶
Python top-level package names to be excluded from the built application. Excluding a top-level package excludes all sub-modules at any level in the package, e.g. sound excludes sound.effects.echo. Only the top-level package can be defined, e.g. sound rather than sound.filters. Behavior when adding a module within a package is undefined. When compiling the application using Anaconda this set is pre-loaded with Python packages from the Anaconda pre-loaded set.
- Type
set[str]
type_checking¶
Set to false to disable type checking, defaults to True.
- Type
bool
name_to_runtime_id¶
Optional callable that returns a runtime identifier for a name. Used to override the default mapping of a name into a runtime identifier. It will be called with name and returns a valid SPL identifier or None. If None is returned then the default mapping for name is used. Defaults to None, indicating the default mapping is used. See Stream.runtime_id.
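A hypothetical sketch of overriding the mapping; the callable must return a valid SPL identifier or None to keep the default:

topo = Topology()

def fixed_ids(name):
    # Give one well-known stream a stable runtime identifier;
    # None keeps the default mapping for everything else.
    return 'SENSOR_READINGS' if name == 'readings' else None

topo.name_to_runtime_id = fixed_ids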
All declared streams in a Topology are available through their name using topology[name]. The stream’s name is defined by Stream.name() and will differ from the name parameter passed when creating the stream if the application uses duplicate names.

Changed in version 1.11: Declared streams available through topology[name].
add_file_dependency(path, location)¶
Add a file or directory dependency into a Streams application bundle.
Ensures that the file or directory at path on the local system will be available at runtime.
The file will be copied and made available relative to the application directory. Location determines where the file is placed relative to the application directory. Two values for location are supported: etc and opt. The runtime path relative to the application directory is returned.
The copy is made during the submit call, thus the contents of the file or directory must remain available until submit returns.
For example calling add_file_dependency('/tmp/conf.properties', 'etc') will result in the contents of the local file conf.properties being available at runtime at the path application directory/etc/conf.properties. This call returns etc/conf.properties.

Python callables can determine the application directory at runtime with get_application_directory(). For example the path above at runtime is os.path.join(streamsx.ec.get_application_directory(), 'etc', 'conf.properties').
- Parameters
path (str) – Path of the file on the local system.
location (str) – Location of the file in the bundle relative to the application directory.
- Returns
Path relative to the application directory that can be joined at runtime with get_application_directory.
- Return type
str
New in version 1.7.
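A sketch tying the declaration-time call to the runtime lookup (the file path is illustrative):

import os
import streamsx.ec

topo = Topology()
# Declaration time: bundle the local properties file under etc/.
runtime_path = topo.add_file_dependency('/tmp/conf.properties', 'etc')

def config_path():
    # Runtime: resolve the same file relative to the application directory.
    return os.path.join(streamsx.ec.get_application_directory(),
                        'etc', 'conf.properties')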
add_pip_package(requirement, name=None)¶
Add a Python package dependency for this topology.
If the package defined by the requirement specifier is not pre-installed on the build system then the package is installed using pip and becomes part of the Streams application bundle (sab file). The package is expected to be available from pypi.org.
If the package is already installed on the build system then it is not added into the sab file. The assumption is that the runtime hosts for a Streams instance have the same Python packages installed as the build machines. This is always true for IBM Cloud Pak for Data and the Streaming Analytics service on IBM Cloud.
The project name extracted from the requirement specifier is added to exclude_packages to avoid the package being added by the dependency resolver. Thus the package should be added before it is used in any stream transformation.

When an application is run with trace level info the available Python packages on the running system are listed to application trace. This includes any packages added by this method.

Example:
topo = Topology()
# Add dependency on pint package
# and astral at version 0.8.1
topo.add_pip_package('pint')
topo.add_pip_package('astral==0.8.1')
Example for packages not provided on pypi.org:
topo = Topology()
# Add dependency on package using whl file
topo.add_pip_package(requirement='https://github.com/myrepo/raw/mydir/mypkg-1.0-py3-none-any.whl', name='mypkg')
- Parameters
requirement (str) – Package requirements specifier.
name (str) – Name added to exclude_packages. Set this argument when adding URLs only.
Warning
Only supported when using the build service with a Streams instance in Cloud Pak for Data or Streaming Analytics service on IBM Cloud.
Note
Installing packages through pip is preferred to the automatic dependency checking performed on local modules. This is because pip will perform a full install of the package including any dependent packages and additional files, such as shared libraries, that might be missed by dependency discovery.
New in version 1.9.
property checkpoint_period¶
Enable checkpointing for the topology, and define the checkpoint period.
When checkpointing is enabled, the state of all stateful operators is saved periodically. If the operator restarts, its state is restored from the most recent checkpoint.
The checkpoint period is the frequency at which checkpoints will be taken. It can either be a timedelta value or a floating point value in seconds. It must be 0.001 seconds or greater.

A stateful operator is an operator whose callable is an instance of a Python callable class.
Examples:
# Create a topology that will checkpoint every thirty seconds
topo = Topology()
topo.checkpoint_period = 30.0
# Create a topology that will checkpoint every two minutes
topo = Topology()
topo.checkpoint_period = datetime.timedelta(minutes=2)
New in version 1.11.
create_submission_parameter(name, default=None, type_=None)¶
Create a submission parameter.
A submission parameter is a handle for a value that is not defined until topology submission time. Submission parameters enable the creation of reusable topology bundles.
A submission parameter has a name. The name must be unique within the topology.
The returned parameter is a callable. Prior to submitting the topology, while constructing the topology, invoking it returns None.

After the topology is submitted, invoking the parameter within the executing topology returns the actual submission time value (or the default value if it was not set at submission time).
Submission parameters may be used within functional logic. e.g.:
threshold = topology.create_submission_parameter('threshold', 100)

# s is some stream of integers
s = ...
s = s.filter(lambda v : v > threshold())
Submission parameters may be used to specify the degree of parallelism. e.g.:
stv_channels = topo.create_submission_parameter('num_channels', type_=int)

s = topo.source(range(67)).set_parallel(stv_channels)
s = s.filter(lambda v : v % stv_channels() == 0)
s = s.end_parallel()

jc = JobConfig()
jc.submission_parameters['num_channels'] = 3
jc.add(cfg)
Note
The parameter (value returned from this method) is only supported within a lambda expression or a callable that is not a function.
The default type of a submission parameter’s value is a str. When a default is specified the type of the value matches the type of the default.
If default is not set, then the type can be set with type_.
The types supported are str, int, float and bool.

Topology submission behavior when a submission parameter lacking a default value is created and a value is not provided at submission time is defined by the underlying topology execution runtime. Submission fails for contexts DISTRIBUTED, STANDALONE, and STREAMING_ANALYTICS_SERVICE.
- Parameters
name (str) – Name for submission parameter.
default – Default parameter when submission parameter is not set.
type_ – Type of parameter value when default is not set. Supported values are str, int, float and bool.
New in version 1.9.
property name¶
Name of the topology.
- Returns
Name of the topology.
- Return type
str
property namespace¶
Namespace of the topology.
- Returns
Namespace of the topology.
- Return type
str
source(func, name=None)¶
Declare a source stream that introduces tuples into the application.
Typically used to create a stream of tuples from an external source, such as a sensor or reading from an external system.
Tuples are obtained from an iterator obtained from the passed iterable or callable that returns an iterable.
Each tuple that is not None from the iterator is present on the returned stream.
Each tuple is a Python object and must be picklable to allow execution of the application to be distributed across available resources in the Streams instance.
If the iterator’s __iter__ or __next__ block, then shutdown, checkpointing or consistent region processing may be delayed. Having __next__ return None (no available tuples) or tuples to submit will allow such processing to proceed.

A shutdown threading.Event is available through streamsx.ec.shutdown(), which becomes set when a shutdown of the processing element has been requested. This event may be waited on to perform a sleep that will terminate upon shutdown.
- Parameters
func (callable) – An iterable or a zero-argument callable that returns an iterable of tuples.
name (str) – Name of the stream, defaults to a generated name.
Exceptions raised by func or its iterator will cause its processing element to terminate.

If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method.

Suppressing an exception raised by func.__iter__ causes the source to be empty; no tuples are submitted to the stream.

Suppressing an exception raised by __next__ on the iterator results in no tuples being submitted for that call to __next__. Processing continues with calls to __next__ to fetch subsequent tuples.
- Returns
A stream whose tuples are the result of the iterable obtained from func.
- Return type
Stream

Type hints

Type hints on func define the schema of the returned stream, defaulting to Python if no type hints are present.

For example s_sensor has a type hint that defines it as an iterable of SensorReading instances (typed named tuples). Thus readings has a structured schema matching SensorReading:
def s_sensor() -> typing.Iterable[SensorReading] :
    ...

topo = Topology()
readings = topo.source(s_sensor)
Simple examples
Finite constant source stream containing two tuples Hello and World:

topo = Topology()
hw = topo.source(['Hello', 'World'])
Use of builtin range to produce a finite source stream containing 100 int tuples from 0 to 99:
topo = Topology()
hw = topo.source(range(100))
Use of itertools.count to produce an infinite stream of int tuples:
import itertools

topo = Topology()
hw = topo.source(lambda : itertools.count())
Use of itertools to produce an infinite stream of tuples with a constant value and a sequence number:
import itertools

topo = Topology()
# itertools.repeat requires the constant value as its argument
hw = topo.source(lambda : zip(itertools.repeat('a'), itertools.count()))
External system examples
Typically sources pull data in from external systems, such as files, REST apis, databases, message systems etc. Such a source will typically be implemented as a class that when called returns an iterable.
To allow checkpointing of state, the standard methods __enter__ and __exit__ are implemented to allow creation of runtime objects that cannot be persisted, for example a file handle.

At checkpoint time state is preserved through standard pickling using __getstate__ and (optionally) __setstate__.
import requests
import time

class RestJsonReader(object):
    def __init__(self, url, period):
        self.url = url
        self.period = period
        self.session = None

    def __enter__(self):
        self.session = requests.Session()
        self.session.headers.update({'Accept': 'application/json'})

    def __exit__(self, exc_type, exc_value, traceback):
        if self.session:
            self.session.close()
            self.session = None

    def __call__(self):
        return self

    def __iter__(self):
        return self

    def __next__(self):
        time.sleep(self.period)
        return self.session.get(self.url).json()

    def __getstate__(self):
        # Remove the session from the persisted state
        return {'url':self.url, 'period':self.period}

def main():
    utc_now = 'http://worldclockapi.com/api/json/utc/now'
    topo = Topology()
    times = topo.source(RestJsonReader(utc_now, 10))
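A variant sketch of the reader above that waits on the shutdown event described earlier instead of calling time.sleep(), so the poll loop ends promptly when shutdown is requested (hypothetical adaptation):

import streamsx.ec

class ShutdownAwareReader(RestJsonReader):
    def __next__(self):
        # wait() returns True once shutdown has been requested.
        if streamsx.ec.shutdown().wait(self.period):
            raise StopIteration()
        return self.session.get(self.url).json()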
Warning
Source functions that use generators are not supported when checkpointing or within a consistent region. This is because generators cannot be pickled (even when using dill).
Changed in version 1.14: Type hints are used to define the returned stream schema.
property streams¶
Dict of all streams in the topology.
Key is the name of the stream, value is the corresponding Stream instance.

The returned value is a shallow copy of the current streams in this topology. This allows callers to iterate over the copy and perform operations that would add streams.
Note
Includes all streams created by composites and any internal streams created by topology.
New in version 1.14.
subscribe(topic, schema=CommonSchema.Python, name=None, connect=None, buffer_capacity=None, buffer_full_policy=None)¶
Subscribe to a topic published by other Streams applications. A Streams application may publish a stream to allow other Streams applications to subscribe to it. A subscriber matches a publisher if the topic and schema match.
By default a stream is subscribed as Python objects, which connects to streams published to the topic by Python Streams applications.

Structured schemas are subscribed to using an instance of StreamSchema. A Streams application publishing structured schema streams may have been implemented in any programming language supported by Streams.

JSON streams are subscribed to using schema Json. Each tuple on the returned stream will be a Python dictionary object created by json.loads(tuple). A Streams application publishing JSON streams may have been implemented in any programming language supported by Streams.

String streams are subscribed to using schema String. Each tuple on the returned stream will be a Python string object. A Streams application publishing string streams may have been implemented in any programming language supported by Streams.

Subscribers can ensure they do not slow down matching publishers by using a buffered connection with a buffer full policy that drops tuples.
- Parameters
topic (str) – Topic to subscribe to.
schema (StreamSchema) – schema to subscribe to.
name (str) – Name of the subscribed stream, defaults to a generated name.
connect (SubscribeConnection) – How the subscriber will be connected to matching publishers. Defaults to Direct connection.
buffer_capacity (int) – Buffer capacity in tuples when connect is set to Buffered. Defaults to 1000 when connect is Buffered. Ignored when connect is None or Direct.
buffer_full_policy (CongestionPolicy) – Policy when a published tuple arrives and the subscriber’s buffer is full. Defaults to Wait when connect is Buffered. Ignored when connect is None or Direct.
- Returns
A stream whose tuples have been published to the topic by other Streams applications.
- Return type
Stream

Changed in version 1.9: connect, buffer_capacity and buffer_full_policy parameters added.
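A sketch of a buffered subscription (the topic name is illustrative):

from streamsx.topology.topology import Topology, SubscribeConnection
from streamsx.topology.schema import CommonSchema

topo = Topology()
# A buffered connection isolates publishers from this subscriber;
# buffer_full_policy could additionally drop tuples when the queue fills.
readings = topo.subscribe('sensors/readings', schema=CommonSchema.Json,
                          connect=SubscribeConnection.Buffered,
                          buffer_capacity=5000)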
class streamsx.topology.topology.Stream(topology, oport, other=None)¶
Bases: streamsx._streams._placement._Placement, object
The Stream class is the primary abstraction within a streaming application. It represents a potentially infinite series of tuples which can be operated upon to produce another stream, as in the case of map(), or terminate a stream, as in the case of for_each().

aliased_as(name)¶
Create an alias of this stream.
Returns an alias of this stream with name name. When invocation of an SPL operator requires an Expression against an input port, this can be used to ensure the expression matches the input port alias regardless of the name of the actual stream.

Example use where the filter expression for a Filter SPL operator uses IN to access the input tuple attribute seq:

s = ...
s = s.aliased_as('IN')

params = {'filter': op.Expression.expression('IN.seq % 4ul == 0ul')}
f = op.Map('spl.relational::Filter', s, params = params)
- Parameters
name (str) – Name for returned stream.
- Returns
Alias of this stream with name equal to name.
- Return type
Stream
New in version 1.9.
as_json(force_object=True, name=None)¶
Declares a stream converting each tuple on this stream into a JSON value.
The stream is typed as a JSON stream.

Each tuple must be supported by JSONEncoder.

If force_object is True then each tuple that is not a dict will be converted to a JSON object with a single key payload containing the tuple. Thus each object on the stream will be a JSON object.

If force_object is False then each tuple is converted to a JSON value directly using the json package.

If this stream is already typed as a JSON stream then it will be returned (with no additional processing against it; force_object and name are ignored).
- Parameters
force_object (bool) – Force conversion of non dicts to JSON objects.
name (str) – Name of the resulting stream. When None defaults to a generated name.
New in version 1.6.1.
- Returns
Stream containing the JSON representations of tuples on this stream.
- Return type
Stream

as_string(name=None)¶
Declares a stream converting each tuple on this stream into a string using str(tuple).
The stream is typed as a string stream.

If this stream is already typed as a string stream then it will be returned (with no additional processing against it; name is ignored).
- Parameters
name (str) – Name of the resulting stream. When None defaults to a generated name.
New in version 1.6.
New in version 1.6.1: name parameter added.
- Returns
Stream containing the string representations of tuples on this stream.
- Return type
Stream

autonomous()¶
Starts an autonomous region for downstream processing. By default IBM Streams processing is executed in an autonomous region where any checkpointing of operator state is autonomous (independent) of other operators.

This method may be used to end a consistent region by starting an autonomous region. This may be called even if this stream is in an autonomous region.

Autonomous is not applicable when a topology is submitted to a STANDALONE context and will be ignored.
New in version 1.6.
- Returns
Stream whose subsequent downstream processing is in an autonomous region.
- Return type
Stream

batch(size)¶
Declares a tumbling window to support batch processing against this stream.
The number of tuples in the batch is defined by size.
If size is an int then it is the count of tuples in the batch. For example, with size=10 each batch will nominally contain ten tuples. Thus processing against the returned Window, such as aggregate(), will be executed every ten tuples against the last ten tuples on the stream. For example the first three aggregations would be against the first ten tuples on the stream, then the next ten tuples, and then the third ten tuples, etc.

If size is a datetime.timedelta then it is the duration of the batch using wallclock time. With a timedelta representing five minutes the window contains any tuples that arrived in the last five minutes. Thus processing against the returned Window, such as aggregate(), will be executed every five minutes against the batch of tuples that arrived in the last five minutes on the stream. For example the first three aggregations would be against any tuples on the stream in the first five minutes, then the next five minutes, and then minutes ten to fifteen. A batch can contain no tuples if no tuples arrived on the stream in the defined duration.

For specifying the duration of the window with a submission parameter use batchSeconds().

Each tuple on the stream appears only in a single batch.
The number of tuples seen by processing against the returned window may be less than size (count or time based) when:
the stream is finite; the final batch may contain fewer tuples than the defined size,
the stream is in a consistent region; drain processing will complete the current batch without waiting for the batch to reach its nominal size.
Examples:
# Create batches against stream s of 100 tuples each
w = s.batch(size=100)
# Create a window size specified by submission parameter
count = topo.create_submission_parameter('count', 100)
w = s.batch(size=count)
# Create batches against stream s every five minutes
w = s.batch(size=datetime.timedelta(minutes=5))
# Create a tumbling punctuation-based window
w = s.batch('punct')
- Parameters
size (int|datetime.timedelta|submission parameter created by Topology.create_submission_parameter()|'punct') – The size of each batch: either an int to define the number of tuples, a datetime.timedelta to define the duration of the batch, a submission parameter created by Topology.create_submission_parameter() to define the number of tuples, or the string 'punct' to create a punctuation-based window.
- Returns
Window allowing batch processing on this stream.
- Return type
Window

New in version 1.11.
batchSeconds(size)¶
Declares a tumbling window to support batch processing against this stream using a submission parameter created by Topology.create_submission_parameter().

The size of the window is defined by the parameter size in seconds.
- Parameters
size (submission parameter created by Topology.create_submission_parameter()) – The size of the window in seconds.
Examples:
# Create a tumbling window with submission parameter `time` and the default value 10 seconds
time = topo.create_submission_parameter('time', 10)
w = s.batchSeconds(time)
# Create a window with submission parameter `secs` and no default value
time = topo.create_submission_parameter(name='secs', type_=int)
w = s.batchSeconds(time)
- Returns
Window allowing batch processing on this stream.
- Return type
Window

catch_exceptions(exception_type='streams', tuple_trace=False, stack_trace=False)¶
When applied to a primitive operator, exceptions of the specified type that are thrown by the operator while processing a tuple are caught.
Note
You cannot use this on an operator without input streams.
Example using default values (tuple trace and stack trace disabled), catching exceptions thrown by the Python primitive operator invoking the map() transformation. This map callable raises a ValueError (“invalid literal for int() with base 10: ‘five’”) when processing the sixth tuple. With catch_exceptions() applied the application is able to process all 10 tuples and does not stop processing:

from typing import NamedTuple

class NumbersSchema(NamedTuple):
    num: int

topo = Topology()
str_stream = topo.source(['0','1','2','3','4','five','6','7','8','9']).as_string()
num_stream = str_stream.map(lambda t: {'num': int(t)}, schema=NumbersSchema)
num_stream.catch_exceptions()
num_stream.print()
Example using the SPL operator Functor with tuple trace enabled:

topo = Topology()
str_stream = topo.source(['0','1','2','3','4','five','6','7','8','9']).as_string()
f = op.Map('spl.relational::Functor', str_stream, schema='tuple<int64 num>')
f.num = f.output('(int64) string')
num_stream = f.stream
num_stream.catch_exceptions(tuple_trace=True)
num_stream.print()
- Parameters
exception_type (str) –
Indicates the type of exceptions to be caught by the run time environment. Supported options include:
none: No exceptions of any type are caught.
streams: Only IBM® Streams exceptions are caught. This includes exceptions that are thrown by SPL native functions from the standard toolkit, other exceptions that extend from the C++ SPL::SPLRuntimeException, and exceptions that extend from the Java com.ibm.streams.operator.DataException (extending from java.lang.RuntimeException).
std: Both IBM Streams and standard exceptions are caught. For C++, standard exception means std::exception. For Java, standard exception means all checked exceptions that inherit from java.lang.Exception.
all: In C++, any thrown exception is caught. For Java, any checked and unchecked exception that inherits from java.lang.Exception is caught.
tuple_trace (bool) – Enables or disables the tracing of tuple data. Tracing of data can be enabled when tuples do not contain sensitive data, as the data can show up in PE logs. Tuples are logged to the trace facility with the ERROR trace level.
stack_trace (bool) – Enables or disables the printout of the stack trace to the Streams trace facility. Stack traces are printed to the Streams trace facility with the trace level ERROR.
- Returns
Returns this stream.
- Return type
Stream

New in version 2.1.
property category¶
Category for this processing logic.
An arbitrary application label allowing grouping of application elements by category.
Assign categories based on common function. For example, database is a common category that you can use to group all database sinks in an application.
A category is not required and defaults to None, meaning no assigned category.

Streams console supports visualization based upon categories.
- Raises
TypeError – No directly associated processing logic.
Note
A category has no effect on the execution of the application.
New in version 1.9.
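For example (hypothetical names), grouping a database sink under a common category:

sink = records.for_each(write_to_db, name='DbWriter')
sink.category = 'database'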
colocate(others)¶
Colocate this processing logic with others.
Colocating processing logic requires execution in the same Streams processing element (operating system process).
When a job is submitted Streams may colocate (fuse) processing logic into the same processing element based upon flow analysis and current resource usage. This call instructs that this logic and others must be executed in the same processing element.
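A small hypothetical sketch forcing two sinks into the same processing element:

sink1 = s.for_each(write_primary, name='Primary')
sink2 = s.for_each(write_audit, name='Audit')
sink1.colocate(sink2)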
end_low_latency()¶
Returns a Stream that is no longer guaranteed to run in the same process as the calling stream.
- Returns
Stream
end_parallel()¶
Ends a parallel region by merging the channels into a single stream.
- Returns
Stream for which subsequent transformations are no longer parallelized.
- Return type
Stream

See also
filter(func, non_matching=False, name=None)¶
Filters tuples from this stream using the supplied callable func.
For each stream tuple t on the stream func(t) is called; if the return value evaluates to True the tuple will be present on the returned stream, otherwise the tuple is filtered out.
- Parameters
func – Filter callable that takes a single parameter for the stream tuple.
non_matching (bool) – Non-matching tuples are sent to a second optional output stream
name (str) – Name of the stream, defaults to a generated name.
If invoking func for a stream tuple raises an exception then its processing element will terminate. By default the processing element will automatically restart, though tuples may be lost.

If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method. When an exception is suppressed no tuple is submitted to the filtered stream corresponding to the input tuple that caused the exception.

Example with matching and non matching streams:
topo = Topology()
s = topo.source(['Hello', 'World'])
matches, non_matches = s.filter((lambda t : "Wor" in t), non_matching=True)
- Returns
A Stream containing tuples that have not been filtered out. The schema of the returned stream is the same as this stream’s schema. Optional second stream is returned for non matching tuples, if parameter non_matching is set to True.
- Return type
Stream

Type hints
The argument type hint on func is used (if present) to verify at topology declaration time that it is compatible with the type of tuples on this stream.
Note
Punctuation marks are in-band signals that are inserted between tuples in a stream. Whether sources or stream transforms insert window markers at all, and when they insert them, depends on the source or the semantics of the stream transformation. One example is the aggregate(), which inserts a window marker into the output stream after each aggregation.

The filter() punctuation mode is “preserving”. Incoming window punctuations are forwarded.
flat_map(func=None, name=None)¶
Maps and flattens each tuple from this stream into 0 or more tuples.
For each tuple on this stream func(tuple) is called. If the result is not None then the result is iterated over, and each value from the iterator that is not None is submitted to the returned stream.

If the result is None or an empty iterable then no tuples are submitted to the returned stream.
- Parameters
func – A callable that takes a single parameter for the tuple. If not supplied then a function equivalent to lambda tuple_ : tuple_ is used. This is suitable when each tuple on this stream is an iterable to be flattened.
name (str) – Name of the flattened stream, defaults to a generated name.
If invoking func for a tuple on the stream raises an exception then its processing element will terminate. By default the processing element will automatically restart, though tuples may be lost.

If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method. When an exception is suppressed no tuples are submitted to the flattened and mapped stream corresponding to the input tuple that caused the exception.

Example: for a list of dicts, flat_map emits n tuples for each input tuple received, with n the number of elements in the list:

from typing import Iterable, NamedTuple

class SampleSchema(NamedTuple):
    id: str
    flag: bool

def flatten_dict(tpl) -> Iterable[SampleSchema]:
    return tpl

# list_stream is a stream of lists of dicts as Python objects, for example [{'id': '0', 'flag': True}]
sample_stream = list_stream.flat_map(flatten_dict)
# sample_stream is a named tuple stream of SampleSchema
Note
Punctuation marks are in-band signals that are inserted between tuples in a stream. Whether sources or stream transforms insert window markers at all, and when they insert them, depends on the source or the semantics of the stream transformation. One example is the aggregate(), which inserts a window marker into the output stream after each aggregation.

The flat_map() punctuation mode is “preserving”. Incoming window punctuations are forwarded.
- Returns
A Stream containing flattened and mapped tuples.
- Return type
Stream
- Raises
TypeError – if func returns neither an iterable nor None
Changed in version 1.11: func is optional.
for_each(func, name=None, process_punct=None)¶
Sends information as a stream to an external system.
The transformation defined by func is a callable or a composite transformation.
Callable transformation
If func is callable then for each tuple t on this stream func(t) is called.

If invoking func for a tuple on the stream raises an exception then its processing element will terminate. By default the processing element will automatically restart, though tuples may be lost.

If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method. When an exception is suppressed no further processing occurs for the input tuple that caused the exception.

Example with a class handling punctuations in the Sink operator:

class FEClass(object):
    def __call__(self, t):
        return None

    def on_punct(self):
        print('window punctuation marker received')
...
s.for_each(FEClass(), name='SinkHandlingPunctuations', process_punct=True)
Note
Punctuation marks are in-band signals that are inserted between tuples in a stream. Whether sources or stream transforms insert window markers at all, and when they insert them, depends on the source or the semantics of the stream transformation. One example is the aggregate(), which inserts a window marker into the output stream after each aggregation.

Composite transformation

A composite transformation is an instance of ForEach. Composites allow the application developer to use the standard functional style of the topology api while allowing expansion of a for_each transform to multiple basic transformations.
- Parameters
func – A callable that takes a single parameter for the tuple and returns None.
name (str) – Name of the stream, defaults to a generated name.
process_punct (bool) – Specifies whether on_punct on the callable func is called when window punctuation markers are received.
- Returns
Stream termination.
- Return type
Sink

Type hints
The argument type hint on func is used (if present) to verify at topology declaration time that it is compatible with the type of tuples on this stream.
Changed in version 1.7: Now returns a Sink instance.

Changed in version 1.14: Support for type hints and composite transformations.

Changed in version 1.16: New parameter process_punct to support handling of window punctuation markers in the callable.
isolate()¶
Guarantees that the upstream operation will run in a separate processing element from the downstream operation.
- Returns
Stream whose subsequent immediate processing will occur in a separate processing element.
- Return type
Stream

last(size=1)¶
Declares a sliding window containing the most recent tuples on this stream.

The number of tuples maintained in the window is defined by size.
If size is an int then it is the count of tuples in the window. For example, with size=10 the window always contains the last (most recent) ten tuples.

If size is a datetime.timedelta then it is the duration of the window. With a timedelta representing five minutes the window contains any tuples that arrived in the last five minutes.

If size is a submission parameter created by Topology.create_submission_parameter() then it is the count of tuples in the window. For specifying the duration of the window with a submission parameter use lastSeconds().
- Parameters
size (int|datetime.timedelta|submission parameter created by Topology.create_submission_parameter()) – The size of the window: either an int to define the number of tuples, a datetime.timedelta to define the duration of the window, or a submission parameter created by Topology.create_submission_parameter() to define the number of tuples.
Examples:
# Create a window against stream s of the last 100 tuples
w = s.last(size=100)
# Create a window against stream s of the last n tuples specified by submission parameter
count = topo.create_submission_parameter('count', 100)
w = s.last(size=count)
# Create a window against stream s of tuples
# arrived on the stream in the last five minutes
w = s.last(size=datetime.timedelta(minutes=5))
- Returns
Window of the last (most recent) tuples on this stream.
- Return type
Window

lastSeconds(size)¶
Declares a sliding window containing the most recent tuples on this stream using a submission parameter created by Topology.create_submission_parameter().

The size of the window is defined by size in seconds.
- Parameters
size (submission parameter created by Topology.create_submission_parameter()) – The size of the window in seconds.
Examples:
# Create a window against stream s with submission parameter `time` and the default value 10 seconds
time = topo.create_submission_parameter('time', 10)
w = s.lastSeconds(time)
# Create a window with submission parameter `secs` and no default value
time = topo.create_submission_parameter(name='secs', type_=int)
w = s.lastSeconds(time)
- Returns
Window of the last (most recent) tuples on this stream.
- Return type
Window

low_latency()¶
The function is guaranteed to run in the same process as the upstream Stream function. All streams that are created from the returned stream are also guaranteed to run in the same process until end_low_latency() is called.
- Returns
Stream
map(func=None, name=None, schema=None)¶
Maps each tuple from this stream into 0 or 1 stream tuples.
The transformation defined by func is a callable or a composite transformation.
Callable transformation
For each tuple on this stream result = func(tuple) is called. If result is not None then the result will be submitted as a tuple on the returned stream. If result is None then no tuple submission will occur.

By default the submitted tuple is result without modification, resulting in a stream of picklable Python objects. Setting the schema parameter changes the type of the stream and modifies each result before submission.

object or Python - The default: result is submitted.
str type or String - A stream of strings: str(result) is submitted.
json or Json - A stream of JSON objects: result must be convertible to a JSON object using the json package.
StreamSchema - A structured stream. result must be a dict or (Python) tuple. When a dict is returned the outgoing stream tuple attributes are set by name, when a tuple is returned stream tuple attributes are set by position.
string value - Equivalent to passing StreamSchema(schema).
Note
Punctuation marks are in-band signals that are inserted between tuples in a stream. Whether sources or stream transforms insert window markers at all, and when they insert them, depends on the source or the semantics of the stream transformation. One example is the aggregate(), which inserts a window marker into the output stream after each aggregation.

The map() punctuation mode is “preserving”. Incoming window punctuations are forwarded.

Composite transformation
A composite transformation is an instance of Map. Composites allow the application developer to use the standard functional style of the topology api while allowing expansion of a map transform to multiple basic transformations.
- Parameters
func – A callable that takes a single parameter for the tuple. If not supplied then a function equivalent to lambda tuple_ : tuple_ is used.
name (str) – Name of the mapped stream, defaults to a generated name.
schema (StreamSchema|CommonSchema|str) – Schema of the resulting stream.
If invoking func for a tuple on the stream raises an exception then its processing element will terminate. By default the processing element will automatically restart, though tuples may be lost.

If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method. When an exception is suppressed no tuple is submitted to the mapped stream corresponding to the input tuple that caused the exception.
- Returns
A stream containing tuples mapped by func.
- Return type
Stream

Type hints

If schema is not set then the return type hint on func defines the schema of the returned stream, defaulting to Python if no type hints are present.

For example reading_from_json has a type hint that defines it as returning SensorReading instances (typed named tuples). Thus readings has a structured schema matching SensorReading:

def reading_from_json(value: dict) -> SensorReading:
    return SensorReading(value['id'], value['timestamp'], value['reading'])

topo = Topology()
json_readings = topo.source(HttpReadings()).as_json()
readings = json_readings.map(reading_from_json)
The argument type hint on func is used (if present) to verify at topology declaration time that it is compatible with the type of tuples on this stream.
New in version 1.7: schema argument added to allow conversion to a structured stream.
New in version 1.8: Support for submitting dict objects as stream tuples to a structured stream (in addition to existing support for tuple objects).
Changed in version 1.11: func is optional.
property name¶
Unique name of the stream.
When declaring a stream a name parameter can be provided. If the supplied name is unique within its topology then it will be used as-is, otherwise a variant will be provided that is unique within the topology.
If a name parameter was not provided when declaring a stream then the stream is assigned a unique generated name.
- Returns
Name of the stream.
- Return type
str
See also
Warning
If the name is not a valid SPL identifier or is longer than 80 characters then the name will be converted to a valid SPL identifier at compile and runtime. This identifier will be the name used in the REST api and log/trace.

Visualizations of the runtime graph use name rather than the converted identifier.

A valid SPL identifier consists only of the characters A-Z, a-z, 0-9 and _, and must not start with a number or be an SPL keyword.

See runtime_id.

parallel(width, routing=<Routing.ROUND_ROBIN: 1>, func=None, keys=None, name=None)¶
Split the stream into channels and start a parallel region.
Returns a new stream that will contain the contents of this stream with tuples distributed across its channels.
The returned stream starts a parallel region where all downstream transforms are replicated across width channels. A parallel region is terminated by end_parallel() or for_each().

Any transform (such as map(), filter(), etc.) in a parallel region has a copy of its callable executing independently in parallel. Channels remain independent of other channels until the region is terminated.

For example with this topology fragment a parallel region of width 3 is created:
s = ...
p = s.parallel(3)
p = p.filter(F()).map(M())
e = p.end_parallel()
e.for_each(E())
Tuples from p (parallelized s) are distributed across three channels, 0, 1 & 2, and are independently processed by three instances of F and M. The tuples that pass the filter F in channel 0 are then mapped by the instance of M in channel 0, and so on for channels 1 and 2.

The channels are combined by end_parallel and so a single instance of E processes all the tuples from channels 0, 1 & 2.

This stream instance (the original) is outside of the parallel region and so any downstream transforms are executed normally. Adding this map transform would result in tuples on s being processed by a single instance of N:

n = s.map(N())
The number of channels is set by width, which may be an int greater than zero or a submission parameter created by Topology.create_submission_parameter().
With IBM Streams 4.3 or later the number of channels can be dynamically changed at runtime.
Tuples are routed to channels based upon routing, see Routing.
A parallel region can have multiple termination points, for example when a stream within the region has multiple transforms against it:
s = ...
p = s.parallel(3)
m1p = p.map(M1())
m2p = p.map(M2())
p.for_each(E())
m1 = m1p.end_parallel()
m2 = m2p.end_parallel()
Parallel regions can be nested, for example:
s = ...
m = s.parallel(2).map(MO()).parallel(3).map(MI()).end_parallel().end_parallel()
In this case there will be two instances of MO (the outer region) and six (2x3) instances of MI (the inner region).
Streams created by source() or subscribe() are placed in a parallel region by set_parallel().
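As a further illustration, a minimal sketch of hash-partitioned routing, where tuples with equal hash values are always routed to the same channel (the source contents and the use of the builtin hash as the routing function are illustrative assumptions):

from streamsx.topology.topology import Topology, Routing

topo = Topology()
words = topo.source(['a', 'b', 'a', 'c', 'b'])
# Tuples with equal hash values always go to the same channel.
p = words.parallel(3, routing=Routing.HASH_PARTITIONED, func=hash)
counts = p.map(lambda w: (w, 1))
merged = counts.end_parallel()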
- Parameters
width (int|submission parameter created by Topology.create_submission_parameter()) – Degree of parallelism.
routing (Routing) – Denotes what type of tuple routing to use.
func – Optional function called when Routing.HASH_PARTITIONED routing is specified. The function provides an integer value to be used as the hash that determines the tuple channel routing.
keys ([str]) – Optional list of keys required when Routing.KEY_PARTITIONED routing is specified. Each key represents a tuple attribute.
name (str) – The name to display for the parallel region.
- Returns
A stream for which subsequent transformations will be executed in parallel.
- Return type
Stream
See also
-
print
(tag=None, name=None, write_punctuations=None)¶ Prints each tuple to stdout, flushing after each tuple.
If tag is not None then each tuple has “tag: ” prepended to it before printing.
Note
Punctuation marks are in-band signals that are inserted between tuples in a stream. Whether sources or stream transforms insert window markers at all, and when they insert them, depends on the source or the semantics of the stream transformation. One example is aggregate(), which inserts a window marker into the output stream after each aggregation.
There are two kinds of punctuation markers, which are written to stdout when write_punctuations is set to True:
Window punctuation: indicates breaks in the data, which can be used by the transformation logic
Final punctuation: indicates the end of a stream
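For example, a minimal usage sketch (the source contents are illustrative):

from streamsx.topology.topology import Topology

topo = Topology()
s = topo.source(['hello', 'world'])
# Prints "debug: hello" and "debug: world", plus punctuation markers.
s.print(tag='debug', write_punctuations=True)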
- Parameters
tag – A tag to prepend to each tuple.
name (str) – Name of the resulting stream. When None defaults to a generated name.
write_punctuations (bool) – Specifies whether to write punctuation markers to stdout.
- Returns
Stream termination.
- Return type
Sink
New in version 1.6.1: tag, name parameters.
Changed in version 1.7: Now returns a Sink instance.
New in version 1.16: write_punctuations parameter.
-
publish
(topic, schema=None, name=None)¶ Publish this stream on a topic for other Streams applications to subscribe to. A Streams application may publish a stream to allow other Streams applications to subscribe to it. A subscriber matches a publisher if the topic and schema match.
By default a stream is published using its schema.
A stream of Python objects can be subscribed to by other Streams Python applications.
If a stream is published setting schema to json or Json then it is published as a stream of JSON objects. Other Streams applications may subscribe to it regardless of their implementation language.
If a stream is published setting schema to str or String then it is published as strings. Other Streams applications may subscribe to it regardless of their implementation language.
The only supported values of schema are json, Json, str and String.
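For example, a minimal sketch publishing a stream of dict objects as JSON so that non-Python applications can subscribe (the topic name and tuple contents are illustrative assumptions):

import json
from streamsx.topology.topology import Topology

topo = Topology()
readings = topo.source([{'id': 1, 'value': 42.0}, {'id': 2, 'value': 7.5}])
# Published as JSON; subscribers match on topic and schema.
readings.publish('sensors/readings', schema=json)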
- Parameters
topic (str) – Topic to publish this stream to.
schema – Schema to publish. Defaults to the schema of this stream.
name (str) – Name of the publish operator, defaults to a generated name.
- Returns
Stream termination.
- Return type
Sink
New in version 1.6.1: name parameter.
Changed in version 1.7: Now returns a Sink instance.
-
punctor
(func, before=True, replace=False, name=None)¶ Adds window punctuation to this stream using the supplied callable func as the condition that determines when a window punctuation is to be generated.
For each stream tuple t on the stream func(t) is called; if the return value evaluates to True then a window punctuation is generated and the tuple is forwarded, otherwise the tuple is just forwarded.
Note
Punctuation marks are in-band signals that are inserted between tuples in a stream. Whether sources or stream transforms insert window markers at all, and when they insert them, depends on the source or the semantics of the stream transformation. One example is aggregate(), which inserts a window marker into the output stream after each aggregation.
The punctor() punctuation mode is “generating” and inserts punctuation into the output stream according to custom logic. Incoming window punctuation is not forwarded.
- Parameters
func – Punctor callable that takes a single parameter for the stream tuple.
before (bool) – If the value is True, the punctuation is generated before the output tuple; otherwise it is generated after the output tuple. If the parameter replace is set to True then the parameter before is ignored.
replace (bool) – If the value is True, then in case func(t) returns True the window punctuation is generated and the tuple is discarded (not forwarded). The parameter before is ignored in this case.
name (str) – Name of the stream, defaults to a generated name.
If invoking func for a stream tuple raises an exception then its processing element will terminate. By default the processing element will automatically restart, though tuples may be lost.
If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method. When an exception is suppressed no tuple is submitted to the output stream corresponding to the input tuple that caused the exception.
Example with adding punctuation after each tuple:
topo = Topology()
s = topo.source([1,2,3,4])
s = s.punctor(lambda x: True, before=False)
Example with sending punctuation before a tuple:
topo = Topology()
s = topo.source([1,2,3,4])
s = s.punctor(lambda t : 2 < t)
- Returns
A Stream containing tuples with generated punctuation. The schema of the returned stream is the same as this stream’s schema.
- Return type
Stream
Type hints
The argument type hint on func is used (if present) to verify at topology declaration time that it is compatible with the type of tuples on this stream.
New in version 1.16.
-
property
resource_tags
¶ Resource tags for this processing logic.
Tags are a mechanism for differentiating and identifying resources that have different physical characteristics or logical uses. For example a resource (host) that has external connectivity for public data sources may be tagged ingest.
Processing logic can be associated with one or more tags to require running on suitably tagged resources. For example adding tags ingest and db requires that the processing element containing the callable that created the stream runs on a host tagged with both ingest and db.
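For example, a minimal sketch requiring a source to run on resources tagged both ingest and db (the source callable is an illustrative assumption):

from streamsx.topology.topology import Topology

topo = Topology()
s = topo.source(ReadFeed())   # ReadFeed is a hypothetical source callable
s.resource_tags.add('ingest')
s.resource_tags.add('db')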
A Stream that was not created directly with a Python callable cannot have tags associated with it. For example a stream that is a union() of multiple streams cannot be tagged. In this case this method returns an empty frozenset which cannot be modified.
See https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.2.1/com.ibm.streams.admin.doc/doc/tags.html for more details of tags within IBM Streams.
- Returns
Set of resource tags, initially empty.
- Return type
set
Warning
If no resources exist with the required tags then job submission will fail.
New in version 1.7.
-
property
runtime_id
¶ Return runtime identifier.
If name is not a valid SPL identifier then the runtime identifier will be a valid SPL identifier that represents name. Otherwise name is returned.
The runtime identifier is how the underlying SPL operator or output port is named in the REST API and trace/log files.
If a topology unique name is supplied when creating a stream then the runtime identifier is fixed regardless of other changes in the topology.
The algorithm to determine the runtime name (for clients that cannot call this method, for example, remote REST clients gathering metrics) is as follows.
If the length of name is less than or equal to 80 and name is an SPL identifier then name is used. An SPL identifier consists only of the characters A-Z, a-z, 0-9 and _, must not start with 0-9 and must not be an SPL keyword.
Otherwise the identifier has the form prefix_suffix. prefix is the kind of the SPL operator stripped of its namespace and ::. For all functional methods the operator kind is the method name with the first character upper-cased. For example, Filter for filter(), Beacon for spl::utility::Beacon.
suffix is a hashed version of name: an MD5 digest d is calculated from the UTF-8 encoding of name. d is shortened by having its first eight bytes xor folded with its last eight bytes. d is then base64 encoded to produce a string. Padding =, + and / characters are removed from the string.
For example, s.filter(lambda x : True, name='你好') results in a runtime identifier of Filter_oGwCfhWRg4.
The default mapping can be overridden by setting Topology.name_to_runtime_id to a callable that returns a valid identifier for its single argument. The returned identifier should be unique within the topology. For example, using a pre-populated dict as the mapper:

topo = Topology()
names = {'你好': 'Buses', '培养': 'Trains'}
topo.name_to_runtime_id = names.get
buses = topo.source(..., name='你好')
trains = topo.source(..., name='培养')
# buses.runtime_id will be Buses
# trains.runtime_id will be Trains
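The suffix computation can be reproduced by remote clients. A minimal sketch of the algorithm as described above (runtime_suffix is a hypothetical helper, not part of this API):

import base64
import hashlib

def runtime_suffix(name):
    d = hashlib.md5(name.encode('utf-8')).digest()       # 16-byte MD5 digest
    folded = bytes(a ^ b for a, b in zip(d[:8], d[8:]))  # xor-fold to 8 bytes
    s = base64.b64encode(folded).decode('ascii')
    return s.translate(str.maketrans('', '', '=+/'))     # drop =, + and /

# 'Filter_' + runtime_suffix('你好') should yield the identifier shown above.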
- Returns
Runtime identifier of the stream.
- Return type
str
New in version 1.14.
-
set_consistent
(consistent_config)¶ Indicates that the stream is the start of a consistent region.
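For example, a minimal sketch starting a periodic consistent region that drains and checkpoints every 30 seconds (the source callable is an illustrative assumption):

from streamsx.topology.topology import Topology
from streamsx.topology.consistent import ConsistentRegionConfig

topo = Topology()
s = topo.source(ReadOrders())   # ReadOrders is a hypothetical replayable source
s.set_consistent(ConsistentRegionConfig.periodic(30))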
- Parameters
consistent_config (consistent.ConsistentRegionConfig) – the configuration of the consistent region.
- Returns
Returns this stream.
- Return type
Stream
New in version 1.11.
-
set_event_time
(name, lag=None, minimum_gap=None, resolution=None)¶ Emit a stream with event-time values and watermarks.
Defines, via the parameter name, the stream attribute that is used as the event-time attribute in the event-time graph. An event-time graph starts with this stream and inserts watermarks into the stream from time to time.
Event-time connectivity extends only downstream.
The event-time graph ends at a sink or at an operator which does not output the event-time attribute.
A sample application creating an event-time stream and using a time-interval window (streamsx.topology.topology.Stream.time_interval()) is located in the samples directory: Event-Time-Sample
Sample with an attribute named ts of type timestamp used as event-time attribute:

from streamsx.spl.types import Timestamp

ts1 = Timestamp(1608196, 235000000, 0)
s = topo.source([(1,ts1)])
# transform to structured schema
ts_schema = StreamSchema('tuple<int64 num, timestamp ts>').as_tuple(named=True)
s = s.map(lambda x : x, schema=ts_schema, name='event_time_source')
# add event-time annotation for attribute ts to the "event_time_source"
s = s.set_event_time('ts')
- Parameters
name (str) – Name of the event-time attribute.
lag (float|submission parameter created by Topology.create_submission_parameter()) – Defines the duration in seconds between the maximum event-time of submitted tuples and the value of the watermark to submit. If it is not specified, the default value is 0.0.
minimum_gap (float|submission parameter created by Topology.create_submission_parameter()) – Defines the minimum event-time duration in seconds between subsequent watermarks. If it is not specified, the default value is 0.1 (100 milliseconds).
resolution (str) – Specifies the resolution of the event-time attribute in: Milliseconds, Microseconds, Nanoseconds. If the event-time attribute is of type SPL timestamp, the default resolution value is nanoseconds. If the event-time attribute is of type int, the default resolution value is milliseconds.
- Returns
Returns this stream.
- Return type
Stream
New in version 2.1.
-
set_parallel
(width, name=None)¶ Set this source stream to be split into multiple channels as the start of a parallel region.
Calling set_parallel on a stream created by source() results in the stream having width channels, each created by its own instance of the callable:

s = topo.source(S())
s.set_parallel(3)
f = s.filter(F())
e = f.end_parallel()
Each channel has independent instances of S and F. Tuples created by the instance of S in channel 0 are passed to the instance of F in channel 0, and so on for channels 1 and 2.
Callable transform instances within the channel can use the runtime functions channel(), local_channel(), max_channels() & local_max_channels() to adapt to being invoked in parallel. For example a source callable can use its channel number to determine which partition to read from in a partitioned external system.
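A minimal sketch of such a channel-aware source (read_partition is a hypothetical helper for a partitioned external system; consult the streamsx.ec documentation for the exact forms of the channel functions):

from streamsx import ec

class PartitionedSource:
    def __call__(self):
        # ec.channel(self) and ec.max_channels(self) identify this
        # instance's channel within the parallel region (an assumption).
        me = ec.channel(self)
        width = ec.max_channels(self)
        # Each parallel channel reads only its own partition.
        for record in read_partition(me, width):
            yield record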
Calling set_parallel on a stream created by subscribe() results in the stream having width channels. Subscribe ensures that the stream will contain all published tuples matching the topic subscription and type. A published tuple will appear on one of the channels though the specific channel is not known in advance.
A parallel region is terminated by end_parallel() or for_each().
The number of channels is set by width, which may be an int greater than zero or a submission parameter created by Topology.create_submission_parameter().
With IBM Streams 4.3 or later the number of channels can be dynamically changed at runtime.
Parallel regions are started on non-source streams using parallel().
- Parameters
width (int|submission parameter created by Topology.create_submission_parameter()) – The degree of parallelism for the parallel region.
name (str) – Name of the parallel region. Defaults to the name of this stream.
- Returns
Returns this stream.
- Return type
Stream
See also
New in version 1.9.
Changed in version 1.11: name parameter added.
-
split
(into, func, names=None, name=None)¶ Splits tuples from this stream into multiple independent streams using the supplied callable func.
For each tuple on the stream int(func(tuple)) is called; if the return is zero or positive then the (unmodified) tuple will be present on one, and only one, of the output streams. The specific stream will be at index int(func(tuple)) % N in the returned list, where N is the number of output streams. If the return is negative then the tuple is dropped.
split is used to declare disparate transforms on each split stream. This differs from parallel() where each channel has the same logic transforms.
into (int) – Number of streams the input is split into, must be greater than zero.
func – Split callable that takes a single parameter for the tuple.
names (list[str]) – Names of the returned streams, in order. If not supplied or a stream doesn’t have an entry in names then a generated name is used. Entries are used to generate the field names of the returned named tuple.
name (str) – Name of the split transform, defaults to a generated name.
If invoking func for a tuple on the stream raises an exception then its processing element will terminate. By default the processing element will automatically restart, though tuples may be lost.
If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method. When an exception is suppressed no tuple is submitted to any output stream corresponding to the input tuple that caused the exception.
- Returns
Named tuple of streams this stream is split across. All returned streams have the same schema as this stream.
- Return type
namedtuple
Type hints
The argument type hint on func is used (if present) to verify at topology declaration time that it is compatible with the type of tuples on this stream.
Note
Punctuation marks are in-band signals that are inserted between tuples in a stream. Whether sources or stream transforms insert window markers at all, and when they insert them, depends on the source or the semantics of the stream transformation. One example is aggregate(), which inserts a window marker into the output stream after each aggregation.
The split() punctuation mode is “preserving”. Incoming window punctuations are forwarded to each output stream.
Examples
Example of splitting a stream based upon message severity, dropping any messages with unknown severity, and then performing different transforms for each severity:
msgs = topo.source(ReadMessages())
SEVS = {'H':0, 'M':1, 'L':2}
# Unknown severities map to -1 and are dropped.
severities = msgs.split(3, lambda t: SEVS.get(t.sev, -1), names=['high','medium','low'], name='SeveritySplit')

high_severity = severities.high
high_severity.for_each(SendAlert())

medium_severity = severities.medium
medium_severity.for_each(LogMessage())

low_severity = severities.low
low_severity.for_each(Archive())
See also
New in version 1.13.
-
time_interval
(interval_duration, creation_period=None, discard_age=None, interval_offset=None)¶ Declares a time-interval window in which tuples are placed into panes that correspond to equal intervals in the event-time domain.
A time-interval window collects tuples into window panes specified by fixed-duration intervals defined over event time. A pane includes tuples with an event time greater than or equal to the start time of the pane and less than the end time.
Find a sample application creating an event-time stream (streamsx.topology.topology.Stream.set_event_time()) and using a time-interval window in the samples directory: Event-Time-Sample
- Parameters
interval_duration (float) – Specifies the required duration between the lower and upper interval endpoints. It must be greater than zero (0.0). The parameter value represents seconds.
creation_period (float) – Specifies the duration between adjacent intervals. The default value is equal to interval_duration. It must be greater than zero (0.0). The parameter value represents seconds.
discard_age (float) – Defines the duration between the point in time when a window pane becomes complete and the point in time when the window does not accept late tuples any longer. It must be greater or equal to zero (0.0). The default value is zero (0.0). The parameter value represents seconds.
interval_offset (float) – Defines a point-in-time value which coincides with an interval start time. Panes partition the event time domain into intervals of the form:
[N * creation_period + interval_offset, N * creation_period + interval_duration + interval_offset)
where 0.0 is the Unix Epoch: 1970-01-01T00:00:00Z UTC. The parameter value represents seconds.
Examples:
w = s.time_interval(interval_duration=60.0, creation_period=1.0)
- Returns
Event-time window on this stream.
- Return type
Window
New in version 2.1.
-
union
(streamSet)¶ Creates a stream that is a union of this stream and other streams.
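For example, a minimal sketch merging two source streams (the contents are illustrative):

from streamsx.topology.topology import Topology

topo = Topology()
a = topo.source(['a1', 'a2'])
b = topo.source(['b1', 'b2'])
# merged carries every tuple from both a and b.
merged = a.union({b})
merged.print()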
- Parameters
streamSet – a set of Stream objects to merge with this stream
- Returns
A stream containing all the tuples from this stream and the streams in streamSet.
- Return type
Stream
-
view
(buffer_time=10.0, sample_size=10000, name=None, description=None, start=False)¶ Defines a view on a stream.
A view is a continually updated sampled buffer of a stream’s tuples. Views allow visibility into a stream from external clients such as Jupyter Notebooks, the Streams console, Microsoft Excel or REST clients.
The view created by this method can be used by external clients and through the returned View object after the topology is submitted. For example a Jupyter Notebook can declare and submit an application with views, and then use the resultant View objects to visualize live data within the streams.
When the stream contains Python objects then they are converted to JSON.
- Parameters
buffer_time – Specifies the buffer size to use measured in seconds.
sample_size – Specifies the number of tuples to sample per second.
name (str) – Name of the view. Name must be unique within the topology. Defaults to a generated name.
description – Description of the view.
start (bool) – Start buffering data when the job is submitted. If False then the view starts buffering data when the first remote client accesses it to retrieve data.
- Returns
View object which can be used to access the data when the topology is submitted.
- Return type
View
Note
Views are only supported when submitting to distributed contexts including Streaming Analytics service.
-
-
class
streamsx.topology.topology.
View
(name)¶ Bases:
object
The View class provides access to a continuously updated sampling of data items on a Stream after submission. A view object is produced by view(), and will access data items from the stream on which it is invoked.
For example, a View object could be created and used as follows:
>>> topology = Topology()
>>> rands = topology.source(lambda: iter(random.random, None))
>>> view = rands.view()
>>> submit(ContextTypes.DISTRIBUTED, topology)
>>> queue = view.start_data_fetch()
>>> for val in iter(queue.get, 60):
...     print(val)
...
0.6527
0.1963
0.0512
-
display
(duration=None, period=2)¶ Display a view within a Jupyter or IPython notebook.
Provides an easy mechanism to visualize data on a stream using a view.
Tuples are fetched from the view and displayed in a table within the notebook cell using a pandas.DataFrame. The table is continually updated with the latest tuples from the view.
This method calls start_data_fetch() and will call stop_data_fetch() when completed if duration is set.
- Parameters
duration (float) – Number of seconds to fetch and display tuples. If None then the display will be updated until stop_data_fetch() is called.
period (float) – Maximum update period.
Note
A view is a sampling of data on a stream so tuples that are on the stream may not appear in the view.
Note
Python modules ipywidgets and pandas must be installed in the notebook environment.
Warning
Behavior when called outside a notebook is undefined.
New in version 1.12.
-
fetch_tuples
(max_tuples=20, timeout=None)¶ Fetch a number of tuples from this view.
Fetching of data must have been started with start_data_fetch() before calling this method.
If timeout is None then the returned list will contain max_tuples tuples. Otherwise if the timeout is reached the list may contain fewer than max_tuples tuples.
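For example, a minimal sketch fetching one batch from a view (view is a View object obtained as in the class example above):

view.start_data_fetch()
batch = view.fetch_tuples(max_tuples=50, timeout=5.0)   # waits at most 5 seconds
view.stop_data_fetch()
print(len(batch), 'tuples fetched')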
- Parameters
max_tuples (int) – Maximum number of tuples to fetch.
timeout (float) – Maximum time to wait for max_tuples tuples.
- Returns
List of fetched tuples.
- Return type
list
New in version 1.12.
-
start_data_fetch
()¶ Starts a background thread which begins accessing data from the remote Stream. The data items are placed asynchronously in a queue, which is returned from this method.
- Returns
A Queue object which is populated with the data items of the stream.
- Return type
queue.Queue
-
stop_data_fetch
()¶ Terminates the background thread fetching stream data items.
-
-
class
streamsx.topology.topology.
PendingStream
(topology)¶ Bases:
object
Pending stream connection.
A pending stream is an initially disconnected stream. The stream attribute can be used as an input stream when the required stream is not yet available. Once the required stream is available the connection is made using complete().
The schema of the pending stream is defined by the stream passed into complete.
A simple example is creating a source stream after the filter that will use it:
# Create the pending or placeholder stream
pending_source = PendingStream(topology)

# Create a filter against the placeholder stream
f = pending_source.stream.filter(lambda t : t.startswith("H"))

source = topology.source(['Hello', 'World'])

# Now complete the connection
pending_source.complete(source)
Streams allows feedback loops in its flow graphs, where downstream processing can produce a stream that is fed back into the input port of an upstream operator. Typically, feedback loops are used to modify the state of upstream transformations, rather than repeat processing of tuples.
A feedback loop can be created by using a PendingStream. The upstream transformation or operator that will end the feedback loop uses stream as one of its inputs. A processing pipeline is then created and once the downstream starting point of the feedback loop is available, it is passed to complete() to create the loop.
-
complete
(stream)¶ Complete the pending stream.
Any connections made to the placeholder stream attribute are connected to stream once this method returns.
- Parameters
stream (Stream) – Stream that completes the connection.
-
is_complete
()¶ Has this connection been completed.
-
-
class
streamsx.topology.topology.
Window
(stream, window_type)¶ Bases:
object
Declaration of a window of tuples on a Stream.
A Window enables transforms against a collection (or window) of tuples on a stream rather than per-tuple transforms. Windows are created against a stream using Stream.batch(), Stream.batchSeconds(), Stream.last(), Stream.lastSeconds() or Stream.time_interval().
Supported transforms are:
aggregate() - Aggregate the window contents into a single tuple.
A window is optionally
partitioned
to create independent sub-windows per partition key.
A Window can also be passed as the input of an SPL operator invocation to indicate the operator’s input port is windowed.
Example invoking the SPL Aggregate operator with a sliding window of the last two minutes, triggering every five tuples:
win = s.last(datetime.timedelta(minutes=2)).trigger(5)
agg = op.Map('spl.relational::Aggregate', win, schema = 'tuple<uint64 sum, uint64 max>')
agg.sum = agg.output('Sum(val)')
agg.max = agg.output('Max(val)')
-
aggregate
(function, name=None)¶ Aggregates the contents of the window when the window is triggered.
Upon a window trigger, the supplied function is passed a list containing the contents of the window: function(items). The order of the window items in the list is the order in which they were each received by the window. If the function’s return value is not None then the result will be submitted as a tuple on the returned stream. If the return value is None then no tuple submission will occur.
For example, a window that calculates a moving average of the last 10 tuples could be written as follows:
win = s.last(10).trigger(1)
moving_averages = win.aggregate(lambda tuples: sum(tuples)/len(tuples))
When the window is partitioned then each partition is triggered and aggregated using function independently.
For example, this partitioned window aggregation will independently call summarize_sensors with ten tuples all having the same id when triggered. Each partition triggers independently so that summarize_sensors is invoked for a specific id every time two tuples with that id have been inserted into the window partition:

win = s.last(10).trigger(2).partition(key='id')
moving_averages = win.aggregate(summarize_sensors)
Example for building a rolling average window aggregation with stream tuples passed as a named tuple:
from streamsx.topology.topology import Topology
from streamsx.topology import context
from streamsx.topology.context import submit, ContextTypes, ConfigParams
import random
import itertools
from typing import Iterable, NamedTuple

class AggregateSchema(NamedTuple):
    count: int = 0
    avg: float = 0.0
    min: int = 0
    max: int = 0

class Average:
    def __call__(self, tuples_in_window) -> AggregateSchema:
        values = [tpl.value for tpl in tuples_in_window]
        mn = min(values)
        mx = max(values)
        num_of_tuples = len(tuples_in_window)
        average = sum(values) / len(tuples_in_window)
        output_event = AggregateSchema(
            count = num_of_tuples,
            avg = average,
            min = mn,
            max = mx
        )
        return output_event

class NumbersSchema(NamedTuple):
    value: int = 0

class Numbers(object):
    def __call__(self) -> Iterable[NumbersSchema]:
        for num in itertools.count(1):
            yield {"value": num}

topo = Topology("Rolling Average")
src = topo.source(Numbers())
# sliding window with eviction count as submission parameter
window = src.last(size=topo.create_submission_parameter('count', 10))
rolling_average = window.aggregate(Average())
Note
If a tumbling (batch()) window’s stream is finite then a final aggregation is performed if the window is not empty. Thus function may be passed fewer tuples for a window sized using a count. For example a stream with 105 tuples and a batch size of 25 tuples will perform four aggregations with 25 tuples each and a final aggregation of 5 tuples.
Note
Punctuation marks are in-band signals that are inserted between tuples in a stream. Whether sources or stream transforms insert window markers at all, and when they insert them, depends on the source or the semantics of the stream transformation.
The aggregate() inserts a window marker into the output stream after each aggregation.
- Parameters
function – The function which aggregates the contents of the window
name (str) – The name of the returned stream. Defaults to a generated name.
- Returns
A Stream of the returned values of the supplied function.
- Return type
Stream
Warning
In Python 3.5 or later if the stream being aggregated has a structured schema that contains a blob type then any blob value will not be maintained in the window. Instead its memoryview object will have been released. If the blob value is required then perform a map() transformation (without setting schema) copying any required blob value in the tuple using memoryview.tobytes().
Changed in version 1.11: Support for aggregation of streams with structured schemas.
Changed in version 1.13: Support for partitioned aggregation.
-
partition
(key)¶ Declare a window with this window’s eviction and trigger policies, and a partition.
In a partitioned window, a subwindow will be created for each distinct value received for the attribute used for partitioning. Each subwindow is treated as if it were a separate window, and each subwindow shares the same trigger and eviction policy.
The key may either be a string containing the name of an attribute, or a python callable.
The key parameter may be a string only with a structured schema, and the value of the key parameter must be the name of a single attribute in the schema.
The key parameter may be a python callable object. If it is, the callable is evaluated for each tuple, and the return from the callable determines the partition into which the tuple is placed. The return value must have a __hash__ method. If checkpointing is enabled, and the callable object has state, the state of the callable object will be saved and restored in checkpoints. However, __enter__ and __exit__ methods may not be called on the callable object.
- Parameters
key – The name of the attribute to be used for partitioning, or the python callable object used for partitioning.
- Returns
Window that will be partitioned.
- Return type
Window
New in version 1.13.
-
trigger
(when=1)¶ Declare a window with this window’s size and a trigger policy.
When the window is triggered is defined by when.
If when is an int then the window is triggered every when tuples. For example, with when=5 the window will be triggered every five tuples.
If when is a datetime.timedelta then it is the period of the trigger. With a timedelta representing one minute the window is triggered every minute.
By default, when trigger has not been called on a Window, it triggers for every tuple inserted into the window (equivalent to when=1).
- Parameters
when – The trigger policy, either an int to define the number of tuples or a datetime.timedelta to define the trigger period.
- Returns
Window that will be triggered.
- Return type
Window
Warning
A trigger is only supported for a sliding window such as one created by last().
-
class
streamsx.topology.topology.
Sink
(op)¶ Bases:
streamsx._streams._placement._Placement, object
Termination of a Stream.
A Stream is terminated by processing that typically sends the tuples to an external system.
Note
A Stream may have multiple terminations.
See also
New in version 1.7.
-
property
category
¶ Category for this processing logic.
An arbitrary application label allowing grouping of application elements by category.
Assign categories based on common function. For example, database is a common category that you can use to group all database sinks in an application.
A category is not required and defaults to None, meaning no assigned category.
Streams console supports visualization based upon categories.
- Raises
TypeError – No directly associated processing logic.
Note
A category has no effect on the execution of the application.
New in version 1.9.
-
colocate
(others)¶ Colocate this processing logic with others.
Colocating processing logic requires execution in the same Streams processing element (operating system process).
When a job is submitted Streams may colocate (fuse) processing logic into the same processing element based upon flow analysis and current resource usage. This call instructs that this logic and others must be executed in the same processing element.
-
property
resource_tags
¶ Resource tags for this processing logic.
Tags are a mechanism for differentiating and identifying resources that have different physical characteristics or logical uses. For example a resource (host) that has external connectivity for public data sources may be tagged ingest.
Processing logic can be associated with one or more tags to require running on suitably tagged resources. For example adding tags ingest and db requires that the processing element containing the callable that created the stream runs on a host tagged with both ingest and db.
A Stream that was not created directly with a Python callable cannot have tags associated with it. For example a stream that is a union() of multiple streams cannot be tagged. In this case this method returns an empty frozenset which cannot be modified.
See https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.2.1/com.ibm.streams.admin.doc/doc/tags.html for more details of tags within IBM Streams.
- Returns
Set of resource tags, initially empty.
- Return type
set
Warning
If no resources exist with the required tags then job submission will fail.
New in version 1.7.
-
property