streamsx.ec module

Overview

Access to the IBM Streams execution context.

A Streams application runs distributed or standalone.

Distributed

Distributed is used when an application is submitted to the Streaming Analytics service on the IBM Bluemix cloud platform or to an IBM Streams distributed instance.

In distributed mode, a running application is a job that contains one or more processing elements (PEs). A PE corresponds to a Linux operating system process. The PEs in a job may be distributed across the resources (hosts) in the Streams instance.

Standalone

Standalone is a mode where the complete application is run as a single PE (process) outside of a Streams instance.

Standalone is typically used for ad-hoc testing of an application.

Execution Context

This module (streamsx.ec) provides access to the execution context when Python code is running in a Streams application.

Access is only supported when running:
  • Python 3.5
  • Streams 4.2 or later

This module may be used by Python functions or classes used in a Topology or decorated SPL operators.

Most functionality is only available when a Python class is being invoked in a Streams application.

class streamsx.ec.CustomMetric(obj, name, description=None, kind=<MetricKind.Counter: 1>, initialValue=0)

Bases: object

Create a custom metric.

A custom metric holds a 64-bit signed integer value that represents a Counter, Gauge or Time metric.

Custom metrics are exposed through the IBM Streams monitoring APIs.

Parameters:
  • obj – Instance of a class executing within Streams.
  • name (str) – Name of the custom metric.
  • kind (MetricKind) – Kind of the metric.
  • description (str) – Description of the metric.
  • initialValue – Initial value of the metric.

Examples:

Simple example of a callable class whose instance is passed to Stream.filter:

from streamsx import ec

class MyF:
    def __init__(self, substring):
        self.substring = substring

    def __call__(self, tuple_):
        # Count tuples containing the substring; pass every tuple through.
        if self.substring in str(tuple_):
            self.my_metric += 1
        return True

    # Create the metric when it is running
    # in the Streams execution context
    def __enter__(self):
        self.my_metric = ec.CustomMetric(self, "count_" + self.substring)

    # must supply __exit__ if using __enter__
    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __getstate__(self):
        # Remove metric from saved state.
        state = self.__dict__.copy()
        if 'my_metric' in state:
            del state['my_metric']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

value

Current value of the metric.

class streamsx.ec.MetricKind

Bases: enum.Enum

Enumeration for the kind of a metric.

The kind of the metric only indicates the behavior of the value; it does not impose any semantics on the value. The kind is typically used by tooling applications.

Counter = 1

A counter metric observes a value that represents a count of an occurrence.

Gauge = 0

A gauge metric observes a value that is continuously variable with time.

Time = 2

A time metric represents a point in time or duration. The recommended unit of time is milliseconds, using the standard epoch of 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970 to represent a point in time.
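As a rough illustration of the recommended unit, the sketch below produces integer millisecond values that could be stored in a Time metric. The helper names epoch_millis and duration_millis are hypothetical and are not part of streamsx.ec; only the Python standard library is used.

```python
import time
from datetime import datetime, timezone

# Reference point: 00:00:00 UTC, Thursday, 1 January 1970.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis(moment=None):
    """Return a point in time as whole milliseconds since the epoch.

    moment is a timezone-aware datetime; when omitted, the current time is used.
    """
    if moment is None:
        return int(time.time() * 1000)
    delta = moment - EPOCH
    # Integer arithmetic avoids floating point rounding.
    return (delta.days * 86400 + delta.seconds) * 1000 + delta.microseconds // 1000


def duration_millis(start_seconds, end_seconds):
    """Return a duration in whole milliseconds from two readings in seconds."""
    return int(round((end_seconds - start_seconds) * 1000))


start = time.monotonic()
elapsed = duration_millis(start, time.monotonic())
now = epoch_millis()
```

Either result is a plain int that fits comfortably in a 64-bit signed value.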

streamsx.ec.channel(obj)

Return the parallel region global channel number obj is executing in.

The channel number is in the range of 0 to max_channels(obj) - 1.

When the parallel region is not nested this is the same value as local_channel(obj).

If the parallel region is nested the value will be between zero and (width*N - 1) where N is the number of times the parallel region has been replicated due to nesting.

Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region global channel number or -1 if not located in a parallel region.
Return type: int
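The arithmetic described above can be sketched without a Streams runtime. The function global_channel_range is a hypothetical helper, not part of streamsx.ec; it only enumerates the values a global channel number may take for a region of a given width that has been replicated N times.

```python
def global_channel_range(width, replications=1):
    """Return the possible global channel numbers: 0 to width*N - 1.

    width        -- width of the parallel region
    replications -- N, the number of times the region is replicated by nesting
    """
    if width < 1 or replications < 1:
        raise ValueError("width and replications must be positive")
    return range(width * replications)


# Region of width 3 that is not nested: same values as the local channel.
print(list(global_channel_range(3)))     # [0, 1, 2]

# Region of width 3 replicated twice by an enclosing parallel region.
print(list(global_channel_range(3, 2)))  # [0, 1, 2, 3, 4, 5]
```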

streamsx.ec.domain_id()

Return the domain identifier.

streamsx.ec.get_application_configuration(name)

Get a named application configuration.

An application configuration is a named set of securely stored properties where each key and its value in the property set is a string.

An application configuration object is used to store information that IBM Streams applications require, such as:

  • Database connection data
  • Credentials that your applications need to use to access external systems
  • Other data, such as the port numbers or URLs of external systems

Parameters: name (str) – Name of the application configuration.
Returns: Dictionary containing the property names and values for the application configuration.
Return type: dict
Raises: ValueError – Application configuration does not exist.
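A minimal sketch of consuming such a configuration follows. Because every property value is a string, numeric settings need converting. The names load_endpoint, fake_get_config, orders_db, host and port are all made up for illustration; in a Streams application the callable passed in would presumably be streamsx.ec.get_application_configuration.

```python
def load_endpoint(get_config, name):
    """Return (host, port) read from the named application configuration.

    get_config is any callable that returns a dict of string properties
    and raises ValueError when the configuration does not exist.
    """
    props = get_config(name)
    # Property values are strings, so the port must be converted.
    return props["host"], int(props["port"])


def fake_get_config(name):
    """Stand-in used only so this sketch runs outside Streams."""
    configs = {"orders_db": {"host": "db.example.com", "port": "50000"}}
    if name not in configs:
        raise ValueError("Application configuration does not exist: " + name)
    return configs[name]


print(load_endpoint(fake_get_config, "orders_db"))  # ('db.example.com', 50000)
```

Passing the lookup function as an argument keeps the helper testable without a running job.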

streamsx.ec.instance_id()

Return the instance identifier.

streamsx.ec.is_standalone()

Return whether the execution context is standalone.

Returns: True if the execution context is standalone, False if it is distributed.
Return type: bool

streamsx.ec.job_id()

Return the job identifier.

streamsx.ec.local_channel(obj)

Return the parallel region local channel number obj is executing in.

The channel number is in the range of zero to local_max_channels(obj) - 1.

Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region local channel number or -1 if not located in a parallel region.
Return type: int

streamsx.ec.local_max_channels(obj)

Return the local maximum number of channels for the parallel region obj is executing in.

The maximum number of channels corresponds to the width of the region.

Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region local maximum number of channels or 0 if not located in a parallel region.
Return type: int

streamsx.ec.max_channels(obj)

Return the global maximum number of channels for the parallel region obj is executing in.

When the parallel region is not nested this is the same value as local_max_channels(obj).

If the parallel region is nested the value will be (width*N) where N is the number of times the parallel region has been replicated due to nesting.

Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region global maximum number of channels or 0 if not located in a parallel region.
Return type: int
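The sentinel values returned outside a parallel region (-1 for a channel number, 0 for a maximum) are easy to overlook. The sketch below shows one way to handle them; channel_label is a hypothetical helper, and its arguments are assumed to be values previously obtained from channel(obj) and max_channels(obj).

```python
def channel_label(channel, max_channels):
    """Describe a channel position, allowing for the sentinel values."""
    if channel < 0 or max_channels == 0:
        return "not in a parallel region"
    # Channel numbers start at zero, so add one for a human-readable position.
    return "channel {} of {}".format(channel + 1, max_channels)


print(channel_label(-1, 0))  # not in a parallel region
print(channel_label(0, 4))   # channel 1 of 4
print(channel_label(3, 4))   # channel 4 of 4
```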

streamsx.ec.pe_id()

Return the PE identifier.