streamsx.ec

Access to the IBM Streams execution context.

Overview

This module (streamsx.ec) provides access to the execution context when Python code is running in a Streams application.

A Streams application runs in one of two modes: distributed or standalone.

Distributed

Distributed is used when an application is submitted to the Streaming Analytics service on IBM Cloud or to an IBM Streams distributed instance.

In distributed mode, a running application is a job that contains one or more processing elements (PEs). Each PE corresponds to a Linux operating system process, and the PEs in a job may be distributed across the resources (hosts) of the Streams instance.

Standalone

Standalone is a mode where the complete application is run as a single PE (process) outside of a Streams instance.

Standalone is typically used for ad-hoc testing of an application.

Application log and trace

IBM Streams provides application trace and log services.

Application log

The Streams application log service is for application logging, where logging is defined as the recording of serviceability information pertaining to application or operator events. The purpose of logging is to provide an administrator with enough information to do problem determination for items they can potentially control. In general, very few events are logged in the normal running scenario of an application or operator. Events pertinent to the failure or partial failure of application runtime scenarios should be logged.

When running distributed or standalone, the com.ibm.streams.log logger has a handler that records messages to the Streams application log service. The levels of the logger and its handler are set to the configured application log level at PE startup.

This logger and handler discard any message with a level below INFO (20).

Python application code can log a message suitable for an administrator by using the com.ibm.streams.log logger or a child logger that has logger.propagate evaluating to True. Example of logging a failed import:

import logging

try:
    import numpy
except ImportError as e:
    # Record the failure in the Streams application log, then re-raise.
    logging.getLogger('com.ibm.streams.log').error(e)
    raise

Application code must not modify the com.ibm.streams.log logger; if additional handlers or different levels are required, a child logger should be used.
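A minimal sketch of that advice, assuming a hypothetical child logger named com.ibm.streams.log.myapp: the extra handler and its level are set on the child only, and the parent com.ibm.streams.log logger is left as Streams configured it. Because propagate defaults to True, each record still reaches the Streams application log handler as well. The host name in the message is a placeholder.

```python
import logging
import sys

# Hypothetical child of the Streams application log logger.
app_log = logging.getLogger("com.ibm.streams.log.myapp")

# Extra handler attached to the child only: it echoes WARNING and above
# to stderr. The parent com.ibm.streams.log logger is not modified.
echo = logging.StreamHandler(sys.stderr)
echo.setLevel(logging.WARNING)
app_log.addHandler(echo)

# propagate is True by default, so records also flow up to the
# handler that Streams installs on com.ibm.streams.log.
assert app_log.propagate

app_log.error("Failed to open connection to %s", "db.example.com")
```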

Application trace

The Streams application trace service is for application tracing, where tracing is defined as the recording of application or operator internal events and data. The purpose of tracing is to allow application or operator developers to debug their applications or operators.

When running distributed or standalone, the root logger has a handler that records messages to the Streams application trace service. The levels of the root logger and its handler are set to the configured application trace level at PE startup.

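Python application code can trace a message using the root logger or a child logger that has logger.propagate evaluating to True. Example of writing a trace message:

import logging

trace = logging.getLogger(__name__)

def set_threshold(val):  # hypothetical function, for illustration only
    # Propagates to the root logger and so becomes a Streams trace message.
    trace.info("Threshold set to %f", val)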

Any existing modules that log through the logging package automatically produce Streams trace messages, because their loggers propagate to the root logger by default.

Application code must not modify the root logger; if additional handlers or different levels are required, a child logger should be used.
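For example, a chatty third-party module can be quietened by raising the level on that module's own logger rather than on the root logger. A minimal sketch, using urllib3 purely as an example logger name (the library does not need to be installed for the logger to exist):

```python
import logging

# Raise the level on urllib3's own logger only; the root logger is untouched.
# Records at WARNING and above from urllib3 (and its child loggers) still
# propagate to the root logger and so become Streams trace messages,
# subject to the level of the Streams trace handler.
noisy = logging.getLogger("urllib3")
noisy.setLevel(logging.WARNING)
```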

Execution Context

Access to the execution context is only supported when running:
  • Streams 4.2 or later

This module may be used by Python functions or classes invoked in a Topology or in decorated SPL operators.

Most functionality is only available when a Python class is being invoked in a Streams application.

Changed in version 1.9: Support for Python 2.7

Module contents

Functions

  • channel – Return the parallel region global channel number obj is executing in.
  • domain_id – Return the domain identifier.
  • get_application_configuration – Get a named application configuration.
  • get_application_directory – Get the application directory.
  • instance_id – Return the instance identifier.
  • is_standalone – Is the execution context standalone.
  • job_id – Return the job identifier.
  • local_channel – Return the parallel region local channel number obj is executing in.
  • local_max_channels – Return the local maximum number of channels for the parallel region obj is executing in.
  • max_channels – Return the global maximum number of channels for the parallel region obj is executing in.
  • pe_id – Return the PE identifier.

Classes

  • CustomMetric – Create a custom metric.
  • MetricKind – Enumeration for the kind of a metric.

class streamsx.ec.CustomMetric(obj, name, description=None, kind=<MetricKind.Counter: 1>, initialValue=0)

Create a custom metric.

A custom metric holds a 64-bit signed integer value that represents a Counter, Gauge or Time metric.

Custom metrics are exposed through the IBM Streams monitoring APIs.

Parameters:
  • obj – Instance of a class executing within Streams.
  • name (str) – Name of the custom metric.
  • kind (MetricKind) – Kind of the metric.
  • description (str) – Description of the metric.
  • initialValue – Initial value of the metric.

Examples:

Simple example of a class whose instance is passed to Stream.filter:

from streamsx import ec

class MyF:
    def __init__(self, substring):
        self.substring = substring

    def __call__(self, tuple_):
        # Count tuples containing the substring; pass every tuple through.
        if self.substring in str(tuple_):
            self.my_metric += 1
        return True

    # Create the metric when the instance is running
    # in the Streams execution context.
    def __enter__(self):
        self.my_metric = ec.CustomMetric(self, "count_" + self.substring)

    # __exit__ must be supplied when __enter__ is used.
    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __getstate__(self):
        # Remove the metric from the saved state.
        state = self.__dict__.copy()
        if 'my_metric' in state:
            del state['my_metric']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

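The __getstate__ and __setstate__ methods keep the metric out of the object's pickled state. The sketch below exercises that pattern outside Streams; it assumes a threading.Lock as a stand-in for the live CustomMetric (a lock is simply a convenient object that cannot be pickled), and the class name MyFSketch is hypothetical.

```python
import pickle
import threading


class MyFSketch:
    def __init__(self, substring):
        self.substring = substring

    def __enter__(self):
        # Stand-in for ec.CustomMetric(...): a lock cannot be pickled.
        self.my_metric = threading.Lock()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return False

    def __getstate__(self):
        # Drop the unpicklable attribute from the saved state.
        state = self.__dict__.copy()
        state.pop('my_metric', None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)


with MyFSketch("error") as f:
    restored = pickle.loads(pickle.dumps(f))

print(restored.substring, hasattr(restored, 'my_metric'))  # error False
```

Without the __getstate__ method, pickle.dumps(f) would fail with a TypeError because of the lock.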
value

Current value of the metric.

class streamsx.ec.MetricKind

Enumeration for the kind of a metric.

The kind of a metric only indicates the behavior of its value; it does not impose any semantics on the value. The kind is typically used by tooling applications.

Counter = 1

A counter metric observes a value that represents a count of an occurrence.

Gauge = 0

A gauge metric observes a value that is continuously variable with time.

Time = 2

A time metric represents a point in time or a duration. The recommended unit of time is milliseconds; a point in time should be represented relative to the standard epoch of 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970.
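Values for a Time metric can be produced with the standard library. A minimal sketch, where epoch_millis and elapsed_millis are hypothetical helpers: the first follows the recommended representation of a point in time (milliseconds since the UTC epoch), and the second measures a duration in milliseconds with a monotonic clock.

```python
import time
from datetime import datetime, timezone


def epoch_millis(dt=None):
    """Point in time as milliseconds since 1970-01-01T00:00:00Z (default: now)."""
    if dt is None:
        dt = datetime.now(timezone.utc)
    return int(dt.timestamp() * 1000)


def elapsed_millis(start):
    """Duration in milliseconds since a time.monotonic() reading."""
    return int((time.monotonic() - start) * 1000)


print(epoch_millis(datetime(1970, 1, 2, tzinfo=timezone.utc)))  # 86400000

# In an operator, assuming self.last_seen is a CustomMetric created with
# kind=MetricKind.Time (attribute name hypothetical):
#   self.last_seen.value = epoch_millis()
```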

streamsx.ec.channel(obj)

Return the parallel region global channel number obj is executing in.

The channel number is in the range 0 to max_channels(obj) - 1.

When the parallel region is not nested, this is the same value as local_channel(obj).

If the parallel region is nested, the value is between 0 and width*N - 1, where N is the number of times the parallel region has been replicated due to nesting.

Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region global channel number, or -1 if not located in a parallel region.
Return type: int

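One way application code might use the channel number is to split an external resource, such as a list of input files, between the channels of a parallel region. A minimal sketch, assuming the hypothetical helper select_partition and a simple round-robin assignment; inside Streams the two numbers would come from streamsx.ec.channel(obj) and streamsx.ec.max_channels(obj).

```python
def select_partition(items, channel, max_channels):
    """Return the subset of items handled by one channel.

    channel and max_channels are the values returned by
    streamsx.ec.channel(obj) and streamsx.ec.max_channels(obj).
    A channel of -1 means the code is not in a parallel region,
    so all items are returned.
    """
    if channel < 0 or max_channels <= 0:
        return list(items)
    # Round-robin: item i belongs to channel i % max_channels.
    return [item for i, item in enumerate(items) if i % max_channels == channel]


files = ["a.csv", "b.csv", "c.csv", "d.csv", "e.csv"]
print(select_partition(files, 1, 2))   # ['b.csv', 'd.csv']
print(select_partition(files, -1, 0))  # all five files
```

Inside a class such a call would typically be made once the instance is executing in Streams, for example in __enter__ as select_partition(files, ec.channel(self), ec.max_channels(self)). Using the global values gives every channel of a nested region a disjoint share.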
streamsx.ec.domain_id()

Return the domain identifier.

streamsx.ec.get_application_configuration(name)

Get a named application configuration.

An application configuration is a named set of securely stored properties where each key and its value in the property set is a string.

An application configuration object is used to store information that IBM Streams applications require, such as:

  • Database connection data
  • Credentials that your applications need to use to access external systems
  • Other data, such as the port numbers or URLs of external systems

Parameters: name (str) – Name of the application configuration.
Returns: Dictionary containing the property names and values for the application configuration.
Return type: dict
Raises: ValueError – Application configuration does not exist.

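A minimal sketch of reading connection settings from an application configuration. The helper load_db_settings, the configuration name mydb and the property names url and port are all hypothetical; the getter is passed in so the code can be tried outside Streams, and inside Streams it would be streamsx.ec.get_application_configuration. Since every property value is a string, numeric settings are converted explicitly.

```python
def load_db_settings(get_config, name="mydb"):
    """Return (url, port) from a named application configuration.

    get_config is expected to behave like
    streamsx.ec.get_application_configuration: it returns a dict of
    str -> str and raises ValueError if the configuration is missing.
    """
    try:
        props = get_config(name)
    except ValueError:
        raise RuntimeError("application configuration %r is not defined" % name)
    # All property values are strings, so the port must be converted.
    return props["url"], int(props["port"])
```

Usage inside Streams: url, port = load_db_settings(ec.get_application_configuration).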
streamsx.ec.get_application_directory()

Get the application directory.

Returns: The application directory.
Return type: str

New in version 1.7.
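A minimal sketch of loading a file shipped with the application. The helper read_app_resource is hypothetical, and the relative path passed to it (for example etc/threshold.txt) is application specific; inside Streams the first argument would be streamsx.ec.get_application_directory().

```python
import os


def read_app_resource(app_dir, *parts):
    """Return the text of a file located under the application directory."""
    path = os.path.join(app_dir, *parts)
    with open(path) as f:
        return f.read()
```

Usage inside Streams: read_app_resource(ec.get_application_directory(), 'etc', 'threshold.txt').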

streamsx.ec.instance_id()

Return the instance identifier.

streamsx.ec.is_standalone()

Is the execution context standalone.

Returns: True if the execution context is standalone, False if it is distributed.
Return type: boolean

streamsx.ec.job_id()

Return the job identifier.

streamsx.ec.local_channel(obj)

Return the parallel region local channel number obj is executing in.

The channel number is in the range 0 to local_max_channels(obj) - 1.

Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region local channel number, or -1 if not located in a parallel region.
Return type: int

streamsx.ec.local_max_channels(obj)

Return the local maximum number of channels for the parallel region obj is executing in.

The maximum number of channels corresponds to the width of the region.

Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region local maximum number of channels, or 0 if not located in a parallel region.
Return type: int

streamsx.ec.max_channels(obj)

Return the global maximum number of channels for the parallel region obj is executing in.

When the parallel region is not nested this is the same value as local_max_channels(obj).

If the parallel region is nested, the value is width*N, where N is the number of times the parallel region has been replicated due to nesting.

Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region global maximum number of channels, or 0 if not located in a parallel region.
Return type: int

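As a worked example of these relationships, consider a hypothetical parallel region of width 3 nested inside a parallel region of width 2, so that the inner region is replicated N = 2 times. For an object in the inner region:

```latex
\begin{aligned}
\text{local\_max\_channels}(obj) &= \text{width} = 3,
  & \text{local\_channel}(obj) &\in \{0, 1, 2\} \\
\text{max\_channels}(obj) &= \text{width} \cdot N = 3 \cdot 2 = 6,
  & \text{channel}(obj) &\in \{0, 1, \dots, 5\}
\end{aligned}
```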
streamsx.ec.pe_id()

Return the PE identifier.
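The identifier functions can help correlate trace output with a specific job and PE. A minimal sketch using the standard logging.LoggerAdapter; ContextAdapter is a hypothetical name, and placeholder identifiers stand in for ec.job_id() and ec.pe_id() so the code runs outside Streams.

```python
import logging


class ContextAdapter(logging.LoggerAdapter):
    """Prefix each message with the job and PE identifiers held in self.extra."""

    def process(self, msg, kwargs):
        return "[job %s pe %s] %s" % (self.extra["job"], self.extra["pe"], msg), kwargs


# Inside Streams the identifiers would come from the execution context:
#   extra = {"job": ec.job_id(), "pe": ec.pe_id()}
# Placeholder values are used here so the sketch runs anywhere.
trace = ContextAdapter(logging.getLogger(__name__), {"job": "7", "pe": "12"})
trace.warning("Threshold exceeded")
```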