streamsx.topology.composite¶
Composite transformations.
New in version 1.14.
Module contents¶
Classes
Composite transformations support a single logical transformation being a composite of one or more basic transformations. |
|
Abstract composite for each transformation. |
|
Abstract composite map transformation. |
|
Abstract composite source. |
-
class
streamsx.topology.composite.
Composite
¶ Bases:
abc.ABC
Composite transformations support a single logical transformation being a composite of one or more basic transformations. Composites encapsulate complex transformations for being re-used.
A composite transformation is implemented as a sub-class of
Source
,Map
orForEach
whosepopulate
method populates the topology with the required basic transformations. For example aSource
composite might have usesource()
followed by afilter()
to filter out unwanted events and then amap()
to parse the event into a structured schema.Composites may use other composites during
populate
. Thepopulate
function implements the specific transformations of a composite.Composites can control how the basic transformations are visually represented. By default any transformations within a composite are grouped visually. A composite may alter this using these attributes of the composite instance:
kind
- Sets the name of operator kind for a group or single operator. Defaults to a combination of the module and class name of the composite, e.g.streamsx.standard.utility::Sequence
. Set to a false value to disable any modification of the visual representation of the composite’s transformations.group
- Set to a false value to disable any grouping of multiple transformations. Defaults toTrue
to enable grouping.
The values of
kind
andgroup
are checked after the expansion of the composite usingpopulate
.
-
class
streamsx.topology.composite.
Source
¶ Bases:
streamsx.topology.composite.Composite
Abstract composite source.
An instance of a subclass can be passed to
source()
to create a source stream that is composed of one or more basic transformations, which must be implemented by thepopulate()
method of the subclass.Example assuming
RawTweets
is Python iterable that produces raw tweets:class Tweets(streamsx.topology.composite.Source): def __init__(self, track): self.track = track def populate(self, topology, name, **options): # get all the tweets tweets = topology.source(RawTweets(track=self.track), name=name) # filter so that only with a message are returned return tweets.filter(lambda tweet : tweet['text'])
This class can then be used as follows:
topo = Topology() gf_tweets = topo.source(Tweets(track=['glutenfree', 'gf']))
-
abstract
populate
(topology, name, **options)¶ Populate the topology with this composite source. Subclasses must implement the
populate
function.populate
is called when the composite is added to the topology with:topo = Topology() source_stream = topo.source(mySourceComposite)
-
abstract
-
class
streamsx.topology.composite.
Map
¶ Bases:
streamsx.topology.composite.Composite
Abstract composite map transformation.
An instance of a subclass can be passed to
map()
to create a stream that is composed of one or more basic transformations of an input stream, which must be implemented by thepopulate()
method of the subclass.Example:
class WordCount(streamsx.topology.composite.Map): def __init__(self, period, update): self.period = period self.update = update def populate(self, topology, stream, schema, name, **options): words = stream.flat_map(lambda line : line.split()) win = words.last(size=self.period).trigger(self.update).partition(lambda s : s) return win.aggregate(lambda values : (values[0], len(values)))
-
abstract
populate
(topology, stream, schema, name, **options)¶ Populate the topology with this composite map transformation. Subclasses must implement the
populate
function.populate
is called when the composite is added to the topology with:transformed_stream = input_stream.map(myMapComposite)
- Parameters
topology (
Topology
) – Topology containing the composite map.stream (
Stream
) – Stream to be transformed.schema (
Union
[StreamSchema
,CommonSchema
,str
,NamedTuple
]) – Schema passed intomap
.name (
Optional
[str
]) – Name passed intomap
.**options – Future options passed to
map
.
- Returns
Single stream representing the transformation of stream.
- Return type
-
abstract
-
class
streamsx.topology.composite.
ForEach
¶ Bases:
streamsx.topology.composite.Composite
Abstract composite for each transformation.
An instance of a subclass can be passed to
for_each()
to create a sink (stream termination) that is composed of one or more basic transformations of an input stream. These transformations and the sink function must be implemented by thepopulate()
method of the subclass.-
abstract
populate
(topology, stream, name, **options)¶ Populate the topology with this composite for each transformation. Subclasses must implement the
populate
function.populate
is called when the composite is added to the topology with:sink = input_stream.for_each(myForEachComposite)
-
abstract