streamsx.ec module
Overview
Access to the IBM Streams execution context.
A Streams application runs distributed or standalone.
Distributed
Distributed mode is used when an application is submitted to the Streaming Analytics service on the IBM Bluemix cloud platform or to an IBM Streams distributed instance.
In distributed mode, a running application is a job that contains one or more processing elements (PEs). A PE corresponds to a Linux operating system process. The PEs in a job may be distributed across the resources (hosts) in the Streams instance.
Standalone
Standalone is a mode where the complete application is run as a single PE (process) outside of a Streams instance.
Standalone is typically used for ad-hoc testing of an application.
Application log and trace
IBM Streams provides application trace and log services.
Application log
The Streams application log service is for application logging, where logging is defined as the recording of serviceability information pertaining to application or operator events. The purpose of logging is to provide an administrator with enough information to do problem determination for items they can potentially control. In general, very few events are logged in the normal running scenario of an application or operator. Events pertinent to the failure or partial failure of application runtime scenarios should be logged.
When running in distributed or standalone mode, the com.ibm.streams.log logger has a handler that records messages to the Streams application log service. The levels of the logger and its handler are set to the configured application log level at PE startup.
This logger and handler discard any message with a level below INFO (20).
Python application code can log a message suitable for an administrator by using the com.ibm.streams.log logger or a child logger whose logger.propagate attribute evaluates to True. Example of logging an import exception:
    import logging

    try:
        import numpy
    except ImportError as e:
        # Record the failure for the administrator, then re-raise it.
        logging.getLogger('com.ibm.streams.log').error(e)
        raise
Application code must not modify the com.ibm.streams.log logger. If additional handlers or different levels are required, use a child logger instead.
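Here is a minimal sketch of that advice. It gives a child logger its own level and handler and leaves com.ibm.streams.log untouched. The child name com.ibm.streams.log.myapp and the console handler are illustrative assumptions. Propagation is on by default in the Python logging package, so records that pass the child's level still reach the parent's Streams handler when running inside Streams.

```python
import logging

# Hypothetical child of the Streams application log logger.
app_log = logging.getLogger('com.ibm.streams.log.myapp')

# Configure only the child; com.ibm.streams.log itself is left untouched.
app_log.setLevel(logging.WARNING)
console = logging.StreamHandler()
console.setFormatter(logging.Formatter('%(name)s %(levelname)s %(message)s'))
app_log.addHandler(console)

# propagate defaults to True, so records that pass the child's level are
# also handed to the handlers of com.ibm.streams.log (the Streams
# application log handler when running in distributed or standalone mode).
app_log.warning("Database unreachable, retrying in %d seconds", 30)
```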
Application trace
The Streams application trace service is for application tracing, where tracing is defined as the recording of application or operator internal events and data. The purpose of tracing is to allow application or operator developers to debug their applications or operators.
When running in distributed or standalone mode, the root logger has a handler that records messages to the Streams application trace service. The levels of the logger and its handler are set to the configured application trace level at PE startup.
Python application code can trace a message using the root logger or a child logger whose logger.propagate attribute evaluates to True. Example of logging a trace message:
    import logging

    trace = logging.getLogger(__name__)
    ...
    # val is assumed to be defined by the surrounding application code
    trace.info("Threshold set to %f", val)
Any existing logging that modules perform through the Python logging package automatically becomes Streams trace messages, because their loggers propagate to the root logger by default.
Application code must not modify the root logger. If additional handlers or different levels are required, use a child logger instead.
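The trace level is fixed at PE startup, so code can check whether a level is enabled before doing extra work for a trace message. Below is a minimal sketch built around a hypothetical Threshold callable, for example one passed to Stream.filter. isEnabledFor is the standard logging method, and nothing here uses a Streams-specific API.

```python
import logging

# Module-level trace logger. It propagates to the root logger, which carries
# the Streams trace handler when running in distributed or standalone mode.
trace = logging.getLogger(__name__)


class Threshold:
    """Hypothetical callable passing values above a threshold (e.g. for Stream.filter)."""

    def __init__(self, val):
        self.val = val

    def __enter__(self):
        # Called once the instance is running in the Streams execution context.
        trace.info("Threshold set to %f", self.val)

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __call__(self, tuple_):
        passed = tuple_ > self.val
        # Skip the debug call entirely unless DEBUG is enabled for this logger;
        # its effective level follows the root logger's trace level.
        if trace.isEnabledFor(logging.DEBUG):
            trace.debug("Tuple %r passed=%s", tuple_, passed)
        return passed
```

The message is logged in __enter__ rather than __init__. The constructor typically runs when the topology is declared, outside the Streams execution context.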
Execution Context
This module (streamsx.ec) provides access to the execution context when Python code is running in a Streams application.
Access is only supported when running:
- Python 3.5
- Streams 4.2 or later
This module may be used by Python functions or classes that are used in a Topology or as decorated SPL operators.
Most functionality is only available when a Python class is being invoked in a Streams application.
class streamsx.ec.CustomMetric(obj, name, description=None, kind=<MetricKind.Counter: 1>, initialValue=0)
Bases: object
Create a custom metric.
A custom metric holds a 64-bit signed integer value that represents a Counter, Gauge or Time metric.
Custom metrics are exposed through the IBM Streams monitoring APIs.
Parameters:
- obj – Instance of a class executing within Streams.
- name (str) – Name of the custom metric.
- description (str) – Description of the metric.
- kind (MetricKind) – Kind of the metric.
- initialValue – Initial value of the metric.
Examples:
Simple example of a class whose instance is passed to Stream.filter:

    from streamsx import ec

    class MyF:
        def __init__(self, substring):
            self.substring = substring

        def __call__(self, tuple):
            # Count tuples containing the substring; every tuple is passed.
            if self.substring in str(tuple):
                self.my_metric += 1
            return True

        # Create the metric when the instance is running
        # in the Streams execution context.
        def __enter__(self):
            self.my_metric = ec.CustomMetric(self, "count_" + self.substring)

        # Must supply __exit__ if using __enter__.
        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

        def __getstate__(self):
            # Remove the metric from the saved state.
            state = self.__dict__.copy()
            if 'my_metric' in state:
                del state['my_metric']
            return state

        def __setstate__(self, state):
            self.__dict__.update(state)
value
Current value of the metric.
class streamsx.ec.MetricKind
Bases: enum.Enum
Enumeration for the kind of a metric.
The kind of a metric only indicates the behavior of its value. It does not impose any semantics on the value. The kind is typically used by tooling applications.
Counter = 1
A counter metric observes a value that represents a count of an occurrence.
Gauge = 0
A gauge metric observes a value that is continuously variable with time.
Time = 2
A time metric represents a point in time or a duration. The recommended unit of time is milliseconds, using the standard epoch of 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970 to represent a point in time.
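For a Time metric, the recommended millisecond-since-epoch value can be computed with the standard library alone. The helper names below are hypothetical. They only produce the integer, and how it is stored in a metric is left to the application.

```python
import time


def epoch_millis():
    """Current point in time as integer milliseconds since 1970-01-01 00:00:00 UTC."""
    # time.time() returns float seconds since the epoch.
    return int(time.time() * 1000)


def elapsed_millis(start_ms):
    """Duration in milliseconds since start_ms (a value from epoch_millis())."""
    return epoch_millis() - start_ms
```

For example, a metric created as ec.CustomMetric(self, "lastTupleTime", kind=ec.MetricKind.Time) could record epoch_millis() for each tuple. The metric name is hypothetical. Updating the metric through its value attribute is an assumption, because the entries above only describe value as the current value.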
streamsx.ec.channel(obj)
Return the parallel region global channel number that obj is executing in.
The channel number is in the range of 0 to max_channels(obj) - 1. When the parallel region is not nested, this is the same value as local_channel(obj). If the parallel region is nested, the value will be between zero and (width*N - 1), where N is the number of times the parallel region has been replicated due to nesting.
Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region global channel number, or -1 if not located in a parallel region.
Return type: int
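A common use of the global channel number is to split work so that each channel of a possibly nested parallel region handles a disjoint share of keys. Below is a minimal sketch that assumes string keys. owns_key is a hypothetical helper, and CRC-32 modulo the channel count is one hashing choice among many. The global values matter under nesting, because local channel numbers repeat across the replicated regions.

```python
import zlib


def owns_key(key, channel, max_channels):
    """Return True if the channel `channel` should process `key`.

    channel and max_channels are expected to come from
    streamsx.ec.channel(obj) and streamsx.ec.max_channels(obj).
    """
    if channel < 0 or max_channels <= 0:
        # Not located in a parallel region: this instance handles every key.
        return True
    # crc32 is stable across processes, unlike the per-interpreter randomized
    # hash() of str, so every PE computes the same owner for a key.
    return zlib.crc32(key.encode('utf-8')) % max_channels == channel
```

Inside an operator class, the two arguments could be read once, for example in __enter__, through ec.channel(self) and ec.max_channels(self).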
streamsx.ec.domain_id()
Return the domain identifier.
streamsx.ec.get_application_configuration(name)
Get a named application configuration.
An application configuration is a named set of securely stored properties where each key and its value in the property set is a string.
An application configuration object is used to store information that IBM Streams applications require, such as:
- Database connection data
- Credentials that your applications need to use to access external systems
- Other data, such as the port numbers or URLs of external systems
Parameters: name (str) – Name of the application configuration.
Returns: Dictionary containing the property names and values for the application configuration.
Return type: dict
Raises: ValueError – Application configuration does not exist.
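Here is a minimal sketch of reading connection data from an application configuration. The lookup function is injected so the code can run outside Streams. The configuration name "db" and the property names "host" and "port" are hypothetical. Every property value is a string, so numeric values need explicit conversion.

```python
def database_settings(get_config, name="db"):
    """Return (host, port) from an application configuration.

    get_config is expected to behave like
    streamsx.ec.get_application_configuration: it returns a dict of string
    properties, or raises ValueError if the configuration does not exist.
    """
    try:
        props = get_config(name)
    except ValueError:
        raise RuntimeError(
            "application configuration %r is not defined" % name) from None
    # Property values are strings; convert the port to an int.
    return props["host"], int(props["port"])
```

Inside a Streams application the call might look like database_settings(ec.get_application_configuration).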
streamsx.ec.get_application_directory()
Get the application directory.
Returns: The application directory.
Return type: str
New in version 1.7.
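Files shipped with an application can be located relative to this directory. Below is a minimal sketch that loads a JSON file. load_app_json is a hypothetical helper, and the relative layout (for example etc/thresholds.json) is an assumption chosen by the application, not a Streams convention.

```python
import json
import os


def load_app_json(app_dir, *parts):
    """Load a JSON file stored under the application directory.

    app_dir is expected to come from streamsx.ec.get_application_directory();
    parts give the file's location relative to it.
    """
    path = os.path.join(app_dir, *parts)
    with open(path, encoding="utf-8") as f:
        return json.load(f)
```

In an operator the call might be load_app_json(ec.get_application_directory(), "etc", "thresholds.json").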
streamsx.ec.instance_id()
Return the instance identifier.
streamsx.ec.is_standalone()
Return whether the execution context is standalone.
Returns: True if the execution context is standalone, False if it is distributed.
Return type: bool
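Standalone runs are typically ad-hoc tests, so an application might print extra output only in that mode. Below is a minimal sketch of a hypothetical sink callable. The mode check is injected so the class can be tried outside Streams, and inside Streams streamsx.ec.is_standalone would be passed in. Following the CustomMetric example, the check is made in __enter__, once the instance is running in the execution context.

```python
class EchoInStandalone:
    """Hypothetical sink callable that prints tuples only in standalone runs."""

    def __init__(self, is_standalone):
        # Callable returning True/False, e.g. streamsx.ec.is_standalone.
        self._is_standalone = is_standalone
        self.echo = False
        self.count = 0

    def __enter__(self):
        # Query the execution context once the instance is running in Streams.
        self.echo = bool(self._is_standalone())

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __call__(self, tuple_):
        self.count += 1
        if self.echo:
            print(tuple_)
```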
streamsx.ec.job_id()
Return the job identifier.
streamsx.ec.local_channel(obj)
Return the parallel region local channel number that obj is executing in.
The channel number is in the range of zero to local_max_channels(obj) - 1.
Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region local channel number, or -1 if not located in a parallel region.
Return type: int
streamsx.ec.local_max_channels(obj)
Return the local maximum number of channels for the parallel region that obj is executing in.
The maximum number of channels corresponds to the width of the region.
Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region local maximum number of channels, or 0 if not located in a parallel region.
Return type: int
streamsx.ec.max_channels(obj)
Return the global maximum number of channels for the parallel region that obj is executing in.
When the parallel region is not nested, this is the same value as local_max_channels(obj). If the parallel region is nested, the value will be (width*N), where N is the number of times the parallel region has been replicated due to nesting.
Parameters: obj – Instance of a class executing within Streams.
Returns: Parallel region global maximum number of channels, or 0 if not located in a parallel region.
Return type: int
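These entries fix two values: max_channels(obj) is width*N, and local_max_channels(obj) is the width. The replication factor N can therefore be recovered by division. For example, a width of 3 replicated twice gives 6 global channels, numbered 0 to 5. Below is a minimal sketch with a hypothetical helper name.

```python
def nesting_replicas(max_channels, local_max_channels):
    """Return N, the number of times a parallel region is replicated by nesting.

    Arguments are expected to come from streamsx.ec.max_channels(obj) and
    streamsx.ec.local_max_channels(obj). A non-nested region gives N == 1;
    0 is returned when not located in a parallel region (both values 0).
    """
    if local_max_channels == 0:
        return 0
    n, rem = divmod(max_channels, local_max_channels)
    if rem:
        # max_channels must be width * N, so a remainder indicates bad input.
        raise ValueError("max_channels is not a multiple of the region width")
    return n
```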
streamsx.ec.pe_id()
Return the PE identifier.
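The identifier functions can be combined to label work done by a particular PE, for example as a client name for an external system. Below is a minimal sketch with a hypothetical helper. The prefix and the naming format are illustrative choices, and the identifiers are passed in so the code runs outside Streams.

```python
def pe_client_name(prefix, instance_id, job_id, pe_id):
    """Build a name identifying the PE that runs this code.

    The identifiers are expected to come from streamsx.ec.instance_id(),
    streamsx.ec.job_id() and streamsx.ec.pe_id().
    """
    return "{}-{}-j{}-pe{}".format(prefix, instance_id, job_id, pe_id)
```

Within Streams the call might be pe_client_name("ingest", ec.instance_id(), ec.job_id(), ec.pe_id()).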