streamsx.ec

Access to the IBM Streams execution context.

Overview

This module (streamsx.ec) provides access to the execution context when Python code is running in a Streams application.

A Streams application runs distributed or standalone.

Distributed

Distributed is used when an application is submitted to the Streaming Analytics service on IBM Cloud or to an IBM Streams distributed instance.

In distributed mode, a running application is a job that contains one or more processing elements (PEs). A PE corresponds to a Linux operating system process. The PEs in a job may be distributed across the resources (hosts) in the Streams instance.

Standalone

Standalone is a mode where the complete application is run as a single PE (process) outside of a Streams instance.

Standalone is typically used for ad-hoc testing of an application.

Application log and trace

IBM Streams provides application trace and log services.

Application log

The Streams application log service is for application logging, where logging is defined as the recording of serviceability information pertaining to application or operator events. The purpose of logging is to provide an administrator with enough information to do problem determination for items they can potentially control. In general, very few events are logged in the normal running scenario of an application or operator. Events pertinent to the failure or partial failure of application runtime scenarios should be logged.

When running in distributed or standalone mode, the com.ibm.streams.log logger has a handler that records messages to the Streams application log service. The level of the logger and its handler are set to the configured application log level at PE start up.

This logger and handler discard any message with level below INFO (20).

Python application code can log a message suitable for an administrator by using the com.ibm.streams.log logger or a child logger that has logger.propagate evaluating to True. Example of logging a failed import:

import logging

try:
    import numpy
except ImportError as e:
    # Record the failure in the Streams application log, then re-raise
    logging.getLogger('com.ibm.streams.log').error(e)
    raise

Application code must not modify the com.ibm.streams.log logger; if additional handlers or different levels are required, a child logger should be used.
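A minimal sketch of the child-logger pattern, assuming a hypothetical child logger named com.ibm.streams.log.myapp and a hypothetical open_config helper. The child logger keeps its default settings, so propagate is True and its records reach the handler on com.ibm.streams.log without that logger being modified. ERROR (40) is above INFO (20), so the message is not discarded.

```python
import logging

# Child of the Streams application log logger. 'myapp' is a hypothetical
# name; propagate defaults to True, so records also reach the handler that
# Streams installs on com.ibm.streams.log.
app_log = logging.getLogger("com.ibm.streams.log.myapp")


def open_config(path):
    """Hypothetical helper: read a configuration file, logging failures."""
    try:
        with open(path) as f:
            return f.read()
    except OSError as e:
        # ERROR (40) is above INFO (20), so the Streams handler keeps it.
        app_log.error("Cannot read configuration file %s: %s", path, e)
        raise
```

The exception is re-raised after logging, so the failure is both visible to an administrator and still handled (or not) by the calling code.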

Application trace

The Streams application trace service is for application tracing, where tracing is defined as the recording of application or operator internal events and data. The purpose of tracing is to allow application or operator developers to debug their applications or operators.

When running in distributed or standalone mode, the root logger has a handler that records messages to the Streams application trace service. The level of the logger and its handler are set to the configured application trace level at PE start up.

Python application code can trace a message using the root logger or a child logger that has logger.propagate evaluating to True. Example of logging a trace message:

import logging

trace = logging.getLogger(__name__)

def set_threshold(val):
    # set_threshold is an illustrative function name
    trace.info("Threshold set to %f", val)

Any existing logging that modules perform through the logging package automatically becomes Streams trace messages.

Application code must not modify the root logger; if additional handlers or different levels are required, a child logger should be used.
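A sketch of adding a handler without touching the root logger, assuming a hypothetical enable_local_debug helper that echoes one module's DEBUG records to a stream. Only the module's own logger is changed. Records still propagate to the root logger, where the Streams trace handler continues to filter them by the configured trace level.

```python
import logging
import sys

# Module-level trace logger: a descendant of the root logger, so its records
# propagate to the Streams trace handler on the root logger.
trace = logging.getLogger(__name__)


def enable_local_debug(stream=sys.stderr):
    """Hypothetical helper: additionally echo this module's DEBUG records.

    Only the child logger is changed; the root logger is left untouched.
    """
    trace.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(stream)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))
    trace.addHandler(handler)
    return handler
```

The returned handler can be passed to trace.removeHandler() once local debugging is no longer needed.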

Execution Context

Access to the execution context is only supported when running:
  • Streams 4.2 or later

This module may be used by Python functions or classes used in a Topology, or by decorated SPL operators.

Most functionality is only available when a Python class is being invoked in a Streams application.

Changed in version 1.9: Support for Python 2.7

Module contents

Functions

channel

Return the parallel region global channel number obj is executing in.

domain_id

Return the domain identifier.

get_application_configuration

Get a named application configuration.

get_application_directory

Get the application directory.

get_submission_time_value

Gets the value of a submission time parameter.

instance_id

Return the instance identifier.

is_active

Tests if code is active within an IBM Streams execution context.

is_standalone

Tests if the execution context is standalone.

job_id

Return the job identifier.

local_channel

Return the parallel region local channel number obj is executing in.

local_max_channels

Return the local maximum number of channels for the parallel region obj is executing in.

max_channels

Return the global maximum number of channels for the parallel region obj is executing in.

pe_id

Return the PE identifier.

shutdown

Return the processing element (PE) shutdown event.

Classes

CustomMetric

Create a custom metric.

MetricKind

Enumeration for the kind of a metric.

streamsx.ec.is_active()

Tests if code is active within an IBM Streams execution context.

Returns a true value when called from within an IBM Streams distributed job or standalone execution.

Can be used to run code only at runtime, such as importing a module that is needed at runtime but not at topology declaration time.

Returns

True if running in an IBM Streams context, False otherwise.

Return type

bool

New in version 1.11.
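A minimal sketch of deferring a runtime-only import, assuming a hypothetical Scaler callable and using numpy purely as an example of a module needed only at runtime. The try/except around the streamsx import is there only so the sketch also loads where streamsx is not installed; outside the execution context the callable falls back to plain Python.

```python
try:
    import streamsx.ec as ec
except ImportError:  # streamsx not installed, e.g. a plain unit-test environment
    ec = None


class Scaler:
    """Hypothetical callable that multiplies each value by a factor."""

    def __init__(self, factor):
        self.factor = factor
        self._np = None

    def __enter__(self):
        # Import numpy only at runtime, not at topology declaration time.
        if ec is not None and ec.is_active():
            import numpy
            self._np = numpy
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return False

    def __call__(self, values):
        if self._np is not None:
            return (self._np.asarray(values) * self.factor).tolist()
        return [v * self.factor for v in values]
```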

streamsx.ec.shutdown()

Return the processing element (PE) shutdown event.

The event is set when the PE is being shut down. It can be used in source iterators that need to block by sleeping:

import streamsx.ec

# In a source iterator's __next__ method: sleep for 60 seconds
# unless the PE is being shut down
if streamsx.ec.shutdown().wait(60.0):
    return None

Code must not call set() on the returned event.

Returns

Event object representing PE shutdown.

Return type

threading.Event

New in version 1.11.
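A fuller sketch of a source iterator built around the shutdown event, assuming a hypothetical Ticker class that produces a timestamp every interval seconds. The event returned by shutdown() is only waited on, never set; a local threading.Event stands in for it when the code is not running in Streams, which also makes the sketch testable. Mirroring the fragment above, __next__ returns None once shutdown is signalled.

```python
import threading
import time

try:
    import streamsx.ec as ec
except ImportError:  # allows the sketch to run outside Streams
    ec = None


class Ticker:
    """Hypothetical source iterator producing a timestamp every `interval` seconds."""

    def __init__(self, interval=60.0):
        self.interval = interval
        # Stand-in event used only when not running in Streams.
        self._local_shutdown = threading.Event()

    def _shutdown_event(self):
        if ec is not None and ec.is_active():
            return ec.shutdown()
        return self._local_shutdown

    def __iter__(self):
        return self

    def __next__(self):
        # wait() returns True as soon as the event is set, so a PE shutdown
        # interrupts the sleep instead of delaying it by up to `interval`.
        if self._shutdown_event().wait(self.interval):
            return None
        return time.time()
```

Waiting on the event rather than calling time.sleep() is the design point: the PE can shut down promptly even with a long polling interval.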

streamsx.ec.domain_id()

Return the domain identifier.

streamsx.ec.instance_id()

Return the instance identifier.

streamsx.ec.job_id()

Return the job identifier.

streamsx.ec.pe_id()

Return the PE identifier.

streamsx.ec.is_standalone()

Tests if the execution context is standalone.

Returns

True if the execution context is standalone, False if it is distributed.

Return type

bool
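A sketch of combining the identifier functions into a label for log or trace messages, assuming a hypothetical describe_context helper. Because the text above does not say what the job, instance, and domain identifiers return in standalone mode, the sketch uses only the PE identifier there.

```python
try:
    import streamsx.ec as ec
except ImportError:
    ec = None


def describe_context():
    """Hypothetical helper: a short label for log or trace messages."""
    if ec is None or not ec.is_active():
        return "not running in Streams"
    if ec.is_standalone():
        return "standalone PE {}".format(ec.pe_id())
    return "job {} PE {} (instance {}, domain {})".format(
        ec.job_id(), ec.pe_id(), ec.instance_id(), ec.domain_id())
```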

streamsx.ec.get_application_directory()

Get the application directory.

Returns

The application directory.

Return type

str

New in version 1.7.
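A sketch of resolving a file shipped with the application, assuming a hypothetical resource_path helper and a caller-supplied fallback directory for use outside the Streams execution context.

```python
import os

try:
    import streamsx.ec as ec
except ImportError:
    ec = None


def resource_path(relative_path, default_dir="."):
    """Hypothetical helper: resolve a path relative to the application directory.

    Outside the Streams execution context, `default_dir` is used instead.
    """
    if ec is not None and ec.is_active():
        base = ec.get_application_directory()
    else:
        base = default_dir
    return os.path.join(base, relative_path)
```

For example, resource_path(os.path.join("etc", "model.bin")) would point at a hypothetical etc/model.bin file under the application directory at runtime.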

streamsx.ec.get_application_configuration(name)

Get a named application configuration.

An application configuration is a named set of securely stored properties where each key and its value in the property set is a string.

An application configuration object is used to store information that IBM Streams applications require, such as:

  • Database connection data

  • Credentials that your applications need to use to access external systems

  • Other data, such as the port numbers or URLs of external systems

Parameters

name (str) – Name of the application configuration.

Returns

Dictionary containing the property names and values for the application configuration.

Return type

dict

Raises

ValueError – Application configuration does not exist.
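A sketch of reading connection data from an application configuration, assuming a hypothetical configuration named mydb with hypothetical url and port properties. Because every property value is a string, the port is converted explicitly. The lookup parameter is an illustrative addition that lets a stand-in replace get_application_configuration when testing outside Streams.

```python
try:
    import streamsx.ec as ec
except ImportError:
    ec = None


def load_db_settings(name="mydb", lookup=None):
    """Hypothetical helper: read connection data from an application configuration.

    The configuration name 'mydb' and the 'url'/'port' property keys are
    assumptions; `lookup` can replace get_application_configuration in tests.
    """
    if lookup is None:
        lookup = ec.get_application_configuration
    try:
        props = lookup(name)
    except ValueError:
        raise RuntimeError("application configuration {!r} does not exist".format(name))
    # Every property value is a string, so convert numeric values explicitly.
    return props["url"], int(props["port"])
```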

streamsx.ec.get_submission_time_value(name, type_=None)

Gets the value of a submission time parameter.

Note

Submission parameters must be created with streamsx.topology.topology.Topology.create_submission_parameter() at topology declaration time before they can be accessed with this function.

Note

Submission time parameters that are defined within other toolkits used by this topology, or that are created with streamsx.spl.op.Expression in this topology, are not accessible with this function.

Parameters
  • name (str) – The name of the submission time parameter.

  • type_ – Type of the parameter value. Supported values are str, int, float and bool. None assumes that the type of the parameter is str and needs no conversion. The type must be the same as used in streamsx.topology.topology.Topology.create_submission_parameter() or as deduced from the parameter's default value.

Returns

The value of the submission time parameter. The return type is either str or the given type_.

Raises

ValueError – The submission time parameter with the given name does not exist.

New in version 1.15.
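A sketch of reading a submission time parameter at runtime, assuming a hypothetical filter callable AboveThreshold and a hypothetical int parameter named threshold that the topology declared (for example with create_submission_parameter and a default of 10). The value is read in __enter__, when the callable is running in the execution context; elsewhere the constructor default is kept.

```python
try:
    import streamsx.ec as ec
except ImportError:
    ec = None


class AboveThreshold:
    """Hypothetical filter callable whose limit is a submission time parameter.

    Assumes that at declaration time the topology created the parameter with
    topo.create_submission_parameter('threshold', 10), so its type is int.
    """

    def __init__(self, default=10):
        self.limit = default

    def __enter__(self):
        # The value is only available at runtime, in the execution context.
        if ec is not None and ec.is_active():
            self.limit = ec.get_submission_time_value("threshold", int)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return False

    def __call__(self, value):
        return value > self.limit
```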

streamsx.ec.channel(obj)

Return the parallel region global channel number obj is executing in.

The channel number is in the range of 0 to max_channels(obj) - 1.

When the parallel region is not nested, this is the same value as local_channel(obj).

If the parallel region is nested, the value is between zero and (width*N - 1), where N is the number of times the parallel region has been replicated due to nesting.

Parameters

obj – Instance of a class executing within Streams.

Returns

Parallel region global channel number or -1 if not located in a parallel region.

Return type

int

streamsx.ec.local_channel(obj)

Return the parallel region local channel number obj is executing in.

The channel number is in the range of zero to local_max_channels(obj) - 1.

Parameters

obj – Instance of a class executing within Streams.

Returns

Parallel region local channel number or -1 if not located in a parallel region.

Return type

int

streamsx.ec.max_channels(obj)

Return the global maximum number of channels for the parallel region obj is executing in.

When the parallel region is not nested, this is the same value as local_max_channels(obj).

If the parallel region is nested, the value is (width*N), where N is the number of times the parallel region has been replicated due to nesting.

Parameters

obj – Instance of a class executing within Streams.

Returns

Parallel region global maximum number of channels or 0 if not located in a parallel region.

Return type

int

streamsx.ec.local_max_channels(obj)

Return the local maximum number of channels for the parallel region obj is executing in.

The maximum number of channels corresponds to the width of the region.

Parameters

obj – Instance of a class executing within Streams.

Returns

Parallel region local maximum number of channels or 0 if not located in a parallel region.

Return type

int
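A sketch of splitting work across the channels of a parallel region, assuming a hypothetical PartitionReader source callable that owns a list of partition names. It uses the global channel(obj) and max_channels(obj) so the split stays correct in nested regions: with a width of 3 replicated twice (N = 2), max_channels is 6 and channel ranges from 0 to 5, while local_max_channels is 3. The round-robin rule in my_share is an illustrative choice, not a Streams requirement.

```python
try:
    import streamsx.ec as ec
except ImportError:
    ec = None


def my_share(items, channel, max_channels):
    """Select the items a given channel is responsible for (round-robin).

    channel == -1 means "not in a parallel region": take everything.
    """
    if channel < 0:
        return list(items)
    return [x for i, x in enumerate(items) if i % max_channels == channel]


class PartitionReader:
    """Hypothetical parallel source: each channel reads its own partitions."""

    def __init__(self, partitions):
        self.partitions = list(partitions)
        self.assigned = self.partitions

    def __enter__(self):
        if ec is not None and ec.is_active():
            # Global values, so the split stays correct in nested regions.
            self.assigned = my_share(self.partitions, ec.channel(self), ec.max_channels(self))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return False

    def __call__(self):
        return iter(self.assigned)
```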

class streamsx.ec.MetricKind

Bases: enum.Enum

Enumeration for the kind of a metric.

The kind of the metric only indicates the behavior of the value; it does not impose any semantics on the value. The kind is typically used by tooling applications.

Counter = 1

A counter metric observes a value that represents a count of an occurrence.

Gauge = 0

A gauge metric observes a value that is continuously variable with time.

Time = 2

A time metric represents a point in time or duration. The recommended unit of time is milliseconds, using the standard epoch of 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970 to represent a point in time.
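A sketch of choosing a metric kind, assuming a hypothetical sink callable QueueMonitor with hypothetical metrics named queueDepth and lastTupleTime. The Gauge tracks a value that rises and falls; the Time metric holds milliseconds since the epoch, as recommended above. Metrics are only created when is_active() reports a Streams execution context, and they are left out of any saved state.

```python
import time

try:
    import streamsx.ec as ec
except ImportError:
    ec = None


def epoch_millis(seconds=None):
    """Milliseconds since 00:00:00 UTC, 1 January 1970 (the recommended Time unit)."""
    if seconds is None:
        seconds = time.time()
    return int(seconds * 1000)


class QueueMonitor:
    """Hypothetical sink callable illustrating Gauge and Time metrics."""

    def __init__(self):
        self.pending = []
        self.depth = None
        self.last_seen = None

    def __enter__(self):
        if ec is not None and ec.is_active():
            # Gauge: the value can rise and fall over time.
            self.depth = ec.CustomMetric(self, "queueDepth", kind=ec.MetricKind.Gauge)
            # Time: a point in time, in milliseconds since the epoch.
            self.last_seen = ec.CustomMetric(self, "lastTupleTime", kind=ec.MetricKind.Time)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return False

    def __call__(self, tuple_):
        self.pending.append(tuple_)
        if self.depth is not None:
            self.depth.value = len(self.pending)
            self.last_seen.value = epoch_millis()

    def __getstate__(self):
        # Metrics are runtime objects: drop them from any saved state.
        state = self.__dict__.copy()
        state["depth"] = None
        state["last_seen"] = None
        return state
```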

class streamsx.ec.CustomMetric(obj, name, description=None, kind=<MetricKind.Counter: 1>, initialValue=0)

Bases: object

Create a custom metric.

A custom metric holds a 64-bit signed integer value that represents a Counter, Gauge or Time metric.

Custom metrics are exposed through the IBM Streams monitoring APIs.

The metric name is unique within the execution context of the callable obj. Attempts to create multiple metrics with the same name but different kinds will raise an exception. Multiple creations of a metric with the same name and kind all refer to the same metric; only the first creation sets the initial value.

The metric’s value is assigned through the value property and can be modified through += and -=. CustomMetric can also be converted to an int.

Parameters
  • obj – Instance of a class executing within Streams.

  • name (str) – Name of the custom metric.

  • kind (MetricKind) – Kind of the metric.

  • description (str) – Description of the metric.

  • initialValue – Initial value of the metric.

Examples:

A simple example of a callable class whose instance is passed to Stream.filter:

import streamsx.ec as ec

class MyF:
    def __init__(self, substring):
        self.substring = substring

    def __call__(self, tuple_):
        # Count tuples containing the substring; pass every tuple through
        if self.substring in str(tuple_):
            self.my_metric += 1
        return True

    # Create the metric when it is running
    # in the Streams execution context
    def __enter__(self):
        self.my_metric = ec.CustomMetric(self, "count_" + self.substring)

    # must supply __exit__ if using __enter__
    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __getstate__(self):
        # Remove metric from saved state.
        state = self.__dict__.copy()
        if 'my_metric' in state:
            del state['my_metric']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

property value

Current value of the metric.