streamsx.topology.topology¶
Streaming application definition.
Overview¶
IBM Streams is an advanced analytic platform that allows user-developed applications to quickly ingest, analyze and correlate information as it arrives from thousands of real-time sources. Streams can handle very high data throughput rates, millions of events or messages per second.
With this API Python developers can build streaming applications that can be executed using IBM Streams, with the processing distributed across multiple computing resources (hosts or machines) for scalability.
Topology¶
A Topology declares a graph of streams and operations against
tuples (data items) on those streams.
After being declared, a Topology is submitted to be compiled into a Streams application bundle (sab file) and then executed. The sab file is a self contained bundle that can be executed in a distributed Streams instance either using the Streaming Analytics service on IBM Cloud or an on-premise IBM Streams installation.
The compilation step invokes the Streams compiler to produce a bundle. From a Python point of view this effectively produces a runnable version of the Python topology that includes application-specific Python C extensions to optimize performance.
The bundle also includes any required Python packages or modules
that were used in the declaration of the application. For example
the Python module containing the callable used in a
map() invocation. These modules are copied into
the bundle from their local location. This allows the bundle to
be self-contained, so that the Streams instance need not have all the required
Python packages pre-installed. The addition of packages to the bundle
can be controlled with Topology.include_packages and
Topology.exclude_packages.
The Streams runtime distributes the application’s operations across the resources available in the instance.
Note
Topology represents a declaration of a streaming application that will be executed by a Streams instance as a job, either using the Streaming Analytics service on IBM Cloud or an on-premises distributed instance. Topology does not represent a running application, so an instance of Stream class does not contain the tuples, it is only a declaration of a stream.
Stream¶
A Stream can be an infinite sequence of tuples, such as a stream for a traffic flow sensor.
Alternatively, a stream can be finite, such as a stream that is created from the contents of a file.
When a streams processing application contains infinite streams, the application runs continuously without ending.
A stream has a schema that defines the type of each tuple on the stream. The schema for a Python Topology is either:
- Python- A tuple may be any Python object. This is the default.
- String- Each tuple is a Unicode string.
- Binary- Each tuple is a blob.
- Json- Each tuple is a Python dict that can be expressed as a JSON object.
- Structured - A stream that has a structured schema of an ordered list of attributes, with each attribute having a fixed type (e.g. float64 or int32) and a name. The schema of a structured stream is defined using StreamSchema.
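As a minimal sketch (the attribute names here are illustrative), a structured schema can be declared from an SPL tuple type string:

from streamsx.topology.schema import StreamSchema

# A structured schema: an ordered list of named, typed attributes.
sensor_schema = StreamSchema('tuple<int32 id, float64 reading>')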
Stream processing¶
Callables¶
A stream is processed to produce zero or more transformed streams, such as filtering a stream to drop unwanted tuples, producing a stream that only contains the required tuples.
Stream processing is per tuple: as each tuple is submitted to a stream, each consuming operator has its processing logic invoked for that tuple.
A functional operator is declared by methods on Stream such as map() which
maps the tuples on its input stream to tuples on its output stream. Stream uses a functional model
where each stream processing operator is defined in terms of a Python callable that is invoked passing
input tuples and whose return defines what output tuples are submitted for downstream processing.
The Python callable used for functional processing in this API may be:
- A Python lambda function.
- A Python function.
- An instance of a Python callable class.
For example a stream words containing only string objects can be
processed by a filter() using a lambda function:
# Filter the stream so it only contains words starting with py
pywords = words.filter(lambda word : word.startswith('py'))
Stateful operations¶
Use of a class instance allows the operation to be stateful by maintaining state in instance attributes across invocations.
Note
For support with consistent region or checkpointing, instances should ensure that the object’s state can be pickled. See https://docs.python.org/3.5/library/pickle.html#handling-stateful-objects
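For example, a minimal sketch of a stateful callable class (the class name and logic are illustrative) that numbers tuples by keeping a counter in an instance attribute:

class TupleCounter(object):
    def __init__(self):
        # State maintained in an instance attribute across invocations.
        self.count = 0
    def __call__(self, tuple_):
        self.count += 1
        return (self.count, tuple_)

# numbered = s.map(TupleCounter())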
Initialization and shutdown¶
Execution of a class instance effectively runs in a context manager so that an instance’s __enter__
method is called when the processing element containing the instance is initialized
and its __exit__ method is called when the processing element is stopped. To take advantage of this
the class must define both __enter__ and __exit__ methods.
Note
Since an instance of a class is passed to methods such as
map() __init__ is only called when the topology is declared, not at runtime.
Initialization at runtime, such as opening connections, occurs through the __enter__ method.
Example of using __enter__ to create custom metrics:
import streamsx.ec as ec

class Sentiment(object):
    def __init__(self):
        # Called when the topology is declared, not at runtime.
        pass
    def __enter__(self):
        # Runtime initialization: create the custom metrics when the
        # processing element starts.
        self.positive_metric = ec.CustomMetric(self, "positiveSentiment")
        self.negative_metric = ec.CustomMetric(self, "negativeSentiment")
    def __exit__(self, exc_type, exc_value, traceback):
        pass
    def __call__(self, tuple_):
        pass
When an instance defines a valid __exit__ method then it will be called with an exception when:
- the instance raises an exception during processing of a tuple
- a data conversion exception is raised converting a value to a structured schema tuple or attribute
If __exit__ returns a true value then the exception is suppressed and processing continues, otherwise the enclosing processing element will be terminated.
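For example, a sketch (the class and conversion logic are illustrative) that suppresses only ValueError, dropping the offending tuple and continuing:

class ToNumber(object):
    def __enter__(self):
        pass
    def __exit__(self, exc_type, exc_value, traceback):
        # Returning a true value suppresses the exception; the tuple
        # that caused it produces no output tuple.
        return exc_type is ValueError
    def __call__(self, tuple_):
        return float(tuple_)

# numbers = strings.map(ToNumber())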
Tuple semantics¶
Python objects on a stream may be passed by reference between callables (e.g. the value returned by a map callable may be passed by reference to a following filter callable). This can only occur when the functions are executing in the same PE (process). If an object is not passed by reference a deep-copy is passed. Streams that cross PE (process) boundaries are always passed by deep-copy.
Thus if a stream is consumed by two map and one filter callables in the same PE they may receive the same object reference that was sent by the upstream callable. If one (or more) callable modifies the passed in reference those changes may be seen by the upstream callable or the other callables. The order of execution of the downstream callables is not defined. One can prevent such potential non-deterministic behavior by one or more of these techniques:
- Passing immutable objects
- Not retaining a reference to an object that will be submitted on a stream
- Not modifying input tuples in a callable
- Using copy/deepcopy when returning a value that will be submitted to a stream.
Applications cannot rely on pass by reference; it is a performance optimization that can be made in some situations when stream connections are within a PE.
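As a sketch of the last technique (the class is illustrative), returning a copy ensures a retained reference cannot be observed downstream:

import copy

class Tracker(object):
    def __init__(self):
        self.last = None
    def __call__(self, tuple_):
        # Retain a reference to the tuple for later use.
        self.last = tuple_
        # Submit a deep copy so later mutation of self.last cannot be
        # seen by downstream callables in the same PE.
        return copy.deepcopy(tuple_)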
Application log and trace¶
IBM Streams provides application trace and log services which are accessible through standard Python loggers from the logging module.
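For example, a callable might write to a standard Python logger (the logger name is illustrative). Creating the logger in __enter__ keeps the instance picklable when the topology is declared:

import logging

class Auditor(object):
    def __init__(self):
        self.logger = None
    def __enter__(self):
        # Messages written to this logger are handled by the Streams
        # trace and log services described above.
        self.logger = logging.getLogger('my.app.audit')
    def __exit__(self, exc_type, exc_value, traceback):
        pass
    def __call__(self, tuple_):
        self.logger.info("processing %s", tuple_)
        return tuple_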
SPL operators¶
In addition, an application declared by Topology can include stream processing defined by SPL primitive or composite operators. This allows reuse of adapters and analytics provided by IBM Streams, open source and third-party SPL toolkits.
See streamsx.spl.op
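As a sketch, an SPL primitive operator from the SPL standard toolkit can be invoked as a source (the parameter values are illustrative):

from streamsx.topology.topology import Topology
import streamsx.spl.op as op

topo = Topology()
# Invoke spl.utility::Beacon to generate 100 tuples.
beacon = op.Source(topo, 'spl.utility::Beacon',
                   'tuple<uint64 seq>',
                   params={'iterations': 100})
seqs = beacon.stream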
Module contents¶
Classes
| PendingStream | Pending stream connection. | 
| Routing | Defines how tuples are routed to channels in a parallel region. | 
| Sink | Termination of a Stream. | 
| Stream | The Stream class is the primary abstraction within a streaming application. | 
| SubscribeConnection | Connection mode between a subscriber and matching publishers. | 
| Topology | The Topology class is used to define data sources, and is passed as a parameter when submitting an application. | 
| View | The View class provides access to a continuously updated sampling of data items on a Stream after submission. | 
| Window | Declaration of a window of tuples on a Stream. | 
- 
class streamsx.topology.topology.PendingStream(topology)¶
- Pending stream connection.

A pending stream is an initially disconnected stream. The stream attribute can be used as an input stream when the required stream is not yet available. Once the required stream is available the connection is made using complete().

The schema of the pending stream is defined by the stream passed into complete.

A simple example is creating a source stream after the filter that will use it:

# Create the pending or placeholder stream
pending_source = PendingStream(topology)

# Create a filter against the placeholder stream
f = pending_source.stream.filter(lambda t : t.startswith("H"))

source = topology.source(['Hello', 'World'])

# Now complete the connection
pending_source.complete(source)

Streams allows feedback loops in its flow graphs, where downstream processing can produce a stream that is fed back into the input port of an upstream operator. Typically, feedback loops are used to modify the state of upstream transformations, rather than repeat processing of tuples.

A feedback loop can be created by using a PendingStream. The upstream transformation or operator that will end the feedback loop uses stream as one of its inputs. A processing pipeline is then created and once the downstream starting point of the feedback loop is available, it is passed to complete() to create the loop.
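A sketch of that feedback pattern (topo, data, score and needs_rescore are illustrative names):

# Placeholder for the stream that will be fed back.
feedback = PendingStream(topo)
# The transformation ending the loop consumes the placeholder.
scores = data.union({feedback.stream}).map(score)
# Downstream processing produces the stream to feed back.
rescore = scores.filter(needs_rescore)
# Close the loop.
feedback.complete(rescore)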
complete(stream)¶
- Complete the pending stream. - Any connections made to stream are connected to stream once this method returns. - Parameters: - stream (Stream) – Stream that completes the connection.
 - 
is_complete()¶
- Has this connection been completed. 
 
- 
- 
class streamsx.topology.topology.Routing¶
- Defines how tuples are routed to channels in a parallel region. - A parallel region is started by parallel() and ended with end_parallel() or for_each().
BROADCAST= 0¶
- Tuples are routed to every channel in the parallel region. 
 - 
HASH_PARTITIONED= 3¶
- Tuples are routed based upon a hash value so that tuples with the same hash, and thus the same value, are always routed to the same channel. When a hash function is specified it is passed the tuple and the return value is the hash. When no hash function is specified then hash(tuple) is used. - Each tuple is only sent to a single channel. - Warning - A consistent hash function is required to guarantee that a tuple with the same value is always routed to the same channel. hash() is not consistent: for types str, bytes and datetime, objects are “salted” with an unpredictable random value (Python 3.5). Thus if the processing element is restarted, channel routing for a hash based upon a str, bytes or datetime value will change. In addition, code executing in the channels can see a different hash value than other channels and than the execution that routed the tuple, because they run in different processing elements.
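A sketch of a consistent hash function for a stream of strings (zlib.crc32 is stable across processes, unlike hash() for str):

import zlib

def consistent_hash(tuple_):
    # Stable across runs and processing elements.
    return zlib.crc32(tuple_.encode('utf-8'))

# words.parallel(width=3, routing=Routing.HASH_PARTITIONED,
#                func=consistent_hash)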
 - 
ROUND_ROBIN= 1¶
- Tuples are routed to maintain an even distribution of tuples to the channels. - Each tuple is only sent to a single channel. 
 
- 
- 
class streamsx.topology.topology.Sink(op)¶
- Termination of a Stream. - A Stream is terminated by processing that typically sends the tuples to an external system. - Note - A Stream may have multiple terminations. - See also - New in version 1.7. -
category¶
- Category for this processing logic. - An arbitrary application label allowing grouping of application elements by category. - Assign categories based on common function. For example, database is a common category that you can use to group all database sinks in an application. - A category is not required and defaults to None meaning no assigned category. - Streams console supports visualization based upon categories. - Raises: - TypeError – No directly associated processing logic. - Note - A category has no effect on the execution of the application. - New in version 1.9.
 - 
colocate(others)¶
- Colocate this processing logic with others. - Colocating processing logic requires execution in the same Streams processing element (operating system process). - When a job is submitted Streams may colocate (fuse) processing logic into the same processing element based upon flow analysis and current resource usage. This call instructs that this logic and others must be executed in the same processing element. - Parameters: - others – Processing logic such as a Stream or Sink. A single value can be passed or an iterable, such as a list of streams. - Returns: - This logic. - Return type: - self
resource_tags¶
- Resource tags for this processing logic. - Tags are a mechanism for differentiating and identifying resources that have different physical characteristics or logical uses. For example a resource (host) that has external connectivity for public data sources may be tagged ingest. - Processing logic can be associated with one or more tags to require running on suitably tagged resources. For example adding tags ingest and db requires that the processing element containing the callable that created the stream runs on a host tagged with both ingest and db. - A Stream that was not created directly with a Python callable cannot have tags associated with it. For example a stream that is a union() of multiple streams cannot be tagged. In this case this method returns an empty frozenset which cannot be modified. - See https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.2.1/com.ibm.streams.admin.doc/doc/tags.html for more details of tags within IBM Streams. - Returns: - Set of resource tags, initially empty. - Return type: - set - Warning - If no resources exist with the required tags then job submission will fail. - New in version 1.7.
 
- 
- 
class streamsx.topology.topology.Stream(topology, oport)¶
- The Stream class is the primary abstraction within a streaming application. It represents a potentially infinite series of tuples which can be operated upon to produce another stream, as in the case of map(), or terminate a stream, as in the case of for_each().
aliased_as(name)¶
- Create an alias of this stream. - Returns an alias of this stream with name name. When invocation of an SPL operator requires an Expression against an input port this can be used to ensure the expression matches the input port alias regardless of the name of the actual stream. - Example use where the filter expression for a Filter SPL operator uses IN to access input tuple attribute seq: - s = ...
s = s.aliased_as('IN')

params = {'filter': op.Expression.expression('IN.seq % 4ul == 0ul')}
f = op.Map('spl.relational::Filter', s, params = params)
 - Parameters: - name (str) – Name for returned stream. - Returns: - Alias of this stream with name equal to name. - Return type: - Stream - New in version 1.9.
 - 
as_json(force_object=True, name=None)¶
- Declares a stream converting each tuple on this stream into a JSON value. - The stream is typed as a JSON stream. - Each tuple must be supported by JSONEncoder. - If force_object is True then each tuple that is not a dict will be converted to a JSON object with a single key payload containing the tuple. Thus each object on the stream will be a JSON object. - If force_object is False then each tuple is converted to a JSON value directly using the json package. - If this stream is already typed as a JSON stream then it will be returned (with no additional processing against it; force_object and name are ignored). - Parameters: - force_object (bool) – Force conversion of non dicts to JSON objects.
- name (str) – Name of the resulting stream. When None defaults to a generated name.
 - New in version 1.6.1. - Returns: - Stream containing the JSON representations of tuples on this stream. - Return type: - Stream 
 - 
as_string(name=None)¶
- Declares a stream converting each tuple on this stream into a string using str(tuple). - The stream is typed as a string stream. - If this stream is already typed as a string stream then it will be returned (with no additional processing against it; name is ignored). - Parameters: - name (str) – Name of the resulting stream. When None defaults to a generated name. - New in version 1.6. - New in version 1.6.1: name parameter added. - Returns: - Stream containing the string representations of tuples on this stream. - Return type: - Stream
 - 
autonomous()¶
- Starts an autonomous region for downstream processing. By default IBM Streams processing is executed in an autonomous region where any checkpointing of operator state is autonomous (independent) of other operators. - This method may be used to end a consistent region by starting an autonomous region. This may be called even if this stream is in an autonomous region. - Autonomous is not applicable when a topology is submitted to a STANDALONE context and will be ignored. - New in version 1.6. - Returns: - Stream whose subsequent downstream processing is in an autonomous region. - Return type: - Stream
 - 
batch(size)¶
- Declares a tumbling window to support batch processing against this stream. - The number of tuples in the batch is defined by size. - If size is an int then it is the count of tuples in the batch. For example, with size=10 each batch will nominally contain ten tuples. Thus processing against the returned Window, such as aggregate(), will be executed every ten tuples against the last ten tuples on the stream. For example the first three aggregations would be against the first ten tuples on the stream, then the next ten tuples and then the third ten tuples, etc. - If size is a datetime.timedelta then it is the duration of the batch using wallclock time. With a timedelta representing five minutes the window contains any tuples that arrived in the last five minutes. Thus processing against the returned Window, such as aggregate(), will be executed every five minutes against the batch of tuples that arrived in the last five minutes on the stream. For example the first three aggregations would be against any tuples on the stream in the first five minutes, then the next five minutes and then minutes ten to fifteen. A batch can contain no tuples if no tuples arrived on the stream in the defined duration. - Each tuple on the stream appears only in a single batch. - The number of tuples seen by processing against the returned window may be less than size (count or time based) when: - the stream is finite, the final batch may contain fewer tuples than the defined size,
- the stream is in a consistent region, drain processing will complete the current batch without waiting for the batch to reach its nominal size.
 - Examples: - # Create batches against stream s of 100 tuples each
w = s.batch(size=100)
 - # Create batches against stream s every five minutes
w = s.batch(size=datetime.timedelta(minutes=5))
 - Parameters: - size – The size of each batch, either an int to define the number of tuples or datetime.timedelta to define the duration of the batch. - Returns: - Window allowing batch processing on this stream. - Return type: - Window - New in version 1.11.
 - 
category¶
- Category for this processing logic. - An arbitrary application label allowing grouping of application elements by category. - Assign categories based on common function. For example, database is a common category that you can use to group all database sinks in an application. - A category is not required and defaults to None meaning no assigned category. - Streams console supports visualization based upon categories. - Raises: - TypeError – No directly associated processing logic. - Note - A category has no effect on the execution of the application. - New in version 1.9.
 - 
colocate(others)¶
- Colocate this processing logic with others. - Colocating processing logic requires execution in the same Streams processing element (operating system process). - When a job is submitted Streams may colocate (fuse) processing logic into the same processing element based upon flow analysis and current resource usage. This call instructs that this logic and others must be executed in the same processing element. - Parameters: - others – Processing logic such as a Stream or Sink. A single value can be passed or an iterable, such as a list of streams. - Returns: - This logic. - Return type: - self
 - 
end_low_latency()¶
- Returns a Stream that is no longer guaranteed to run in the same process as the calling stream. - Returns: - Stream 
 - 
end_parallel()¶
- Ends a parallel region by merging the channels into a single stream. - Returns: - Stream for which subsequent transformations are no longer parallelized. - Return type: - Stream 
 - 
filter(func, name=None)¶
- Filters tuples from this stream using the supplied callable func. - For each tuple on the stream - func(tuple)is called, if the return evaluates to- Truethe tuple will be present on the returned stream, otherwise the tuple is filtered out.- Parameters: - func – Filter callable that takes a single parameter for the tuple.
- name (str) – Name of the stream, defaults to a generated name.
- If invoking func for a tuple on the stream raises an exception then its processing element will terminate. By default the processing element will automatically restart though tuples may be lost. - If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method. When an exception is suppressed no tuple is submitted to the filtered stream corresponding to the input tuple that caused the exception. - Returns: - A Stream containing tuples that have not been filtered out. - Return type: - Stream
 - 
flat_map(func=None, name=None)¶
- Maps and flattens each tuple from this stream into 0 or more tuples. - For each tuple on this stream func(tuple) is called. If the result is not None then the result is iterated over, and each value from the iterator that is not None is submitted to the returned stream. - If the result is None or an empty iterable then no tuples are submitted to the returned stream. - Parameters: - func – A callable that takes a single parameter for the tuple.
If not supplied then a function equivalent to lambda tuple_ : tuple_ is used. This is suitable when each tuple on this stream is an iterable to be flattened.
- name (str) – Name of the flattened stream, defaults to a generated name.
 - If invoking func for a tuple on the stream raises an exception then its processing element will terminate. By default the processing element will automatically restart though tuples may be lost. - If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method. When an exception is suppressed no tuples are submitted to the flattened and mapped stream corresponding to the input tuple that caused the exception. - Returns: - A Stream containing flattened and mapped tuples. - Return type: - Stream - Raises: - TypeError – if func does not return an iterator nor None - Changed in version 1.11: func is optional.
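For example, a sketch splitting a stream of lines into a stream of words (lines is assumed to be a stream of strings):

# Each input line produces zero or more word tuples.
words = lines.flat_map(lambda line : line.split())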
 - 
for_each(func, name=None)¶
- Sends information as a stream to an external system. - For each tuple t on the stream - func(t)is called.- Parameters: - func – A callable that takes a single parameter for the tuple and returns None.
- name (str) – Name of the stream, defaults to a generated name.
- If invoking func for a tuple on the stream raises an exception then its processing element will terminate. By default the processing element will automatically restart though tuples may be lost. - If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method. When an exception is suppressed no further processing occurs for the input tuple that caused the exception. - Returns: - Stream termination. - Return type: - streamsx.topology.topology.Sink - Changed in version 1.7: Now returns a Sink instance.
 - 
isolate()¶
- Guarantees that the upstream operation will run in a separate processing element from the downstream operation. - Returns: - Stream whose subsequent immediate processing will occur in a separate processing element. - Return type: - Stream
 - 
last(size=1)¶
- Declares a sliding window containing the most recent tuples on this stream. - The number of tuples maintained in the window is defined by size. - If size is an int then it is the count of tuples in the window. For example, with size=10 the window always contains the last (most recent) ten tuples. - If size is a datetime.timedelta then it is the duration of the window. With a timedelta representing five minutes the window contains any tuples that arrived in the last five minutes. - Parameters: - size – The size of the window, either an int to define the number of tuples or datetime.timedelta to define the duration of the window. - Examples: - # Create a window against stream s of the last 100 tuples
w = s.last(size=100)
 - # Create a window against stream s of tuples
# that arrived on the stream in the last five minutes
w = s.last(size=datetime.timedelta(minutes=5))
 - Returns: - Window of the last (most recent) tuples on this stream. - Return type: - Window
 - 
low_latency()¶
- The function is guaranteed to run in the same process as the upstream Stream function. All streams that are created from the returned stream are also guaranteed to run in the same process until end_low_latency() is called. - Returns: - Stream 
 - 
map(func=None, name=None, schema=None)¶
- Maps each tuple from this stream into 0 or 1 stream tuples. - For each tuple on this stream - result = func(tuple)is called. If result is not None then the result will be submitted as a tuple on the returned stream. If result is None then no tuple submission will occur.- By default the submitted tuple is - resultwithout modification resulting in a stream of picklable Python objects. Setting the schema parameter changes the type of the stream and modifies each- resultbefore submission.- Python- The default: result is submitted.
- String- A stream of strings: str(result) is submitted.
- Json- A stream of JSON objects: result must be convertible to a JSON object using the json package.
- StreamSchema- A structured stream. result must be a dict or (Python) tuple. When a dict is returned the outgoing stream tuple attributes are set by name, when a tuple is returned stream tuple attributes are set by position.
 - Parameters: - func – A callable that takes a single parameter for the tuple.
If not supplied then a function equivalent to lambda tuple_ : tuple_ is used.
- name (str) – Name of the mapped stream, defaults to a generated name.
- schema (StreamSchema) – Schema of the resulting stream.
- If invoking func for a tuple on the stream raises an exception then its processing element will terminate. By default the processing element will automatically restart though tuples may be lost. - If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method. When an exception is suppressed no tuple is submitted to the mapped stream corresponding to the input tuple that caused the exception. - Returns: - A stream containing tuples mapped by func. - Return type: - Stream - New in version 1.7: schema argument added to allow conversion to a structured stream. - New in version 1.8: Support for submitting dict objects as stream tuples to a structured stream (in addition to existing support for tuple objects). - Changed in version 1.11: func is optional.
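For example, a sketch (the attribute names and input stream events are illustrative) mapping to a structured stream by returning a dict keyed by attribute name:

from streamsx.topology.schema import StreamSchema

schema = StreamSchema('tuple<rstring device, float64 reading>')
readings = events.map(lambda e : {'device': e[0], 'reading': float(e[1])},
                      schema=schema)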
 - 
multi_transform(func, name=None)¶
- Equivalent to calling - flat_map().- Deprecated since version 1.7: Replaced by - flat_map().
 - 
name¶
- Unique name of the stream. - When declaring a stream a name parameter can be provided. If the supplied name is unique within its topology then it will be used as-is, otherwise a variant will be provided that is unique within the topology. - If a name parameter was not provided when declaring a stream then the stream is assigned a unique generated name. - Returns: - Name of the stream. - Return type: - str - See also 
 - 
parallel(width, routing=<Routing.ROUND_ROBIN: 1>, func=None, name=None)¶
- Parallelizes the stream into width parallel channels. Tuples are routed to parallel channels such that an even distribution is maintained. Each parallel channel can be thought of as being assigned its own thread. As such, the parallelized stream functions are separate instances and operate independently of one another. - parallel() will only parallelize the stream operations performed after the call to parallel() and before the call to end_parallel(). - Parallel regions aren’t required to have an output stream, and thus may be used as sinks. In other words, a parallel sink is created by calling parallel() and creating a sink operation. It is not necessary to invoke end_parallel() on parallel sinks. - Every call to end_parallel() must have a call to parallel() preceding it. - Parameters: - width (int) – Degree of parallelism.
- routing (Routing) – denotes what type of tuple routing to use.
- func – Optional function called when Routing.HASH_PARTITIONED routing is specified. The function provides an integer value to be used as the hash that determines the tuple channel routing.
- name (str) – The name to display for the parallel region.
- Returns: - A stream for which subsequent transformations will be executed in parallel. - Return type: - Stream
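A sketch of a three-channel parallel region (expensive_transform is an illustrative callable):

# Process tuples on three channels, then merge the channels back
# into a single stream.
results = s.parallel(width=3).map(expensive_transform).end_parallel()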
 - 
print(tag=None, name=None)¶
- Prints each tuple to stdout flushing after each tuple. - If tag is not None then each tuple has “tag: ” prepended to it before printing. - Parameters: - tag – A tag to prepend to each tuple.
- name (str) – Name of the resulting stream. When None defaults to a generated name.
- Returns: - Stream termination. - Return type: - Sink - New in version 1.6.1: tag, name parameters. - Changed in version 1.7: Now returns a Sink instance.
 - 
publish(topic, schema=None, name=None)¶
- Publish this stream on a topic for other Streams applications to subscribe to. A Streams application may publish a stream to allow other Streams applications to subscribe to it. A subscriber matches a publisher if the topic and schema match. - By default a stream is published using its schema. - A stream of Python objects can be subscribed to by other Streams Python applications. - If a stream is published setting schema to Json then it is published as a stream of JSON objects. Other Streams applications may subscribe to it regardless of their implementation language. - If a stream is published setting schema to String then it is published as a stream of strings. Other Streams applications may subscribe to it regardless of their implementation language. - Supported values of schema are only Json and String. - Parameters: - topic (str) – Topic to publish this stream to.
- schema – Schema to publish. Defaults to the schema of this stream.
- name (str) – Name of the publish operator, defaults to a generated name.
- Returns: - Stream termination. - Return type: - Sink - New in version 1.6.1: name parameter. - Changed in version 1.7: Now returns a Sink instance.
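For example, a sketch where one application publishes a JSON stream and another subscribes to it (the topic name is illustrative):

from streamsx.topology.schema import CommonSchema

# Publishing application
scores.publish('scores/json', schema=CommonSchema.Json)

# Subscribing application (typically a separate Topology)
incoming = topo.subscribe('scores/json', schema=CommonSchema.Json)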
resource_tags¶
- Resource tags for this processing logic. - Tags are a mechanism for differentiating and identifying resources that have different physical characteristics or logical uses. For example a resource (host) that has external connectivity for public data sources may be tagged ingest. - Processing logic can be associated with one or more tags to require running on suitably tagged resources. For example adding tags ingest and db requires that the processing element containing the callable that created the stream runs on a host tagged with both ingest and db. - A Stream that was not created directly with a Python callable cannot have tags associated with it. For example a stream that is a union() of multiple streams cannot be tagged. In this case this method returns an empty frozenset which cannot be modified. - See https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.2.1/com.ibm.streams.admin.doc/doc/tags.html for more details of tags within IBM Streams. - Returns: - Set of resource tags, initially empty. - Return type: - set - Warning - If no resources exist with the required tags then job submission will fail. - New in version 1.7.
 - 
set_consistent(consistent_config)¶
- Indicates that the stream is the start of a consistent region. - Parameters: - consistent_config (consistent.ConsistentRegionConfig) – the configuration of the consistent region. - Returns: - Returns this stream. - Return type: - Stream - New in version 1.11. 
 - 
set_parallel(width, name=None)¶
- Indicates that the stream is the start of a parallel region. Should only be invoked on source operators. - Parameters: - width (int) – The degree of parallelism for the parallel region.
- name (str) – Name of the parallel region. Defaults to the name of this stream.
 - Returns: - Returns this stream. - Return type: - Stream - New in version 1.9. - Changed in version 1.11: name parameter added.
 - 
sink(func, name=None)¶
- Equivalent to calling - for_each().- Deprecated since version 1.7: Replaced by - for_each().
 - 
transform(func, name=None)¶
- Equivalent to calling - map(func,name).- Deprecated since version 1.7: Replaced by - map().
 - 
union(streamSet)¶
- Creates a stream that is a union of this stream and other streams. - Parameters: - streamSet – a set of Stream objects to merge with this stream. - Returns: - A stream that is the union of this stream and the streams in streamSet. - Return type: - Stream
 - 
view(buffer_time=10.0, sample_size=10000, name=None, description=None, start=False)¶
- Defines a view on a stream. - A view is a continually updated sampled buffer of a stream’s tuples. Views allow visibility into a stream from external clients such as the Streams console, Microsoft Excel or REST clients. - The view created by this method can be used by external clients and through the returned object after the topology is submitted. - When the stream contains Python objects then they are converted to JSON. - Parameters: - buffer_time – Specifies the buffer size to use measured in seconds.
- sample_size – Specifies the number of tuples to sample per second.
- name (str) – Name of the view. Name must be unique within the topology. Defaults to a generated name.
- description – Description of the view.
- start (bool) – Start buffering data when the job is submitted. If False then the view starts buffering data when the first remote client accesses it to retrieve data.
 - Returns: - View object which can be used to access the data when the topology is submitted. 
 
- 
- 
class streamsx.topology.topology.SubscribeConnection¶
- Connection mode between a subscriber and matching publishers. - New in version 1.9. - See also - 
Buffered= 1¶
- Buffered connection between a subscriber and matching publishers. - With a buffered connection tuples from publishers are placed in a single queue owned by the subscriber. This allows a slower subscriber to handle brief spikes in tuples from publishers. - A subscriber can fully isolate itself from matching publishers by adding a CongestionPolicy that drops tuples when the queue is full. In this case when the subscriber is not able to keep up with the tuple rate from all matching publishers it will have a minimal effect on matching publishers.
 - 
Direct= 0¶
- Direct connection between a subscriber and matching publishers. - When connected directly a slow subscriber will cause back-pressure against the publishers, forcing them to slow tuple processing to match the slowest subscriber.
 
- 
- 
class streamsx.topology.topology.Topology(name=None, namespace=None, files=None)¶
- The Topology class is used to define data sources, and is passed as a parameter when submitting an application. Topology keeps track of all sources, sinks, and transformations within your application. - Submission of a Topology results in a Streams application that has the name namespace::name. - Parameters: - name (str) – Name of the topology. Defaults to a name derived from the calling environment if it can be determined, otherwise a random name.
- namespace (str) – Namespace of the topology. Defaults to a name derived from the calling environment if it can be determined, otherwise a random name.
 - 
include_packages¶
- set[str] – Python package names to be included in the built application. Any package in this list is copied into the bundle and made available at runtime to the Python callables used in the application. By default a Topology will automatically discover which packages and modules are required to be copied; this field may be used to add additional packages that were not automatically discovered.
 - 
exclude_packages¶
- set[str] – Python top-level package names to be excluded from the built application. Excluding a top-level package excludes all sub-modules at any level in the package, e.g. sound excludes sound.effects.echo. Only the top-level package can be defined, e.g. sound rather than sound.filters. Behavior when adding a module within a package is undefined. When compiling the application using Anaconda this set is pre-loaded with Python packages from the Anaconda pre-loaded set.
- Package names in include_packages take precedence over package names in exclude_packages. - All declared streams in a Topology are available through their name using topology[name]. The stream’s name is defined by Stream.name() and will differ from the name parameter passed when creating the stream if the application uses duplicate names. - Changed in version 1.11: Declared streams available through topology[name].
add_file_dependency(path, location)¶
- Add a file or directory dependency into a Streams application bundle. - Ensures that the file or directory at path on the local system will be available at runtime. - The file will be copied and made available relative to the application directory. location determines where the file is placed relative to the application directory. Two values for location are supported: etc and opt. The runtime path relative to the application directory is returned. - The copy is made during the submit call, thus the contents of the file or directory must remain available until submit returns. - For example calling add_file_dependency('/tmp/conf.properties', 'etc') will result in the contents of the local file conf.properties being available at runtime at the path application directory/etc/conf.properties. This call returns etc/conf.properties. - Python callables can determine the application directory at runtime with get_application_directory(). For example the path above at runtime is os.path.join(streamsx.ec.get_application_directory(), 'etc', 'conf.properties') - Parameters: - path (str) – Path of the file on the local system.
- location (str) – Location of the file in the bundle relative to the application directory.
- Returns: - Path relative to the application directory that can be joined at runtime with get_application_directory(). - Return type: - str - New in version 1.7.
 - 
add_pip_package(requirement)¶
- Add a Python package dependency for this topology. - If the package defined by the requirement specifier is not pre-installed on the build system then the package is installed using pip and becomes part of the Streams application bundle (sab file). The package is expected to be available from pypi.org. - If the package is already installed on the build system then it is not added into the sab file. The assumption is that the runtime hosts for a Streams instance have the same Python packages installed as the build machines. This is always true for the Streaming Analytics service on IBM Cloud. - The project name extracted from the requirement specifier is added to exclude_packages to avoid the package being added by the dependency resolver. Thus the package should be added before it is used in any stream transformation. - When an application is run with trace level info the available Python packages on the running system are listed to application trace. This includes any packages added by this method. - Example: - topo = Topology()
# Add dependency on pint package
# and astral at version 0.8.1
topo.add_pip_package('pint')
topo.add_pip_package('astral==0.8.1')
 - Parameters: - requirement (str) – Package requirements specifier. - Warning - Only supported when using the remote build service with the Streaming Analytics service. - New in version 1.9.
 - 
checkpoint_period¶
- Enable checkpointing for the topology, and define the checkpoint period. - When checkpointing is enabled, the state of all stateful operators is saved periodically. If the operator restarts, its state is restored from the most recent checkpoint. - The checkpoint period is the frequency at which checkpoints will be taken. It can either be a timedelta value or a floating point value in seconds. It must be 0.001 seconds or greater. - A stateful operator is an operator whose callable is an instance of a Python callable class. - Examples: - # Create a topology that will checkpoint every thirty seconds
topo = Topology()
topo.checkpoint_period = 30.0
 - # Create a topology that will checkpoint every two minutes
topo = Topology()
topo.checkpoint_period = datetime.timedelta(minutes=2)
 - New in version 1.11.
 - 
create_submission_parameter(name, default=None, type_=None)¶
- Create a submission parameter. - A submission parameter is a handle for a value that is not defined until topology submission time. Submission parameters enable the creation of reusable topology bundles. - A submission parameter has a name. The name must be unique within the topology. - The returned parameter is a callable. Prior to submitting the topology, while constructing the topology, invoking it returns None. - After the topology is submitted, invoking the parameter within the executing topology returns the actual submission time value (or the default value if it was not set at submission time). - Submission parameters may be used within functional logic. e.g.: - threshold = topology.create_submission_parameter('threshold', 100)

# s is some stream of integers
s = ...
s = s.filter(lambda v : v > threshold())
 - Note - The parameter (value returned from this method) is only supported within a lambda expression or a callable that is not a function. - The default type of a submission parameter’s value is a str (unicode on Python 2.7). When a default is specified the type of the value matches the type of the default. - If default is not set, then the type can be set with type_. - The types supported are str, int, float and bool. - Topology submission behavior when a submission parameter lacking a default value is created and a value is not provided at submission time is defined by the underlying topology execution runtime. - Submission fails for contexts DISTRIBUTED, STANDALONE, and STREAMING_ANALYTICS_SERVICE.
 - Parameters: - name (str) – Name for submission parameter.
- default – Default parameter when submission parameter is not set.
- type_ – Type of parameter value when default is not set. Supported values are str, int, float and bool.
 - New in version 1.9. 
 - 
name¶
- Name of the topology. - Returns: - Name of the topology. - Return type: - str 
 - 
namespace¶
- Namespace of the topology. - Returns: - Namespace of the topology. - Return type: - str 
 - 
source(func, name=None)¶
- Declare a source stream that introduces tuples into the application. - Typically used to create a stream of tuples from an external source, such as a sensor or an external system. - Tuples are obtained from an iterator created from the passed iterable, or from the iterable returned by the passed callable. - Each tuple that is not None from the iterator is present on the returned stream. - Each tuple is a Python object and must be picklable to allow execution of the application to be distributed across available resources in the Streams instance. - Parameters: - func (callable) – An iterable or a zero-argument callable that returns an iterable of tuples.
- name (str) – Name of the stream, defaults to a generated name.
- Exceptions raised by func or its iterator will cause its processing element to terminate. - If func is a callable object then it may suppress exceptions by returning a true value from its __exit__ method. - Suppressing an exception raised by func.__iter__ causes the source to be empty; no tuples are submitted to the stream. - Suppressing an exception raised by __next__ on the iterator results in no tuples being submitted for that call to __next__. Processing continues with calls to __next__ to fetch subsequent tuples. - Returns: - A stream whose tuples are the result of the iterable obtained from func. - Return type: - Stream
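For example, a sketch of a finite source from an iterable and an endless source from a zero-argument callable returning an iterable:

import itertools

topo = Topology()
# Finite source: tuples from a list.
greetings = topo.source(['Hello', 'World'])
# Endless source: the callable returns an infinite iterator.
counts = topo.source(lambda : itertools.count())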
 - 
subscribe(topic, schema=<CommonSchema.Python: <streamsx.topology.schema.StreamSchema object>>, name=None, connect=None, buffer_capacity=None, buffer_full_policy=None)¶
- Subscribe to a topic published by other Streams applications. A Streams application may publish a stream to allow other Streams applications to subscribe to it. A subscriber matches a publisher if the topic and schema match. - By default a stream is subscribed as Python objects, which connects to streams published to topic by Python Streams applications. - JSON streams are subscribed to using schema Json. Each tuple on the returned stream will be a Python dictionary object created by json.loads(tuple). A Streams application publishing JSON streams may have been implemented in any programming language supported by Streams. - String streams are subscribed to using schema String. Each tuple on the returned stream will be a Python string object. A Streams application publishing String streams may have been implemented in any programming language supported by Streams. - Subscribers can ensure they do not slow down matching publishers by using a buffered connection with a buffer full policy that drops tuples. - Parameters: - topic (str) – Topic to subscribe to.
- schema (StreamSchema) – schema to subscribe to.
- name (str) – Name of the subscribed stream, defaults to a generated name.
- connect (SubscribeConnection) – How the subscriber will be connected to matching publishers. Defaults to Direct connection.
- buffer_capacity (int) – Buffer capacity in tuples when connect is set to Buffered. Defaults to 1000 when connect is Buffered. Ignored when connect is None or Direct.
- buffer_full_policy (CongestionPolicy) – Policy when a published tuple arrives and the subscriber’s buffer is full. Defaults to Wait when connect is Buffered. Ignored when connect is None or Direct.
 - Returns: - A stream whose tuples have been published to the topic by other Streams applications. - Return type: - Stream - Changed in version 1.9: connect, buffer_capacity and buffer_full_policy parameters added.
 
- 
class streamsx.topology.topology.View(name)¶
- The View class provides access to a continuously updated sampling of data items on a Stream after submission. A view object is produced by the view method, and will access data items from the stream on which it is invoked. - For example, a View object could be created and used as follows: - >>> topology = Topology()
>>> rands = topology.source(lambda: random.random())
>>> view = rands.view()
>>> submit(ContextTypes.DISTRIBUTED, topology)
>>> queue = view.start_data_fetch()
>>> for val in iter(queue.get, None):
...     print(val)
...
0.6527
0.1963
0.0512
initialize_rest()¶
- Used to initialize the View object on first use. 
 - 
start_data_fetch()¶
- Starts a background thread which begins accessing data from the remote Stream. The data items are placed asynchronously in a queue, which is returned from this method. - Returns: - A Queue object which is populated with the data items of the stream. 
 - 
stop_data_fetch()¶
- Terminates the background thread fetching stream data items. 
 
- 
- 
class streamsx.topology.topology.Window(stream, window_type)¶
- Declaration of a window of tuples on a Stream. - A Window can be passed as the input of an SPL operator invocation to indicate the operator’s input port is windowed. - Example invoking the SPL Aggregate operator with a sliding window of the last two minutes, triggering every five tuples: - win = s.last(datetime.timedelta(minutes=2)).trigger(5)

agg = op.Map('spl.relational::Aggregate', win,
             schema = 'tuple<uint64 sum, uint64 max>')
agg.sum = agg.output('Sum(val)')
agg.max = agg.output('Max(val)')
aggregate(function, name=None)¶
- Aggregates the contents of the window when the window is triggered. - Upon a window trigger, the supplied function is passed a list containing the contents of the window: function(items). The order of the window items in the list is the order in which they were each received by the window. If the function’s return value is not None then the result will be submitted as a tuple on the returned stream. If the return value is None then no tuple submission will occur. For example, a window that calculates a moving average of the last 10 tuples could be written as follows: - win = s.last(10).trigger(1)
moving_averages = win.aggregate(lambda tuples: sum(tuples)/len(tuples))
 - Note - If a tumbling (batch()) window’s stream is finite then a final aggregation is performed if the window is not empty. Thus function may be passed fewer tuples for a window sized using a count. For example a stream with 105 tuples and a batch size of 25 tuples will perform four aggregations with 25 tuples each and a final aggregation of 5 tuples. - Parameters: - function – The function which aggregates the contents of the window.
- name (str) – The name of the returned stream. Defaults to a generated name.
- Returns: - A Stream of the returned values of the supplied function. - Return type: - Stream - Warning - In Python 3.5 or later if the stream being aggregated has a structured schema that contains a blob type then any blob value will not be maintained in the window. Instead its memoryview object will have been released. If the blob value is required then perform a map() transformation (without setting schema) copying any required blob value in the tuple using memoryview.tobytes(). - New in version 1.8. - Changed in version 1.11: Support for aggregation of streams with structured schemas.
 - 
trigger(when=1)¶
- Declare a window with this window’s size and a trigger policy. - When the window is triggered is defined by when. - If when is an int then the window is triggered every when tuples. For example, with when=5 the window will be triggered every five tuples. - If when is a datetime.timedelta then it is the period of the trigger. With a timedelta representing one minute the window is triggered every minute. - By default, when trigger has not been called on a Window it triggers for every tuple inserted into the window (equivalent to when=1). - Parameters: - when – The trigger policy, either an int to define the number of tuples or datetime.timedelta to define the period of the trigger. - Returns: - Window that will be triggered. - Return type: - Window - Warning - A trigger is only supported for a sliding window such as one created by last().
 
-