streamsx.topology.state
Application state.
Overview
Stateful applications are ones that include callables that are instances of classes and thus can maintain state in instance variables.
By default any state is reset to its initial state after a processing element (PE) restart. A restart may occur due to:
a failure in the PE or its resource,
an explicit PE restart request,
or a parallel region width change (IBM Streams 4.3 or later).
The application or a portion of it may be configured to maintain state after a PE restart by one of two mechanisms.
Consistent region. A consistent region is a subgraph where the states of callables become consistent by processing all the tuples within defined points on a stream. After a PE restart all callables in the region are reset to the last consistent point, so that the state of all callables represents the processing of the same input tuples to the region.
Checkpointing. Each stateful callable is checkpointed periodically, and after a PE restart the callables are reset to their most recent checkpointed state.
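As a rough illustration of the difference between the default behaviour and checkpointing, the following sketch simulates a PE restart in plain Python. The Counter class and the run() helper are hypothetical, and copy.deepcopy stands in for the runtime's checkpoint and restore; this is not how IBM Streams implements either mechanism.

```python
import copy


class Counter:
    """Stateful callable: counts the tuples it has seen."""

    def __init__(self):
        self.count = 0

    def __call__(self, tuple_):
        self.count += 1
        return tuple_


def run(tuples, checkpoint_every=None, fail_after=None):
    """Process tuples, simulating one PE restart after `fail_after` tuples.

    Hypothetical stand-in for the runtime: with checkpoint_every=None the
    callable restarts from its initial state, otherwise it restarts from
    the most recent snapshot.
    """
    counter = Counter()
    saved = copy.deepcopy(counter)  # snapshot of the initial state
    for i, t in enumerate(tuples, start=1):
        counter(t)
        if checkpoint_every and i % checkpoint_every == 0:
            saved = copy.deepcopy(counter)  # simulated periodic checkpoint
        if i == fail_after:
            counter = copy.deepcopy(saved)  # simulated PE restart
    return counter.count


without_checkpoint = run(range(10), fail_after=7)
with_checkpoint = run(range(10), checkpoint_every=5, fail_after=7)
```

In this simulation the count restarts from zero when nothing is checkpointed, and from the last snapshot otherwise, so tuples processed between that snapshot and the failure are not reflected in the restored state.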
Stateful callables
Use of a class instance allows a transformation (for example map()) to be stateful by maintaining state in instance attributes across invocations.
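A minimal sketch of such a callable is shown here. The RunningAverage class is a hypothetical example, and the list comprehension only stands in for the runtime invoking the callable once per tuple; in an application the instance would be passed to a transformation such as map().

```python
class RunningAverage:
    """Stateful callable: keeps a running average across invocations."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def __call__(self, value):
        # State lives in instance attributes, so it survives between calls.
        self.total += value
        self.count += 1
        return self.total / self.count


avg = RunningAverage()
# Stand-in for the runtime calling the instance once per tuple.
results = [avg(v) for v in [2, 4, 6]]
```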
When the callable is in a consistent region or is being checkpointed, it is serialized using dill. The default serialization may be modified by using the standard Python pickle mechanism of __getstate__ and __setstate__. This is required if the state includes objects that cannot be serialized, for example file descriptors. For details see https://docs.python.org/3.5/library/pickle.html#handling-stateful-objects .
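The following sketch shows one way to apply __getstate__ and __setstate__ to a callable that holds an open file. It uses the standard library pickle module as a stand-in for dill (an assumption made so the example has no third-party dependency), and the LineLogger class and file name are hypothetical.

```python
import os
import pickle
import tempfile


class LineLogger:
    """Stateful callable that appends each tuple to a log file."""

    def __init__(self, path):
        self.path = path
        self.written = 0
        self._file = open(path, 'a')  # open file: pickle cannot serialize it

    def __call__(self, tuple_):
        self._file.write(str(tuple_) + '\n')
        self._file.flush()
        self.written += 1
        return tuple_

    def __getstate__(self):
        # Copy the instance state and drop the open file handle.
        state = self.__dict__.copy()
        del state['_file']
        return state

    def __setstate__(self, state):
        # Restore the saved attributes and reopen the file.
        self.__dict__.update(state)
        self._file = open(self.path, 'a')


path = os.path.join(tempfile.mkdtemp(), 'tuples.log')
logger = LineLogger(path)
for t in ('a', 'b'):
    logger(t)

# Serialize and restore, as a checkpoint and reset would.
restored = pickle.loads(pickle.dumps(logger))
restored('c')

logger._file.close()
restored._file.close()
```

The restored instance keeps its count and continues appending to the same file through a fresh handle.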
If the callable has __enter__ and __exit__ context manager methods then __enter__ is called after the object has been deserialized by dill. Thus __enter__ can be used to recreate runtime objects that cannot be serialized, such as open files or sockets.
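A sketch of this pattern follows, again using pickle in place of dill and calling __enter__ by hand to imitate what the runtime is described as doing after deserialization. The Sink class is hypothetical, and os.devnull is used only so the example needs no real output target.

```python
import os
import pickle


class Sink:
    """Stateful callable whose output handle is recreated in __enter__."""

    def __init__(self):
        self.count = 0
        self._out = None

    def __enter__(self):
        # Recreate the runtime-only object that is never serialized.
        self._out = open(os.devnull, 'w')
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self._out is not None:
            self._out.close()
            self._out = None
        return False

    def __getstate__(self):
        # Serialize everything except the open handle.
        state = self.__dict__.copy()
        state['_out'] = None
        return state

    def __call__(self, tuple_):
        self._out.write(str(tuple_) + '\n')
        self.count += 1
        return tuple_


op = Sink()
with op:
    op('x')
    op('y')
    snapshot = pickle.dumps(op)  # simulated checkpoint while running

restored = pickle.loads(snapshot)
handle_after_load = restored._out   # None: the handle was not serialized
restored.__enter__()                # simulated call made after deserialization
restored('z')
restored.__exit__(None, None, None)
```

Keeping the handle out of the serialized state and rebuilding it in __enter__ means no custom __setstate__ is needed in this sketch.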
Module contents
Classes
ConsistentRegionConfig | A ConsistentRegionConfig configures a consistent region.
-
class streamsx.topology.state.ConsistentRegionConfig(trigger=None, period=None, drain_timeout=180, reset_timeout=180, max_consecutive_attempts=5)
Bases: object
A ConsistentRegionConfig configures a consistent region.
The recommended way to create a ConsistentRegionConfig is to call either operator_driven() or periodic().
- Parameters
trigger (ConsistentRegionConfig.Trigger) – Determines how the drain/checkpoint cycle of the consistent region is triggered.
period – The trigger period. If the trigger is PERIODIC, this must be specified; otherwise it must not be specified. This may be either a datetime.timedelta value or the number of seconds as a float.
drain_timeout – Indicates the maximum time in seconds that the drain and checkpoint of the region is allotted to finish processing. If the process takes longer than the specified time, a failure is reported and the region is reset to the point of the previously successfully established consistent state. The value must be specified as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.
reset_timeout – Indicates the maximum time in seconds that the reset of the region is allotted to finish processing. If the process takes longer than the specified time, a failure is reported and another reset of the region is attempted. The value must be specified as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.
max_consecutive_attempts (int) – Indicates the maximum number of consecutive attempts to reset a consistent region. After a failure, if the maximum number of attempts is reached, the region stops processing new tuples. After the maximum number of consecutive attempts is reached, a region can be reset only with manual intervention or by a program that calls a method of the consistent region controller. This must be an integer value between 1 and 2147483647, inclusive. If not specified, the default value is 5.
Example:
    from streamsx.topology.state import ConsistentRegionConfig

    # Set source to be the start of an operator-driven consistent region
    # with a drain timeout of sixty seconds and a reset timeout of twenty seconds.
    source.set_consistent(ConsistentRegionConfig.operator_driven(drain_timeout=60, reset_timeout=20))
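The parameter rules above can be summarized in a small stand-alone sketch. The to_seconds() and check_config() helpers and the local Trigger enum are hypothetical and only mirror the documented constraints; they are not part of streamsx and do not reflect its implementation.

```python
from datetime import timedelta
from enum import Enum

MAX_ATTEMPTS = 2147483647  # documented upper bound for max_consecutive_attempts


class Trigger(Enum):
    """Local mirror of the documented trigger values (not the library class)."""
    OPERATOR_DRIVEN = 1
    PERIODIC = 2


def to_seconds(value):
    """Accept a datetime.timedelta or a number of seconds; return float seconds."""
    if isinstance(value, timedelta):
        return value.total_seconds()
    return float(value)


def check_config(trigger, period=None, drain_timeout=180,
                 reset_timeout=180, max_consecutive_attempts=5):
    """Validate and normalize settings according to the documented rules."""
    if trigger is Trigger.PERIODIC and period is None:
        raise ValueError('period must be specified for a PERIODIC trigger')
    if trigger is not Trigger.PERIODIC and period is not None:
        raise ValueError('period must not be specified unless the trigger is PERIODIC')
    if (not isinstance(max_consecutive_attempts, int)
            or not 1 <= max_consecutive_attempts <= MAX_ATTEMPTS):
        raise ValueError('max_consecutive_attempts must be an int in [1, 2147483647]')
    return {
        'trigger': trigger,
        'period': None if period is None else to_seconds(period),
        'drain_timeout': to_seconds(drain_timeout),
        'reset_timeout': to_seconds(reset_timeout),
        'max_consecutive_attempts': max_consecutive_attempts,
    }


periodic_cfg = check_config(Trigger.PERIODIC, period=timedelta(minutes=1))
driven_cfg = check_config(Trigger.OPERATOR_DRIVEN, drain_timeout=60, reset_timeout=20)
```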
See also
New in version 1.11.
-
class Trigger
Bases: enum.Enum
Defines how the drain-checkpoint cycle of a consistent region is triggered.
New in version 1.11.
-
OPERATOR_DRIVEN = 1
Region is triggered by the start operator.
-
PERIODIC = 2
Region is triggered periodically.
-
-
static operator_driven(drain_timeout=180, reset_timeout=180, max_consecutive_attempts=5)
Define an operator-driven consistent region configuration. The source operator triggers drain and checkpoint cycles for the region.
- Parameters
drain_timeout – The drain timeout, as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.
reset_timeout – The reset timeout, as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.
max_consecutive_attempts (int) – The maximum number of consecutive attempts to reset the region. This must be an integer value between 1 and 2147483647, inclusive. If not specified, the default value is 5.
- Returns
the configuration.
- Return type
ConsistentRegionConfig
-
static periodic(period, drain_timeout=180, reset_timeout=180, max_consecutive_attempts=5)
Create a periodic consistent region configuration. The IBM Streams runtime will trigger a drain and checkpoint of the region periodically at the time interval specified by period.
- Parameters
period – The trigger period. This may be either a datetime.timedelta value or the number of seconds as a float.
drain_timeout – The drain timeout, as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.
reset_timeout – The reset timeout, as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.
max_consecutive_attempts (int) – The maximum number of consecutive attempts to reset the region. This must be an integer value between 1 and 2147483647, inclusive. If not specified, the default value is 5.
- Returns
the configuration.
- Return type
ConsistentRegionConfig