streamsx.topology.composite
Composite transformations.
New in version 1.14.
Module contents
Classes
Composite | Composite transformations support a single logical transformation being a composite of one or more basic transformations.
ForEach | Abstract composite for each transformation.
Map | Abstract composite map transformation.
Source | Abstract composite source.
class streamsx.topology.composite.Composite
Bases: abc.ABC
Composite transformations support a single logical transformation being a composite of one or more basic transformations. Composites encapsulate complex transformations for reuse.
A composite transformation is implemented as a subclass of Source, Map or ForEach whose populate method populates the topology with the required basic transformations. For example, a Source composite might use source() followed by a filter() to filter out unwanted events and then a map() to parse the event into a structured schema.
Composites may use other composites during populate. The populate function implements the specific transformations of a composite.
Composites can control how the basic transformations are visually represented. By default any transformations within a composite are grouped visually. A composite may alter this using these attributes of the composite instance:
kind - Sets the name of the operator kind for a group or single operator. Defaults to a combination of the module and class name of the composite, e.g. streamsx.standard.utility::Sequence. Set to a false value to disable any modification of the visual representation of the composite’s transformations.
group - Set to a false value to disable any grouping of multiple transformations. Defaults to True to enable grouping.
The values of kind and group are checked after the expansion of the composite using populate.
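The kind and group rules above can be illustrated with a small stand-alone sketch. It does not use streamsx: default_kind and resolve_visuals are hypothetical helpers, and the module::ClassName format is an assumption inferred from the streamsx.standard.utility::Sequence example, not a statement of how the library computes it.

```python
def default_kind(composite):
    """Build a 'module::ClassName' string (format assumed from the example above)."""
    cls = type(composite)
    return f"{cls.__module__}::{cls.__name__}"


def resolve_visuals(composite):
    """Return (kind, group) as they would be read once populate has run.

    Hypothetical helper for illustration; not part of streamsx.
    """
    kind = getattr(composite, "kind", default_kind(composite))
    if not kind:
        # A false kind disables any change to the visual representation.
        return None, False
    group = bool(getattr(composite, "group", True))
    return kind, group


class Sequence:
    # Pretend this class lives in the module named in the example above.
    __module__ = "streamsx.standard.utility"


class Ungrouped(Sequence):
    def __init__(self):
        self.kind = "demo::Ungrouped"  # custom operator kind (made-up name)
        self.group = False             # do not group the transformations


class Untouched(Sequence):
    kind = None  # false value: leave the visual representation alone


print(resolve_visuals(Sequence()))
print(resolve_visuals(Ungrouped()))
print(resolve_visuals(Untouched()))
```

Because the attributes are read after expansion, a composite could also set kind or group from inside populate, for example based on how many transformations it ended up adding.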
class streamsx.topology.composite.Source
Bases: streamsx.topology.composite.Composite
Abstract composite source.
An instance of a subclass can be passed to source() to create a source stream that is composed of one or more basic transformations, which must be implemented by the populate() method of the subclass.
Example, assuming RawTweets is a Python iterable that produces raw tweets:
    import streamsx.topology.composite

    class Tweets(streamsx.topology.composite.Source):
        def __init__(self, track):
            self.track = track

        def populate(self, topology, name, **options):
            # Get all the tweets.
            tweets = topology.source(RawTweets(track=self.track), name=name)
            # Keep only the tweets that have a message.
            return tweets.filter(lambda tweet: tweet['text'])
This class can then be used as follows:
    from streamsx.topology.topology import Topology

    topo = Topology()
    gf_tweets = topo.source(Tweets(track=['glutenfree', 'gf']))
abstract populate(topology, name, **options)
Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
    topo = Topology()
    source_stream = topo.source(mySourceComposite)
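To make the call sequence concrete, here is a toy model of how passing a composite to source() could lead to populate being called. ToySource and ToyTopology are hypothetical stand-ins written for this sketch, with plain lists in place of streams; they are not the streamsx classes and do not reflect the library's internals.

```python
from abc import ABC, abstractmethod


class ToySource(ABC):
    """Stand-in for an abstract composite source (not the streamsx class)."""

    @abstractmethod
    def populate(self, topology, name, **options):
        """Add the basic transformations and return the resulting stream."""


class ToyTopology:
    """Stand-in topology whose 'streams' are plain Python lists."""

    def source(self, func, name=None):
        if isinstance(func, ToySource):
            # Composite: let it add its own basic transformations.
            return func.populate(self, name)
        # Basic source: consume the iterable directly.
        return list(func)


class EvenNumbers(ToySource):
    def __init__(self, limit):
        self.limit = limit

    def populate(self, topology, name, **options):
        # Basic source followed by a filter step.
        numbers = topology.source(range(self.limit), name=name)
        return [n for n in numbers if n % 2 == 0]


evens = ToyTopology().source(EvenNumbers(limit=7))
print(evens)  # [0, 2, 4, 6]
```

Declaring populate as abstract means a subclass that forgets to implement it cannot be instantiated, so the mistake surfaces before the composite is ever added to a topology.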
class
streamsx.topology.composite.Map¶ Bases:
streamsx.topology.composite.CompositeAbstract composite map transformation.
An instance of a subclass can be passed to
map()to create a stream that is composed of one or more basic transformations of an input stream, which must be implemented by thepopulate()method of the subclass.Example:
class WordCount(streamsx.topology.composite.Map): def __init__(self, period, update): self.period = period self.update = update def populate(self, topology, stream, schema, name, **options): words = stream.flat_map(lambda line : line.split()) win = words.last(size=self.period).trigger(self.update).partition(lambda s : s) return win.aggregate(lambda values : (values[0], len(values)))
abstract populate(topology, stream, schema, name, **options)
Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
    transformed_stream = input_stream.map(myMapComposite)
- Parameters
    topology (Topology) – Topology containing the composite map.
    stream (Stream) – Stream to be transformed.
    schema (Union[StreamSchema, CommonSchema, str, NamedTuple]) – Schema passed into map.
    name (Optional[str]) – Name passed into map.
    **options – Future options passed to map.
- Returns
    Single stream representing the transformation of stream.
- Return type
    Stream
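Since composites may use other composites during populate, a map composite can be built on top of another one. The sketch below models that with hypothetical stand-ins (ToyMap, ToyStream) so it runs without streamsx; the way schema and name are forwarded is an assumption made for illustration, not the library's behavior.

```python
class ToyMap:
    """Stand-in for a composite map base class (not the streamsx class)."""

    def populate(self, topology, stream, schema, name, **options):
        raise NotImplementedError


class ToyStream:
    """Stand-in stream backed by a list."""

    def __init__(self, topology, items):
        self.topology = topology
        self.items = list(items)

    def map(self, func, name=None, schema=None):
        if isinstance(func, ToyMap):
            # Composite: hand over the input stream plus the map() arguments.
            return func.populate(self.topology, self, schema, name)
        # Basic transformation: apply the callable to every item.
        return ToyStream(self.topology, (func(x) for x in self.items))


class Strip(ToyMap):
    """Removes surrounding whitespace from each line."""

    def populate(self, topology, stream, schema, name, **options):
        return stream.map(str.strip, name=name)


class Normalize(ToyMap):
    """Reuses the Strip composite, then lower-cases each line."""

    def populate(self, topology, stream, schema, name, **options):
        stripped = stream.map(Strip())
        return stripped.map(str.lower, name=name, schema=schema)


lines = ToyStream(topology=None, items=["  Hello ", "WORLD  "])
result = lines.map(Normalize(), name="normalize")
print(result.items)  # ['hello', 'world']
```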
class
streamsx.topology.composite.ForEach¶ Bases:
streamsx.topology.composite.CompositeAbstract composite for each transformation.
An instance of a subclass can be passed to
for_each()to create a sink (stream termination) that is composed of one or more basic transformations of an input stream. These transformations and the sink function must be implemented by thepopulate()method of the subclass.-
abstract
populate(topology, stream, name, **options)¶ Populate the topology with this composite for each transformation. Subclasses must implement the
populatefunction.populateis called when the composite is added to the topology with:sink = input_stream.for_each(myForEachComposite)
-
abstract
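As a rough illustration of a for each composite, the sketch below filters a stream and then terminates it with a sink function inside populate. ToyForEach, ToyStream and ToySink are hypothetical stand-ins so the example runs without streamsx; in real use the composite would subclass streamsx.topology.composite.ForEach and receive a real Stream.

```python
class ToyForEach:
    """Stand-in for a composite for each base class (not the streamsx class)."""

    def populate(self, topology, stream, name, **options):
        raise NotImplementedError


class ToySink:
    """Stand-in for the object returned when a stream is terminated."""

    def __init__(self, name):
        self.name = name


class ToyStream:
    """Stand-in stream backed by a list."""

    def __init__(self, items):
        self.items = list(items)

    def filter(self, func, name=None):
        return ToyStream(x for x in self.items if func(x))

    def for_each(self, func, name=None):
        if isinstance(func, ToyForEach):
            # Composite: it adds its transformations and its own sink.
            return func.populate(None, self, name)
        for item in self.items:
            func(item)
        return ToySink(name)


class CollectErrors(ToyForEach):
    """Keeps only ERROR records and appends them to a list."""

    def __init__(self, collected):
        self.collected = collected

    def populate(self, topology, stream, name, **options):
        errors = stream.filter(lambda rec: rec["level"] == "ERROR")
        return errors.for_each(self.collected.append, name=name)


collected = []
records = ToyStream([
    {"level": "INFO", "msg": "started"},
    {"level": "ERROR", "msg": "disk full"},
])
sink = records.for_each(CollectErrors(collected), name="errors")
print(collected)  # [{'level': 'ERROR', 'msg': 'disk full'}]
```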