4.0 API features: User-defined parallelism
If a particular portion of your graph is causing congestion because the application needs additional throughput at that point, you can define a parallel region in your graph. A parallel region enables the application to use multiple channels to run operations (such as filtering or transforming data) concurrently.
In the Developing your first application section, you created a topology and defined a pseudo temperature source. In this example, you want to convert all of the source tuples from Celsius to Kelvin.
To achieve this, add a function called convertToKelvin() to the application you created in the Developing your first application section.
Your application should look like this:
The following code should be in the temperature_sensor.py file:
```python
from streamsx.topology.topology import Topology
import streamsx.topology.context
import temperature_sensor_functions


def main():
    topo = Topology("temperature_sensor")
    # Pseudo temperature source, in degrees Celsius
    source = topo.source(temperature_sensor_functions.readings)
    # Convert each reading from Celsius to Kelvin
    kelvin = source.map(temperature_sensor_functions.convertToKelvin)
    kelvin.sink(print)
    streamsx.topology.context.submit("STANDALONE", topo.graph)


if __name__ == '__main__':
    main()
```
The following code should be in the temperature_sensor_functions.py file:
```python
import random

def readings():
    # Endless stream of pseudo temperature readings in degrees Celsius
    while True:
        yield random.gauss(0.0, 1.0)

def convertToKelvin(reading):
    return reading + 273.15
```
Converting a temperature reading from Celsius to Kelvin is not a resource-intensive task. However, you can use this example to see how using a parallel region can help distribute processing across resources when an operation is resource-intensive or inefficient and is causing a bottleneck in your application.
To parallelize an operation, invoke .parallel() on the Stream that feeds it. Operations that you apply to the Stream returned by .parallel() are processed in parallel.
Restriction: Nested parallelism is not supported. You cannot invoke parallel() on a Stream that is already parallelized. If you do, your application throws an exception.
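The restriction can be illustrated with a toy model. The ToyStream class below is hypothetical and is not part of streamsx: it tracks only whether a stream is inside a parallel region, carries that state through map(), and raises RuntimeError when parallel() is called on a stream that is already parallelized. RuntimeError is an arbitrary choice for this sketch; it is not necessarily the exception type the library raises.

```python
class ToyStream:
    """Hypothetical stand-in for a Stream that tracks only parallel-region state."""

    def __init__(self, in_parallel: bool = False, width: int = 1) -> None:
        self.in_parallel = in_parallel
        self.width = width

    def map(self, func) -> "ToyStream":
        # Data processing is omitted; only the region state is carried forward.
        return ToyStream(in_parallel=self.in_parallel, width=self.width)

    def parallel(self, width: int) -> "ToyStream":
        # Models the documented restriction: no parallel() on a parallelized stream.
        if self.in_parallel:
            raise RuntimeError("nested parallelism is not supported")
        return ToyStream(in_parallel=True, width=width)

    def end_parallel(self) -> "ToyStream":
        # Operations on the returned stream are outside the parallel region.
        return ToyStream(in_parallel=False)


if __name__ == "__main__":
    kelvin = ToyStream().parallel(4).map(lambda c: c + 273.15)
    try:
        kelvin.parallel(2)  # nested parallel region: rejected by the model
    except RuntimeError as err:
        print("rejected:", err)
    end = kelvin.end_parallel()
    print(end.in_parallel)  # False
```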
To end parallel processing, invoke .end_parallel() on the parallelized Stream. Operations on the Stream that is returned by end_parallel() are not processed in parallel. For example, call .end_parallel() on the kelvin Stream before you call sink(print).
With parallelism added, the main() function in temperature_sensor.py becomes:
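```python
def main():
    topo = Topology("temperature_sensor")
    source = topo.source(temperature_sensor_functions.readings)
    # Operations applied after parallel(4) run in four channels
    kelvin = source.parallel(4).map(temperature_sensor_functions.convertToKelvin)
    # Operations on the stream returned by end_parallel() are not parallelized
    end = kelvin.end_parallel()
    end.sink(print)
    streamsx.topology.context.submit("STANDALONE", topo.graph)
```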
Any operations that are performed on the parallelized Stream run in parallel, to the degree specified by the value passed to .parallel(). In the example above, you specified 4, which means that four channels process the data in the parallel region of the graph.
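To picture what four channels mean without a Streams instance, the following pure-Python sketch models a parallel region around one operation. It is not the streamsx implementation. The names run_parallel_region and convert_to_kelvin are hypothetical. Assigning tuples to channels round-robin is an assumption made for illustration. One worker thread stands in for each channel, and the final merge step plays the role of end_parallel().

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import zip_longest
from typing import Callable, List

_MISSING = object()  # fill value for channels that run out of tuples early


def convert_to_kelvin(celsius: float) -> float:
    return celsius + 273.15


def run_parallel_region(values: List[float], width: int,
                        op: Callable[[float], float]) -> List[float]:
    """Toy model of parallel(width).map(op).end_parallel()."""
    if width < 1:
        raise ValueError("width must be at least 1")

    # Fan out: tuple i goes to channel i % width (round-robin, an assumption).
    channels = [values[i::width] for i in range(width)]

    def process(channel: List[float]) -> List[float]:
        # Each channel applies the same operation to its own tuples.
        return [op(v) for v in channel]

    # One worker thread per channel.
    with ThreadPoolExecutor(max_workers=width) as pool:
        results = list(pool.map(process, channels))

    # Fan in (the end_parallel() point): interleave channel outputs into one list.
    merged: List[float] = []
    for row in zip_longest(*results, fillvalue=_MISSING):
        merged.extend(v for v in row if v is not _MISSING)
    return merged


if __name__ == "__main__":
    readings = [0.0, 10.0, 20.0, 30.0, 40.0]
    print(run_parallel_region(readings, 4, convert_to_kelvin))
```

Because the sketch merges channels in lockstep, it returns results in input order; do not assume the runtime preserves ordering across channels in the same way. Also note that in CPython, threads do not add throughput for CPU-bound operations because of the global interpreter lock, so the threads here only illustrate the structure of the region.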