streamsx.topology.state

Application state.

Overview

Stateful applications are ones that include callables that are classes and thus can maintain state as instance variables.

By default any state is reset to its initial state after a processing element (PE) restart. A restart may occur due to:

  • a failure in the PE or its resource,

  • an explicit PE restart request,

  • or a parallel region width change (IBM Streams 4.3 or later).

The application, or a portion of it, may be configured to maintain state after a PE restart by one of two mechanisms: a consistent region or checkpointing.

Stateful callables

Use of a class instance allows a transformation (for example map()) to be stateful by maintaining state in instance attributes across invocations.
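As a minimal sketch of the idea, the plain Python class below keeps a running total in an instance attribute. The class name RunningTotal is hypothetical and chosen only for illustration; nothing in it depends on streamsx.

```python
class RunningTotal:
    """Callable that keeps a running total across invocations."""

    def __init__(self):
        # State lives in an instance attribute, so it survives between calls.
        self.total = 0

    def __call__(self, value):
        self.total += value
        return self.total


running = RunningTotal()
# Each call sees the state left behind by the previous call.
outputs = [running(v) for v in (3, 4, 5)]

# A freshly constructed instance starts again from the initial state.
fresh_first_output = RunningTotal()(1)
```

An instance of such a class could then be passed to a transformation, for example map(RunningTotal()), in place of a plain function.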

When the callable is in a consistent region or is being checkpointed, it is serialized using dill. The default serialization may be modified by using the standard Python pickle mechanism of __getstate__ and __setstate__. This is required if the state includes objects that cannot be serialized, for example file descriptors. For details see https://docs.python.org/3.5/library/pickle.html#handling-stateful-objects .

If the callable has __enter__ and __exit__ context manager methods, then __enter__ is called after the object has been deserialized by dill. Thus __enter__ can be used to recreate runtime objects that cannot be serialized, such as open files or sockets.
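The interplay of __getstate__, __setstate__, __enter__ and __exit__ can be sketched as follows. This is an illustration under stated assumptions, not the runtime's actual code path: the class LineLogger and the function demo are hypothetical, the standard pickle module stands in for dill, and the with statement stands in for the runtime calling __enter__ after deserialization.

```python
import os
import pickle
import tempfile


class LineLogger:
    """Hypothetical callable that counts tuples and logs them to a file."""

    def __init__(self, path):
        self.path = path
        self.count = 0      # serializable state
        self._fh = None     # runtime-only resource, created in __enter__

    def __enter__(self):
        # (Re)create the resource that cannot be serialized.
        self._fh = open(self.path, "a")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self._fh is not None:
            self._fh.close()
            self._fh = None
        return False

    def __call__(self, tuple_):
        self.count += 1
        self._fh.write("{}: {}\n".format(self.count, tuple_))
        self._fh.flush()
        return tuple_

    def __getstate__(self):
        # Copy the instance dictionary and drop the open file handle.
        state = self.__dict__.copy()
        state["_fh"] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)


def demo():
    fd, path = tempfile.mkstemp()
    os.close(fd)
    try:
        logger = LineLogger(path)
        with logger:
            logger("a")
            logger("b")
            # Snapshot taken while the file is open (pickle stands in for dill).
            snapshot = pickle.dumps(logger)
        restored = pickle.loads(snapshot)
        handle_after_restore = restored._fh
        with restored:  # stands in for the runtime calling __enter__
            restored("c")
        with open(path) as f:
            lines = f.read().splitlines()
        return restored.count, handle_after_restore, lines
    finally:
        os.remove(path)
```

The counter is carried through serialization, while the file handle is dropped in __getstate__ and reopened in __enter__, which keeps the serialized state free of objects that cannot be pickled.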

Module contents

Classes

ConsistentRegionConfig

A ConsistentRegionConfig configures a consistent region.

class streamsx.topology.state.ConsistentRegionConfig(trigger=None, period=None, drain_timeout=180, reset_timeout=180, max_consecutive_attempts=5)

Bases: object

A ConsistentRegionConfig configures a consistent region.

The recommended way to create a ConsistentRegionConfig is to call either operator_driven() or periodic().

Parameters
  • trigger (ConsistentRegionConfig.Trigger) – Determines how the drain/checkpoint cycle of the consistent region is triggered.

  • period – The trigger period. If the trigger is PERIODIC, this must be specified; otherwise it must not be specified. This may be either a datetime.timedelta value or the number of seconds as a float.

  • drain_timeout – Indicates the maximum time in seconds that the drain and checkpoint of the region are allotted to finish processing. If the process takes longer than the specified time, a failure is reported and the region is reset to the most recent successfully established consistent state. The value must be specified as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.

  • reset_timeout – Indicates the maximum time in seconds that the reset of the region is allotted to finish processing. If the process takes longer than the specified time, a failure is reported and another reset of the region is attempted. The value must be specified as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.

  • max_consecutive_attempts (int) – Indicates the maximum number of consecutive attempts to reset a consistent region. If this maximum is reached after a failure, the region stops processing new tuples. The region can then be reset only by manual intervention or programmatically, through a call to a method in the consistent region controller. This must be an integer value between 1 and 2147483647, inclusive. If not specified, the default value is 5.
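The parameter rules listed above can be sketched in plain Python. This is only an illustration of the documented constraints, not the library's implementation: the helpers to_seconds and check_region_settings, the constant MAX_ATTEMPTS_LIMIT, and the use of plain strings for the trigger are all assumptions made for the example.

```python
from datetime import timedelta

# Upper bound stated in the max_consecutive_attempts description.
MAX_ATTEMPTS_LIMIT = 2147483647


def to_seconds(value):
    """Return a duration as float seconds from a timedelta or a number."""
    if isinstance(value, timedelta):
        return value.total_seconds()
    return float(value)


def check_region_settings(trigger, period=None, drain_timeout=180,
                          reset_timeout=180, max_consecutive_attempts=5):
    """Hypothetical validator mirroring the documented parameter rules."""
    if trigger == "PERIODIC" and period is None:
        raise ValueError("period is required for a periodic trigger")
    if trigger != "PERIODIC" and period is not None:
        raise ValueError("period is only valid for a periodic trigger")
    if not 1 <= max_consecutive_attempts <= MAX_ATTEMPTS_LIMIT:
        raise ValueError("max_consecutive_attempts out of range")
    return {
        "trigger": trigger,
        "period": None if period is None else to_seconds(period),
        "drain_timeout": to_seconds(drain_timeout),
        "reset_timeout": to_seconds(reset_timeout),
        "max_consecutive_attempts": max_consecutive_attempts,
    }


# A one-minute period given as a timedelta, with a drain timeout in seconds.
settings = check_region_settings("PERIODIC", period=timedelta(minutes=1),
                                 drain_timeout=60)
```

Accepting both datetime.timedelta and numeric seconds lets callers write short timeouts as numbers and longer intervals as explicit timedelta values.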

Example:

from streamsx.topology.state import ConsistentRegionConfig
# set source to be the start of an operator driven consistent region
# with a drain timeout of sixty seconds and a reset timeout of twenty seconds.
source.set_consistent(ConsistentRegionConfig.operator_driven(drain_timeout=60, reset_timeout=20))

See also

set_consistent()

New in version 1.11.

class Trigger

Bases: enum.Enum

Defines how the drain-checkpoint cycle of a consistent region is triggered.

New in version 1.11.

OPERATOR_DRIVEN = 1

Region is triggered by the start operator.

PERIODIC = 2

Region is triggered periodically.

static operator_driven(drain_timeout=180, reset_timeout=180, max_consecutive_attempts=5)

Define an operator-driven consistent region configuration. The source operator triggers drain and checkpoint cycles for the region.

Parameters
  • drain_timeout – The drain timeout, as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.

  • reset_timeout – The reset timeout, as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.

  • max_consecutive_attempts (int) – The maximum number of consecutive attempts to reset the region. This must be an integer value between 1 and 2147483647, inclusive. If not specified, the default value is 5.

Returns

the configuration.

Return type

ConsistentRegionConfig

static periodic(period, drain_timeout=180, reset_timeout=180, max_consecutive_attempts=5)

Create a periodic consistent region configuration. The IBM Streams runtime triggers a drain and checkpoint of the region periodically, at the time interval specified by period.

Parameters
  • period – The trigger period. This may be either a datetime.timedelta value or the number of seconds as a float.

  • drain_timeout – The drain timeout, as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.

  • reset_timeout – The reset timeout, as either a datetime.timedelta value or the number of seconds as a float. If not specified, the default value is 180 seconds.

  • max_consecutive_attempts (int) – The maximum number of consecutive attempts to reset the region. This must be an integer value between 1 and 2147483647, inclusive. If not specified, the default value is 5.

Returns

the configuration.

Return type

ConsistentRegionConfig