streamsx.topology.tester module
Testing support for streaming applications.
Allows testing of a streaming application by creating conditions on streams that are expected to become valid during processing. Tester is designed to be used with Python’s unittest module.
A complete application may be tested, or fragments of it. For example, a sub-graph that takes input data and scores it using a model can be tested in isolation.
Supports execution of the application on STREAMING_ANALYTICS_SERVICE, DISTRIBUTED or STANDALONE.
A Tester instance is created and associated with the Topology to be tested.
Conditions are then created against streams. For example, tuple_count() can require that a stream receive 10 tuples.
Here is a simple example that tests that a filter correctly passes only tuples with values greater than 5:
import unittest

from streamsx.topology.topology import Topology
from streamsx.topology.tester import Tester

class TestSimpleFilter(unittest.TestCase):

    def setUp(self):
        # Sets self.test_ctxtype and self.test_config
        Tester.setup_streaming_analytics(self)

    def test_filter(self):
        # Declare the application to be tested
        topology = Topology()
        s = topology.source([5, 7, 2, 4, 9, 3, 8])
        s = s.filter(lambda x : x > 5)

        # Create tester and assign conditions
        tester = Tester(topology)
        tester.contents(s, [7, 9, 8])

        # Submit the application for test
        # If it fails an AssertionError will be raised.
        tester.test(self.test_ctxtype, self.test_config)
A stream may have any number of conditions and any number of streams may be tested.
A local_check() is supported, where a method of the unittest class is executed once the job becomes healthy. This performs checks from the context of the Python unittest class, such as checking external effects of the application or using the REST API to monitor the application.
A test fails fast if any of the following occur:
- Any condition fails, e.g. a tuple failing a tuple_check().
- The local_check() (if set) raises an error.
- The job for the test:
  - Fails to become healthy.
  - Becomes unhealthy during the test run.
  - Any processing element (PE) within the job restarts.
A test times out if it does not fail but its conditions do not become valid. The timeout is not fixed as an absolute test run time, but as a time since “progress” was made. This allows tests to pass when healthy runs execute in a constrained environment that slows execution. For example, with a tuple count condition of ten, progress is indicated by tuples arriving on a stream, so as long as the gaps between tuples are within the timeout period, the test keeps running until ten tuples appear.
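The progress-based timeout described above can be modeled as a watchdog whose deadline resets on every sign of progress. The sketch below is a hypothetical illustration of the idea only: ProgressTimeout and its methods are invented names, not part of streamsx, and the 10-second value is purely illustrative.

```python
import time


class ProgressTimeout:
    """Hypothetical watchdog: the deadline is measured from the most
    recent progress event, not from the start of the run."""

    def __init__(self, timeout, clock=time.monotonic):
        self.timeout = timeout            # seconds allowed without progress
        self.clock = clock                # injectable clock for testing
        self.last_progress = clock()

    def progress(self):
        # Record progress, e.g. a tuple arriving on a monitored stream.
        self.last_progress = self.clock()

    def timed_out(self):
        # Time out only if the gap since the last progress is too long.
        return self.clock() - self.last_progress > self.timeout


# A fake clock keeps the example deterministic.
now = [0.0]
watchdog = ProgressTimeout(timeout=10.0, clock=lambda: now[0])
for arrival in (5.0, 12.0, 20.0):   # gaps of 5s, 7s and 8s
    now[0] = arrival
    watchdog.progress()
print(watchdog.timed_out())  # False: 20s in total, but no gap over 10s
now[0] = 31.0
print(watchdog.timed_out())  # True: 11s since the last tuple
```

The total run time (20 seconds) exceeds the timeout, yet the watchdog has not fired, because no single gap between tuples did.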
Note
The test timeout value is not configurable.
Warning
Python 3.5, and either the Streaming Analytics service or IBM Streams 4.2 or later, are required when using Tester.
class streamsx.topology.tester.Tester(topology)
Bases: object
Testing support for a Topology.
Allows testing of a Topology by creating conditions against the contents of its streams.
Conditions may be added to a topology at any time before submission.
If a topology is submitted directly to a context then the graph is not modified. This allows testing code to be inserted while the topology is being built, but not acted upon unless the topology is submitted in test mode.
If a topology is submitted through the test method then the topology may be modified to include operations to ensure the conditions are met.
Warning
For future compatibility, applications under test should not include intended failures that cause a processing element to stop or restart. Thus, testing is currently against expected application behavior.
Parameters: topology – Topology to be tested.
add_condition(stream, condition)
Add a condition to a stream.
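Conditions are normally added through tuple_count(), contents() or tuple_check(). This method allows additional conditions that are implementations of Condition to be added.
Parameters: - stream (Stream) – Stream to be tested.
- condition (Condition) – Condition to add to the stream.
Returns: stream
Return type: Stream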
contents(stream, expected, ordered=True)
Test that a stream contains the expected tuples.
Parameters: - stream (Stream) – Stream to be tested.
- expected (list) – Sequence of expected tuples.
- ordered (bool) – True if the ordering of received tuples must match expected.
Returns: stream
Return type: Stream
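To make the effect of ordered concrete, here is a minimal pure-Python model of the comparison, evaluated once a stream's tuples have been collected. contents_match is a hypothetical helper, not the library's implementation (the real condition is evaluated while the job runs); using collections.Counter for the unordered case is an assumption that requires hashable tuples.

```python
from collections import Counter


def contents_match(received, expected, ordered=True):
    """Hypothetical model of the contents() comparison.

    ordered=True: the received sequence must equal expected exactly.
    ordered=False: the same tuples must appear, in any order, with the
    same multiplicities (tuples are assumed to be hashable).
    """
    if ordered:
        return list(received) == list(expected)
    return Counter(received) == Counter(expected)


print(contents_match([7, 9, 8], [7, 9, 8]))                 # True
print(contents_match([9, 7, 8], [7, 9, 8]))                 # False: order differs
print(contents_match([9, 7, 8], [7, 9, 8], ordered=False))  # True
print(contents_match([7, 9], [7, 9, 8], ordered=False))     # False: 8 missing
```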
local_check(callable)
Perform a local check while the application is being tested.
A call to callable is made after the application under test is submitted and becomes healthy. The check runs in the context of the Python runtime executing the unittest case; typically the callable is a method of the test case.
The application remains running until all the conditions are met and callable returns. If callable raises an error, typically through a unittest assertion method, then the test fails.
Used for testing side effects of the application, typically with STREAMING_ANALYTICS_SERVICE or DISTRIBUTED. The callable may also use the REST API, for context types that support it, to dynamically monitor the running application.
The callable can use the submission_result and streams_connection attributes of the Tester instance to interact with the job or the running Streams instance.
Simple example of checking that the job is healthy:
import unittest

from streamsx.topology.topology import Topology
from streamsx.topology.tester import Tester

class TestLocalCheckExample(unittest.TestCase):

    def setUp(self):
        Tester.setup_distributed(self)

    def test_job_is_healthy(self):
        topology = Topology()
        s = topology.source(['Hello', 'World'])

        self.tester = Tester(topology)
        self.tester.tuple_count(s, 2)

        # Add the local check
        self.tester.local_check = self.local_checks

        # Run the test
        self.tester.test(self.test_ctxtype, self.test_config)

    def local_checks(self):
        job = self.tester.submission_result.job
        self.assertEqual('healthy', job.health)
Warning
A local check must not cancel the job (application under test).
Parameters: callable – Callable object.
static setup_distributed(test)
Set up a unittest.TestCase to run tests using IBM Streams distributed mode.
Requires a local IBM Streams install defined by the STREAMS_INSTALL environment variable. If STREAMS_INSTALL is not set, then the test is skipped.
The Streams instance to use is defined by the environment variables:
- STREAMS_ZKCONNECT - Zookeeper connection string
- STREAMS_DOMAIN_ID - Domain identifier
- STREAMS_INSTANCE_ID - Instance identifier
Two attributes are set in the test case:
- test_ctxtype - Context type the test will be run in.
- test_config - Test configuration.
Parameters: test (unittest.TestCase) – Test case to be set up to run tests using Tester Returns: None
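The skip rule for distributed tests can be sketched as a pre-check that uses the standard unittest.SkipTest exception. distributed_env and INSTANCE_VARS are hypothetical names; Tester performs its own checks internally, and this sketch only mirrors the documented behavior (skip when STREAMS_INSTALL is unset) and collects the instance-selection variables listed above.

```python
import os
import unittest

# Environment variables named in the setup_distributed() description.
INSTANCE_VARS = ('STREAMS_ZKCONNECT', 'STREAMS_DOMAIN_ID', 'STREAMS_INSTANCE_ID')


def distributed_env(env=None):
    """Hypothetical pre-check mirroring the documented skip rule.

    Raises unittest.SkipTest if STREAMS_INSTALL is unset; otherwise
    returns whichever instance-selection variables are present.
    """
    env = os.environ if env is None else env
    if not env.get('STREAMS_INSTALL'):
        raise unittest.SkipTest('STREAMS_INSTALL not set')
    return {name: env[name] for name in INSTANCE_VARS if name in env}


# Placeholder values, for illustration only.
print(distributed_env({'STREAMS_INSTALL': '/opt/streams',
                       'STREAMS_DOMAIN_ID': 'StreamsDomain'}))
# {'STREAMS_DOMAIN_ID': 'StreamsDomain'}
```

Raising unittest.SkipTest from a setUp method marks the test as skipped rather than failed, which is the outcome the documentation describes.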
static setup_standalone(test)
Set up a unittest.TestCase to run tests using IBM Streams standalone mode.
Requires a local IBM Streams install defined by the STREAMS_INSTALL environment variable. If STREAMS_INSTALL is not set, then the test is skipped.
Two attributes are set in the test case:
- test_ctxtype - Context type the test will be run in.
- test_config - Test configuration.
Parameters: test (unittest.TestCase) – Test case to be set up to run tests using Tester Returns: None
static setup_streaming_analytics(test, service_name=None, force_remote_build=False)
Set up a unittest.TestCase to run tests using the Streaming Analytics service on the IBM Bluemix cloud platform.
The service to use is defined by:
- The VCAP_SERVICES environment variable containing streaming_analytics entries.
- service_name, which defaults to the value of the STREAMING_ANALYTICS_SERVICE_NAME environment variable.
If VCAP_SERVICES is not set or a service name is not defined, then the test is skipped.
Two attributes are set in the test case:
- test_ctxtype - Context type the test will be run in.
- test_config - Test configuration.
Parameters: - test (unittest.TestCase) – Test case to be set up to run tests using Tester
- service_name (str) – Name of Streaming Analytics service to use. Must exist as an entry in the VCAP services. Defaults to value of STREAMING_ANALYTICS_SERVICE_NAME environment variable.
Returns: None
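The service-selection and skip rules above can be expressed as a small resolution function. resolve_service_name is a hypothetical helper that models only the documented rules; it does not parse VCAP_SERVICES, and the '{}' value used below is a placeholder rather than a real service definition.

```python
import os
import unittest


def resolve_service_name(service_name=None, env=None):
    """Hypothetical model of the documented service selection rules.

    service_name defaults to STREAMING_ANALYTICS_SERVICE_NAME; the test
    is skipped if VCAP_SERVICES is unset or no name can be determined.
    """
    env = os.environ if env is None else env
    if 'VCAP_SERVICES' not in env:
        raise unittest.SkipTest('VCAP_SERVICES not set')
    name = service_name or env.get('STREAMING_ANALYTICS_SERVICE_NAME')
    if not name:
        raise unittest.SkipTest('No Streaming Analytics service name defined')
    return name


env = {'VCAP_SERVICES': '{}', 'STREAMING_ANALYTICS_SERVICE_NAME': 'my-service'}
print(resolve_service_name(env=env))           # my-service
print(resolve_service_name('other', env=env))  # other
```

An explicit service_name argument takes precedence over the environment variable, matching the "defaults to" wording in the parameter description.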
test(ctxtype, config=None, assert_on_fail=True, username=None, password=None)
Test the topology.
Submits the topology for testing and verifies that the test conditions are met and that the job remained healthy throughout its execution.
The submitted application (job) is monitored for the test conditions and is canceled when all the conditions are valid or at least one has failed. In addition, if a local check was specified using local_check(), then that callable must complete before the job is canceled.
The test passes if all conditions became valid and the local check callable (if present) completed without raising an error.
The test fails if the job is unhealthy, any condition fails, or the local check callable (if present) raised an exception.
Parameters: - ctxtype (str) – Context type for submission.
- config – Configuration for submission.
- assert_on_fail (bool) – True to raise an assertion if the test fails, False to return the passed status.
- username (str) – Username for distributed tests.
- password (str) – Password for distributed tests.
Returns: True if the test passed; False if the test failed and assert_on_fail is False. Return type: bool
streams_connection
StreamsConnection – Connection object that can be used to interact with the REST API of the Streaming Analytics service or instance.
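The pass/fail rules and the effect of assert_on_fail can be summarized in a small pure-Python model. report_outcome is a hypothetical function: in the real Tester the local check runs while the job is live and health is monitored continuously, whereas this sketch takes the observed outcomes as plain inputs.

```python
def report_outcome(conditions_valid, job_healthy, local_check=None,
                   assert_on_fail=True):
    """Hypothetical model of how test() reports its result.

    conditions_valid: iterable of booleans, one per condition.
    job_healthy: False if the job became unhealthy or a PE restarted.
    local_check: optional zero-argument callable; raising means failure.
    """
    passed = job_healthy and all(conditions_valid)
    if passed and local_check is not None:
        try:
            local_check()
        except Exception:
            passed = False
    if not passed and assert_on_fail:
        raise AssertionError('test failed')
    return passed


def missing_side_effect():
    # Stands in for a local check whose assertion fails.
    raise RuntimeError('external effect missing')


print(report_outcome([True, True], job_healthy=True))   # True
print(report_outcome([True, False], job_healthy=True,
                     assert_on_fail=False))             # False
print(report_outcome([True], job_healthy=True, local_check=missing_side_effect,
                     assert_on_fail=False))             # False
```

With the default assert_on_fail=True, a failing run raises AssertionError, which unittest reports as a test failure; with False, the caller receives the boolean and decides what to do.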
tuple_check(stream, checker)
Check each tuple on a stream.
For each tuple t on stream, checker(t) is called. If the return value evaluates to False then the condition fails. Once the condition fails it can never become valid. Otherwise the condition becomes or remains valid. The first tuple on the stream makes the condition valid if the checker callable evaluates to True.
The condition can be combined with tuple_count() with exact=False to test a stream map or filter with random input data.
An example of combining tuple_count and tuple_check to test that a filter followed by a map works correctly across a random set of values:
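import random
import unittest

from streamsx.topology.topology import Topology
from streamsx.topology.tester import Tester

def rands():
    # Infinite generator of random floats in [0.0, 1.0)
    r = random.Random()
    while True:
        yield r.random()

class TestFilterMap(unittest.TestCase):

    def setUp(self):
        # Sets self.test_ctxtype and self.test_config
        Tester.setup_streaming_analytics(self)

    def test_filter(self):
        # Declare the application to be tested.
        # The generator function is passed so it is called at runtime.
        topology = Topology()
        r = topology.source(rands)
        r = r.filter(lambda x : x > 0.7)
        r = r.map(lambda x : x + 0.2)

        # Create tester and assign conditions
        tester = Tester(topology)
        # Ensure at least 1000 tuples pass through the filter.
        tester.tuple_count(r, 1000, exact=False)
        tester.tuple_check(r, lambda x : x > 0.9)

        # Submit the application for test
        # If it fails an AssertionError will be raised.
        tester.test(self.test_ctxtype, self.test_config)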
Parameters: - stream (Stream) – Stream to be tested.
- checker (callable) – Callable that must evaluate to True for each tuple.
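The state rules of tuple_check can be modeled as a tiny state machine with three states. TupleCheckModel is a hypothetical class used only to illustrate the documented semantics (valid after the first passing tuple, failed permanently after any failing tuple); it is not part of streamsx.

```python
class TupleCheckModel:
    """Hypothetical model of tuple_check() condition states.

    'pending' until the first tuple, 'valid' while every tuple passes
    checker, and 'failed' permanently after any tuple fails.
    """

    def __init__(self, checker):
        self.checker = checker
        self.state = 'pending'

    def on_tuple(self, t):
        if self.state == 'failed':
            return self.state      # once failed, never valid again
        self.state = 'valid' if self.checker(t) else 'failed'
        return self.state


model = TupleCheckModel(lambda x: x > 0.9)
print([model.on_tuple(x) for x in (0.95, 1.1, 0.5, 0.99)])
# ['valid', 'valid', 'failed', 'failed']
```

The last tuple (0.99) passes the checker, but the condition stays failed because an earlier tuple already failed it.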
tuple_count(stream, count, exact=True)
Test that a stream contains a number of tuples.
If exact is True, then the condition becomes valid when count tuples are seen on stream during the test. Subsequently, if additional tuples are seen on stream, then the condition fails and can never become valid.
If exact is False, then the condition becomes valid once count tuples are seen on stream and remains valid regardless of any additional tuples.
Parameters: - stream (Stream) – Stream to be tested.
- count (int) – Number of tuples expected.
- exact (bool) – True if the stream must contain exactly count tuples, False if the stream must contain at least count tuples.
Returns: stream
Return type: Stream
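The difference between exact=True and exact=False can be seen with a small counter model. TupleCountModel is a hypothetical class that encodes only the documented rules: in exact mode one tuple too many fails the condition permanently, while in at-least mode extra tuples are ignored.

```python
class TupleCountModel:
    """Hypothetical model of tuple_count() condition states."""

    def __init__(self, count, exact=True):
        self.count = count
        self.exact = exact
        self.seen = 0
        self.failed = False

    def on_tuple(self):
        self.seen += 1
        if self.exact and self.seen > self.count:
            self.failed = True     # exact mode: an extra tuple fails forever

    @property
    def valid(self):
        return not self.failed and self.seen >= self.count


exact = TupleCountModel(2, exact=True)
at_least = TupleCountModel(2, exact=False)
for _ in range(3):
    exact.on_tuple()
    at_least.on_tuple()
print(exact.valid, exact.failed)        # False True
print(at_least.valid, at_least.failed)  # True False
```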