streamsx.topology.topology module

Python API to allow creation of streaming applications for IBM Streams & Streaming Analytics service on Bluemix.

Overview

IBM® Streams is an advanced analytic platform that allows user-developed applications to quickly ingest, analyze and correlate information as it arrives from thousands of real-time sources. Streams can handle very high data throughput rates, millions of events or messages per second.

With this API Python developers can build streaming applications that can be executed using IBM Streams, including the processing being distributed across multiple computing resources (hosts or machines) for scalability.

Topology

A Topology declares a graph of streams and operations against tuples (data items) on those streams.

After being declared, a Topology is submitted to be compiled into a Streams application bundle (sab file) and then executed. The sab file is a self-contained bundle that can be executed in a distributed Streams instance, either using the Streaming Analytics service on the IBM Bluemix cloud platform or an on-premises IBM Streams installation.

The compilation step invokes the Streams compiler to produce a bundle. From a Python point of view, this effectively produces a runnable version of the Python topology that includes application-specific Python C extensions to optimize performance.

The Streams runtime distributes the application’s operations across the resources available in the instance.

Note

Topology represents a declaration of a streaming application that will be executed by a Streams instance as a job, either using the Streaming Analytics service on IBM Bluemix cloud platform or an on-premises distributed instance. Topology does not represent a running application, so an instance of Stream class does not contain the tuples, it is only a declaration of a stream.

Stream

A Stream can be an infinite sequence of tuples, such as a stream for a traffic flow sensor. Alternatively, a stream can be finite, such as a stream that is created from the contents of a file. When a streams processing application contains infinite streams, the application runs continuously without ending.

A stream has a schema that defines the type of each tuple on the stream. The schema for a Python Topology is either:

  • Python - A tuple may be any Python object. This is the default.
  • String - Each tuple is a Unicode string.
  • Binary - Each tuple is a blob.
  • Json - Each tuple is a Python dict that can be expressed as a JSON object.
  • Structured - A stream that is an ordered list of attributes, with each attribute having a fixed type (e.g. float64 or int32) and a name.

Stream processing

Callables

A stream is processed to produce zero or more transformed streams; for example, filtering a stream to drop unwanted tuples produces a stream that contains only the required tuples.

Stream processing is tuple based: as each tuple is submitted to a stream, the consuming operators have their processing logic invoked for that tuple.

A functional operator is declared by methods on Stream such as map(), which maps the tuples on its input stream to tuples on its output stream. Stream uses a functional model where each stream processing operator is defined in terms of a Python callable that is invoked for each input tuple and whose return value defines what output tuples are submitted for downstream processing.

The Python callable used for functional processing in this API may be:

  • A Python lambda function.
  • A Python function.
  • An instance of a Python callable class.

For example a stream words containing only string objects can be processed by a filter() using a lambda function:

# Filter the stream so it only contains words starting with py
pywords = words.filter(lambda word : word.startswith('py'))

Stateful operations

Use of a class instance allows the operation to be stateful by maintaining state in instance attributes across invocations.
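As a minimal sketch, a callable class (the name `RunningCount` is illustrative, not part of the API) that pairs each tuple with a running count, keeping its state in an instance attribute:

```python
class RunningCount(object):
    """Stateful callable: pairs each tuple with a running count.

    State (self.count) lives in an instance attribute, so it persists
    across invocations. An instance would typically be passed to
    Stream.map(), e.g. stream.map(RunningCount()).
    """
    def __init__(self):
        self.count = 0

    def __call__(self, tuple_):
        self.count += 1
        return (self.count, tuple_)

counter = RunningCount()
assert counter("a") == (1, "a")
assert counter("b") == (2, "b")
```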

Note

For future compatibility instances should ensure that the object’s state can be pickled. See https://docs.python.org/3.5/library/pickle.html#handling-stateful-objects

Initialization and shutdown

Execution of a class instance effectively runs in a context manager: an instance's __enter__ method is called when the processing element containing the instance is initialized, and its __exit__ method is called when the processing element is stopped. To take advantage of this the class must define both __enter__ and __exit__ methods.

Note

Since an instance of a class is passed to methods such as map(), __init__ is only called when the topology is declared, not at runtime. Initialization at runtime, such as opening connections, occurs through the __enter__ method.

Example of using __enter__ to create custom metrics:

import streamsx.ec as ec

class Sentiment(object):
    def __init__(self):
        pass

    def __enter__(self):
        self.positive_metric = ec.CustomMetric(self, "positiveSentiment")
        self.negative_metric = ec.CustomMetric(self, "negativeSentiment")

    def __exit__(self, exc_type, exc_value, traceback):
        pass

SPL operators

In addition an application declared by Topology can include stream processing defined by SPL primitive or composite operators. This allows reuse of adapters and analytics provided by IBM Streams, open source and third-party SPL toolkits.

class streamsx.topology.topology.Routing

Bases: enum.Enum

Defines how tuples are routed to channels in a parallel region.

A parallel region is started by parallel() and ended with end_parallel() or for_each().

HASH_PARTITIONED = <Routing.HASH_PARTITIONED: 3>

Tuples are routed based upon a hash value so that tuples with the same hash and thus same value are always routed to the same channel. When a hash function is specified it is passed the tuple and the return value is the hash. When no hash function is specified then hash(tuple) is used.

Each tuple is only sent to a single channel.

Warning

A consistent hash function is required to guarantee that a tuple with the same value is always routed to the same channel. The built-in hash() is not consistent: for str, bytes and datetime objects it is “salted” with an unpredictable random value (Python 3.5). Thus if the processing element is restarted, channel routing for a hash based upon a str, bytes or datetime value will change. In addition, code executing in the channels can see a different hash value from other channels and from the execution that routed the tuple, due to being in different processing elements.
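One way to obtain a consistent hash for string tuples is a deterministic digest such as zlib.crc32 (a sketch: the function simply needs to return a stable integer for equal values):

```python
import zlib

def stable_hash(tuple_):
    """Deterministic hash for string tuples.

    Unlike the built-in hash(), crc32 is not salted, so equal values
    always map to the same hash, even across process restarts.
    """
    return zlib.crc32(tuple_.encode("utf-8"))

# Equal values always produce the same hash.
assert stable_hash("py") == stable_hash("py")

# This would be passed as the hash function, e.g.:
# words.parallel(3, Routing.HASH_PARTITIONED, func=stable_hash)
```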

KEY_PARTITIONED = <Routing.KEY_PARTITIONED: 2>
ROUND_ROBIN = <Routing.ROUND_ROBIN: 1>

Tuples are routed to maintain an even distribution of tuples to the channels.

Each tuple is only sent to a single channel.

class streamsx.topology.topology.Stream(topology, oport)

Bases: object

The Stream class is the primary abstraction within a streaming application. It represents a potentially infinite series of tuples which can be operated upon to produce another stream, as in the case of map(), or terminate a stream, as in the case of for_each().

as_string()

Declares a stream converting each tuple on this stream into a string using str(tuple).

The stream is typed as a stream of strings.

New in version 1.6.

Returns:Stream containing the string representations of tuples on this stream.
Return type:Stream
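The conversion applied to each tuple is simply str(tuple), so the contents of the returned stream can be previewed in plain Python:

```python
# as_string() applies str() to each tuple; e.g. these tuples
tuples = [1, 2.5, {'a': 1}]
# become these strings on the returned stream:
strings = [str(t) for t in tuples]
assert strings == ['1', '2.5', "{'a': 1}"]
```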
autonomous()

Starts an autonomous region for downstream processing. By default IBM Streams processing is executed in an autonomous region where any checkpointing of operator state is autonomous (independent) of other operators.

This method may be used to end a consistent region by starting an autonomous region. This may be called even if this stream is in an autonomous region.

Autonomous is not applicable when a topology is submitted to a STANDALONE context and will be ignored.

New in version 1.6.

Returns:Stream whose subsequent downstream processing is in an autonomous region.
Return type:Stream
end_low_latency()

Returns a Stream that is no longer guaranteed to run in the same process as the calling stream.

Returns:Stream
end_parallel()

Ends a parallel region by merging the channels into a single stream.

Returns:Stream for which subsequent transformations are no longer parallelized.
Return type:Stream
filter(func, name=None)

Filters tuples from this stream using the supplied callable func.

For each tuple on the stream func(tuple) is called, if the return evaluates to True the tuple will be present on the returned stream, otherwise the tuple is filtered out.

Parameters:func – Filter callable that takes a single parameter for the tuple.
Returns:A Stream containing tuples that have not been filtered out.
Return type:Stream
flat_map(func, name=None)

Maps and flattens each tuple from this stream into 0 or more tuples.

For each tuple on this stream func(tuple) is called. If the result is not None then the result is iterated over, and each value from the iterator that is not None is submitted to the returned stream.

If the result is None or an empty iterable then no tuples are submitted to the returned stream.

Parameters:func – A callable that takes a single parameter for the tuple.
Returns:A Stream containing transformed tuples.
Return type:Stream
Raises:TypeError – if func does not return an iterable or None
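A common flat_map callable splits each tuple into multiple output tuples, for example splitting lines into words (a sketch; `lines` below is a hypothetical stream of strings):

```python
def split_words(line):
    """flat_map() callable: returns an iterable whose non-None values
    are submitted as individual tuples; an empty list yields no tuples."""
    return line.split()

# words = lines.flat_map(split_words)
assert split_words("hello streaming world") == ["hello", "streaming", "world"]
assert split_words("") == []   # no tuples submitted
```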
for_each(func, name=None)

Sends information as a stream to an external system.

For each tuple on the stream func(tuple) is called.

Parameters:func – A callable that takes a single parameter for the tuple and returns None.
Returns:None
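The callable consumes each tuple and returns None. As a minimal sketch, a sink class (the name `ToList` is illustrative) that records each tuple it receives:

```python
class ToList(object):
    """Sink callable for for_each(): records each tuple it receives.

    Returns None for every tuple, as required of a sink.
    """
    def __init__(self):
        self.seen = []

    def __call__(self, tuple_):
        self.seen.append(tuple_)

sink = ToList()
for t in (1, 2, 3):
    assert sink(t) is None
assert sink.seen == [1, 2, 3]
# Typical use: stream.for_each(ToList())  (stream is hypothetical here)
```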
isolate()

Guarantees that the upstream operation will run in a separate processing element from the downstream operation.

Returns:Stream whose subsequent immediate processing will occur in a separate processing element.
Return type:Stream
low_latency()

The function is guaranteed to run in the same process as the upstream Stream function. All streams that are created from the returned stream are also guaranteed to run in the same process until end_low_latency() is called.

Returns:Stream
map(func, name=None)

Maps each tuple from this stream into 0 or 1 tuples.

For each tuple on this stream func(tuple) is called. If the result is not None then the result will be submitted as a tuple on the returned stream. If the result is None then no tuple submission will occur.

Parameters:func – A callable that takes a single parameter for the tuple.
Returns:A stream containing tuples mapped by func.
Return type:Stream
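Because a None result drops the tuple, a map callable can transform and filter in one step (a sketch; the function name is illustrative):

```python
def parse_int(tuple_):
    """map() callable: returns the parsed integer, or None to drop
    tuples that are not valid integers."""
    try:
        return int(tuple_)
    except ValueError:
        return None

# ints = strings.map(parse_int)   (strings is a hypothetical stream)
assert parse_int("42") == 42
assert parse_int("not a number") is None   # tuple is dropped
```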
multi_transform(func, name=None)

Equivalent to calling flat_map().

parallel(width, routing=Routing.ROUND_ROBIN, func=None)

Parallelizes the stream into width parallel channels. Tuples are routed to parallel channels such that an even distribution is maintained. Each parallel channel can be thought of as being assigned its own thread. As such, the stream operations in each parallel channel are separate instances and operate independently of one another.

parallel() will only parallelize the stream operations performed after the call to parallel() and before the call to end_parallel().

Parallel regions aren’t required to have an output stream, and thus may be used as sinks. In other words, a parallel sink is created by calling parallel() and creating a sink operation. It is not necessary to invoke end_parallel() on parallel sinks.

Nested parallelism is not currently supported. A call to parallel() should never be made immediately after another call to parallel() without having an end_parallel() in between.

Every call to end_parallel() must have a call to parallel() preceding it.

Parameters:
  • width (int) – Degree of parallelism.
  • routing (Routing) – denotes what type of tuple routing to use.
  • func – Optional function called when Routing.HASH_PARTITIONED routing is specified. The function provides an integer value to be used as the hash that determines the tuple channel routing.
Returns:

A stream for which subsequent transformations will be executed in parallel.

Return type:

Stream
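Round-robin routing can be pictured as dealing tuples to channels in turn, which keeps the distribution even. This plain-Python sketch illustrates the Routing.ROUND_ROBIN behavior (it is not the actual runtime implementation):

```python
from itertools import cycle

width = 3
channels = [[] for _ in range(width)]
dealer = cycle(range(width))

# Each tuple goes to exactly one channel, in turn.
for tuple_ in range(10):
    channels[next(dealer)].append(tuple_)

sizes = [len(c) for c in channels]
assert sizes == [4, 3, 3]   # distribution stays even

# In the API the region would be declared as (sketch):
# s.parallel(3).map(process).end_parallel()
```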

print()

Prints each tuple to stdout flushing after each tuple.

Returns:None
publish(topic, schema=CommonSchema.Python)

Publish this stream on a topic for other Streams applications to subscribe to. A Streams application may publish a stream to allow other Streams applications to subscribe to it. A subscriber matches a publisher if the topic and schema match.

By default a stream is published as Python objects (CommonSchema.Python) which allows other Streams Python applications to subscribe to the stream using the same topic.

If a stream is published with CommonSchema.Json then it is published as JSON, other Streams applications may subscribe to it regardless of their implementation language. A Python tuple is converted to JSON using json.dumps(tuple, ensure_ascii=False).

If a stream is published with CommonSchema.String then it is published as strings, other Streams applications may subscribe to it regardless of their implementation language. A Python tuple is converted to a string using str(tuple).

Parameters:
  • topic (str) – Topic to publish this stream to.
  • schema – Schema to publish. Defaults to CommonSchema.Python representing Python objects.
Returns:

None

sink(func, name=None)

Equivalent to calling for_each().

transform(func, name=None)

Equivalent to calling map().

union(streamSet)

Creates a stream that is a union of this stream and other streams.

Parameters:streamSet – a set of Stream objects to merge with this stream
Returns:A stream containing the tuples from this stream and the streams in streamSet.
Return type:Stream
view(buffer_time=10.0, sample_size=10000, name=None)

Defines a view on a stream. Returns a View object which can be used to access the data.

Parameters:
  • buffer_time – The window of time over which tuples will be buffered.
  • name (str) – Name of the view. The name must be unique within the topology. Defaults to a generated name.

class streamsx.topology.topology.Topology(name=None, namespace=None, files=None)

Bases: object

The Topology class is used to define data sources, and is passed as a parameter when submitting an application. Topology keeps track of all sources, sinks, and data operations within your application.

Submission of a Topology results in a Streams application that has the name namespace::name.

Parameters:
  • name (str) – Name of the topology. Defaults to a name derived from the calling environment if it can be determined, otherwise a random name.
  • namespace (str) – Namespace of the topology. Defaults to a name derived from the calling environment if it can be determined, otherwise a random name.
Instance variables:

include_packages(set): Python package names to be included in the built application.

exclude_packages(set): Python top-level package names to be excluded from the built application. Excluding a top-level package excludes all of its sub-modules at any level, e.g. sound excludes sound.effects.echo. Only a top-level package can be specified, e.g. sound rather than sound.filters; behavior when adding a module within a package is undefined. When compiling the application using Anaconda, this set is pre-loaded with Python packages from the Anaconda pre-loaded set.

Package names in include_packages take precedence over package names in exclude_packages.

name

Returns the name of the topology.

Returns:Name of the topology.
Return type:str

namespace

Returns the namespace of the topology.

Returns:Namespace of the topology.
Return type:str

source(func, name=None)

Declare a source stream that introduces tuples into the application.

Typically used to create a stream of tuples from an external source, such as a sensor, or by reading from an external system.

Tuples are obtained from an iterator over the passed iterable, or over the iterable returned by the passed callable.

Each tuple that is not None from the iterator is present on the returned stream.

Each tuple is a Python object and must be picklable to allow execution of the application to be distributed across available resources in the Streams instance.

Parameters:
  • func (callable) – An iterable or a zero-argument callable that returns an iterable of tuples.
  • name (str) – Name of the stream, defaults to a generated name.
Returns:

A stream whose tuples are the result of the iterable obtained from func.

Return type:

Stream
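The func argument is an iterable, or a zero-argument callable that returns one; a generator function is a natural fit (a sketch with illustrative names):

```python
def readings():
    """Zero-argument callable returning an iterable, suitable for
    Topology.source(). Any None values it yields would be discarded."""
    for i in range(3):
        yield {"id": i, "value": i * 1.5}

# Typical use (sketch):
# topo = Topology("sensor_app")
# s = topo.source(readings)
assert [r["id"] for r in readings()] == [0, 1, 2]
```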

subscribe(topic, schema=CommonSchema.Python)

Subscribe to a topic published by other Streams applications. A Streams application may publish a stream to allow other Streams applications to subscribe to it. A subscriber matches a publisher if the topic and schema match.

By default a stream is subscribed as Python objects which connects to streams published to topic by Python Streams applications.

JSON streams are subscribed to using schema Json. Each tuple on the returned stream will be a Python dictionary object created by json.loads(tuple). A Streams application publishing JSON streams may have been implemented in any programming language supported by Streams.

String streams are subscribed to using schema String. Each tuple on the returned stream will be a Python string object. A Streams application publishing string streams may have been implemented in any programming language supported by Streams.

Parameters:
  • topic (str) – Topic to subscribe to.
  • schema (StreamSchema) – schema to subscribe to.
Returns:

A stream whose tuples have been published to the topic by other Streams applications.

Return type:

Stream
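The Json publish/subscribe conversions round-trip through standard JSON, which can be checked in plain Python (json.dumps on the publish side, json.loads on the subscribe side, per the documentation above):

```python
import json

published = {"sensor": "traffic-17", "flow": 42.5}

# Publisher side: tuple -> JSON text (publish with CommonSchema.Json).
wire = json.dumps(published, ensure_ascii=False)

# Subscriber side: JSON text -> Python dict (subscribe with schema Json).
received = json.loads(wire)
assert received == published
```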

class streamsx.topology.topology.View(name)

Bases: object

A View is an object which is associated with a Stream, and provides access to the items on the stream.

initialize_rest()
set_streams_connection(sc)
set_streams_connection_config(conf)
start_data_fetch()
stop_data_fetch()