streamsx.ec module¶
Overview¶
Access to the IBM Streams execution context.
A Streams application runs distributed or standalone.
Distributed¶
Distributed is used when an application is submitted to the Streaming Analytics service on IBM Bluemix cloud platform or a IBM Streams distributed instance.
With distributed a running application is a job that contains one or more processing elements (PEs). A PE corresponds to a Linux operating system process. The PEs in a job may be distributed across the resources (hosts) in the Streams instance.
Standalone¶
Standalone is a mode where the complete application is run as a single PE (process) outside of a Streams instance.
Standalone is typically used for ad-hoc testing of an application.
Execution Context¶
This module (streamsx.exec) provides access to the execution context when Python code is running in a Streams application.
- Access is only supported when running:
- Python 3.5
- Streams 4.2 or later
This module may be used by Python functions or classes used in a Topology or decorated SPL operators.
Most functionality is only available when a Python class is being invoked in a Streams application.
-
class
streamsx.ec.CustomMetric(obj, name, description=None, kind=<MetricKind.Counter: 1>, initialValue=0)¶ Bases:
objectCreate a custom metric.
A custom metric holds a 64 bit signed integer value that represents a Counter, Gauge or Time metric.
Custom metrics are exposed through the IBM Streams monitoring APIs.
Parameters: - obj – Instance of a class executing within Streams.
- name (str) – Name of the custom metric.
- kind (MetricKind) – Kind of the metric.
- description (str) – Description of the metric.
- initialValue – Initial value of the metric.
Examples:
Simple example used as an instance to
Stream.filter:class MyF: def __init__(self, substring): self.substring = substring pass def __call__(self, tuple): if self.substring in str(tuple) self.my_metric += 1 return True # Create the metric when the it is running # in the Streams execution context def __enter__(self): self.my_metric = ec.CustomMetric(self, "count_" + self.substring) # must supply __exit__ if using __enter__ def __exit__(self, exc_type, exc_val, exc_tb): pass def __getstate__(self): # Remove metric from saved state. state = self.__dict__.copy() if 'my_metric' in state: del state['my_metric'] return state def __setstate__(self, state): self.__dict__.update(state)
-
value¶ Current value of the metric.
-
class
streamsx.ec.MetricKind¶ Bases:
enum.EnumEnumeration for the kind of a metric.
The kind of the metric only indicates the behavior of value, it does not impose any semantics on the value. The kind is typically used by tooling applications.
-
Counter= 1¶ A counter metric observes a value that represents a count of an occurrence.
-
Gauge= 0¶ A gauge metric observes a value that is continuously variable with time.
-
Time= 2¶ A time metric represents a point in time or duration. The recommended unit of time is milliseconds, using the standard epoch of 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970 to represent a point in time.
-
-
streamsx.ec.channel(obj)¶ Return the parallel region global channel number obj is executing in.
The channel number is in the range of 0 to
max_channel(obj).When the parallel region is not nested this is the same value as
local_channel(obj).If the parallel region is nested the value will be between zero and
(width*N - 1)where N is the number of times the parallel region has been replicated due to nesting.Parameters: obj – Instance of a class executing within Streams. Returns: Parallel region global channel number or -1 if not located in a parallel region. Return type: int
-
streamsx.ec.domain_id()¶ Return the instance identifier.
-
streamsx.ec.get_application_configuration(name)¶ Get a named application configuration.
An application configuration is a named set of securely stored properties where each key and its value in the property set is a string.
An application configuration object is used to store information that IBM Streams applications require, such as:
- Database connection data
- Credentials that your applications need to use to access external systems
- Other data, such as the port numbers or URLs of external systems
Parameters: name (str) – Name of the application configuration. Returns: Dictionary containing the property names and values for the application configuration. Return type: dict Raises: ValueError– Application configuration does not exist.
-
streamsx.ec.instance_id()¶ Return the instance identifier.
-
streamsx.ec.is_standalone()¶ Is the execution context standalone.
Returns: True if the execution context is standalone, False if it is distributed. Return type: boolean
-
streamsx.ec.job_id()¶ Return the job identifier.
-
streamsx.ec.local_channel(obj)¶ Return the parallel region local channel number obj is executing in.
The channel number is in the range of zero to
local_max_channel(obj).Parameters: obj – Instance of a class executing within Streams. Returns: Parallel region local channel number or -1 if not located in a parallel region. Return type: int
-
streamsx.ec.local_max_channels(obj)¶ Return the local maximum number of channels for the parallel region obj is executing in.
The maximum number of channels corresponds to the width of the region.
Parameters: obj – Instance of a class executing within Streams. Returns: Parallel region local maximum number of channels or 0 if not located in a parallel region. Return type: int
-
streamsx.ec.max_channels(obj)¶ Return the global maximum number of channels for the parallel region obj is executing in.
When the parallel region is not nested this is the same value as
local_max_channels(obj).If the parallel region is nested the value will be
(width*N)where N is the number of times the parallel region has been replicated due to nesting.Parameters: obj – Instance of a class executing within Streams. Returns: Parallel region global maximum number of channels or 0 if not located in a parallel region. Return type: int
-
streamsx.ec.pe_id()¶ Return the PE identifier.