streamsx.ec
Access to the IBM Streams execution context.
Overview
This module (streamsx.ec) provides access to the execution context when Python code is running in a Streams application.
A Streams application runs distributed or standalone.
Distributed
Distributed mode is used when an application is submitted to the Streaming Analytics service on IBM Cloud or to an IBM Streams distributed instance.
In distributed mode, a running application is a job that contains one or more processing elements (PEs). A PE corresponds to a Linux operating system process. The PEs in a job may be distributed across the resources (hosts) in the Streams instance.
Standalone
Standalone is a mode where the complete application is run as a single PE (process) outside of a Streams instance.
Standalone is typically used for ad-hoc testing of an application.
Application log and trace
IBM Streams provides application trace and log services.
Application log
The Streams application log service is for application logging, where logging is defined as the recording of serviceability information pertaining to application or operator events. The purpose of logging is to provide an administrator with enough information to do problem determination for items they can potentially control. In general, very few events are logged in the normal running scenario of an application or operator. Events pertinent to the failure or partial failure of application runtime scenarios should be logged.
When running in distributed or standalone mode, the com.ibm.streams.log logger has a handler that records messages to the Streams application log service. The level of the logger and its handler is set to the configured application log level at PE startup.
This logger and handler discard any message with level below INFO (20).
Python application code can log a message suitable for an administrator by using the com.ibm.streams.log logger or a child logger that has logger.propagate evaluating to True. Example of logging a module import failure:
    import logging

    try:
        import numpy
    except ImportError as e:
        logging.getLogger('com.ibm.streams.log').error(e)
        raise
Application code must not modify the com.ibm.streams.log logger; if additional handlers or different levels are required, a child logger should be used.
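A minimal sketch of the child-logger approach: the application attaches its own handler and format to a child of com.ibm.streams.log and leaves the parent untouched. The logger name suffix `myapp`, the message and the host name are hypothetical.

```python
import io
import logging

# Child of the Streams application-log logger (the ".myapp" suffix is hypothetical).
# propagate defaults to True, so inside a PE records still reach the handler
# that Streams attaches to 'com.ibm.streams.log'.
admin_log = logging.getLogger('com.ibm.streams.log.myapp')

# Extra handler with its own format, attached to the child only;
# the parent 'com.ibm.streams.log' logger is not modified.
buffer = io.StringIO()
extra = logging.StreamHandler(buffer)
extra.setFormatter(logging.Formatter('%(levelname)s %(name)s: %(message)s'))
admin_log.addHandler(extra)

admin_log.error('Connection to %s refused', 'db.example.com')
```

Setting a lower level on the child only affects the child's own handlers; the Streams handler on the parent still discards anything below INFO.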
Application trace
The Streams application trace service is for application tracing, where tracing is defined as the recording of application or operator internal events and data. The purpose of tracing is to allow application or operator developers to debug their applications or operators.
When running in distributed or standalone mode, the root logger has a handler that records messages to the Streams application trace service. The level of the logger and its handler is set to the configured application trace level at PE startup.
Python application code can trace a message using the root logger or a child logger that has logger.propagate evaluating to True. Example of logging a trace message:
    import logging

    trace = logging.getLogger(__name__)
    ...
    trace.info("Threshold set to %f", val)
Existing logging performed by modules that use the logging package automatically becomes Streams trace messages, because their loggers propagate to the root logger unless propagation has been disabled.
Application code must not modify the root logger; if additional handlers or different levels are required, a child logger should be used.
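As a sketch of that rule, the example below raises the verbosity of one module's trace through a child logger while the root logger stays as Streams configured it. The logger name `myapp.scoring` and the captured message are hypothetical.

```python
import io
import logging

# Child of the root logger for one module (the name is hypothetical).
# propagate stays True, so inside a PE records also flow to the Streams
# trace handler attached to the root logger.
trace = logging.getLogger('myapp.scoring')

# Enable DEBUG for this module only; the root logger is left untouched.
trace.setLevel(logging.DEBUG)

# Optional extra handler on the child, here capturing output in memory.
capture = io.StringIO()
handler = logging.StreamHandler(capture)
handler.setLevel(logging.DEBUG)
trace.addHandler(handler)

trace.debug('score=%.2f', 0.5)
```

Propagated records are still filtered by the level of each ancestor handler, so a DEBUG message reaches the Streams trace service only if the configured trace level allows it; the child's own handler receives it regardless.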
Execution Context
Access is only supported when running Streams 4.2 or later.
This module may be used by Python functions or classes used in a Topology or decorated SPL operators.
Most functionality is only available when a Python class is being invoked in a Streams application.
Changed in version 1.9: Support for Python 2.7
Module contents
Functions
channel(obj) – Return the parallel region global channel number obj is executing in.
domain_id() – Return the domain identifier.
get_application_configuration(name) – Get a named application configuration.
get_application_directory() – Get the application directory.
get_submission_time_value(name, type_=None) – Gets the value of a submission time parameter.
instance_id() – Return the instance identifier.
is_active() – Tests if code is active within an IBM Streams execution context.
is_standalone() – Is the execution context standalone.
job_id() – Return the job identifier.
local_channel(obj) – Return the parallel region local channel number obj is executing in.
local_max_channels(obj) – Return the local maximum number of channels for the parallel region obj is executing in.
max_channels(obj) – Return the global maximum number of channels for the parallel region obj is executing in.
pe_id() – Return the PE identifier.
shutdown() – Return the processing element (PE) shutdown event.
Classes
CustomMetric(obj, name, description=None, kind=MetricKind.Counter, initialValue=0) – Create a custom metric.
MetricKind – Enumeration for the kind of a metric.
streamsx.ec.is_active()
Tests if code is active within an IBM Streams execution context.
Returns a true value when called from within an IBM Streams distributed job or standalone execution.
Can be used to run code only when it is required at runtime, such as importing a module that is needed at runtime but not at topology declaration time.
- Returns
True if running in an IBM Streams context, False otherwise.
- Return type
bool
New in version 1.11.
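A minimal sketch of a guard built on is_active(). The helper name `streams_active` is hypothetical; it imports streamsx.ec lazily so the same code also runs where the streamsx package is not installed, such as a plain unit-test environment.

```python
def streams_active():
    """Return True only when running inside an IBM Streams execution context.

    streamsx.ec is imported lazily, so this also works where the streamsx
    package is not installed (for example, a plain unit-test environment).
    """
    try:
        import streamsx.ec
    except ImportError:
        return False
    return bool(streamsx.ec.is_active())
```

A callable's `__enter__` method could then wrap a runtime-only import in `if streams_active():`, keeping that dependency out of topology declaration.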
streamsx.ec.shutdown()
Return the processing element (PE) shutdown event.
The event is set when the PE is being shut down. It can be used in source iterators that need to block by sleeping, for example inside the iterator's __next__ method:

    import streamsx.ec

    # Sleep for 60 seconds unless the PE is being shut down
    if streamsx.ec.shutdown().wait(60.0):
        return None

Code must not call set() on the returned event.
- Returns
Event object representing PE shutdown.
- Return type
threading.Event
New in version 1.11.
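A sketch of a complete polling source built on the shutdown event, assuming the iterator is started at runtime inside the PE. The class `PollingSource` and its `shutdown_event` parameter are hypothetical; the parameter exists only so the class can be exercised outside Streams. This sketch ends the iteration with StopIteration on shutdown instead of returning None.

```python
import threading
import time


class PollingSource:
    """Hypothetical source iterator that emits a timestamp every `interval` seconds.

    When shutdown_event is None, the PE shutdown event from streamsx.ec is
    fetched once iteration starts at runtime.
    """

    def __init__(self, interval, shutdown_event=None):
        self.interval = interval
        self._shutdown = shutdown_event

    def __iter__(self):
        if self._shutdown is None:
            import streamsx.ec
            self._shutdown = streamsx.ec.shutdown()
        return self

    def __next__(self):
        # wait() returns True as soon as the event is set, so shutdown is
        # noticed immediately instead of after the full interval.
        if self._shutdown.wait(self.interval):
            raise StopIteration
        return time.time()


# Usage outside Streams: a local Event stands in for the PE shutdown event.
stop = threading.Event()
source = iter(PollingSource(0.01, shutdown_event=stop))
first = next(source)
stop.set()
remaining = list(source)
```

Because the event is fetched in `__iter__` rather than `__init__`, the instance carries no Event when it is pickled at topology declaration time.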
streamsx.ec.domain_id()
Return the domain identifier.
streamsx.ec.instance_id()
Return the instance identifier.
streamsx.ec.job_id()
Return the job identifier.
streamsx.ec.pe_id()
Return the PE identifier.
streamsx.ec.is_standalone()
Is the execution context standalone.
- Returns
True if the execution context is standalone, False if it is distributed.
- Return type
bool
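A sketch that gathers the identifier accessors, for example to tag trace messages with their origin. The helper `context_summary` and its dictionary keys are hypothetical. It queries the domain and instance only in distributed mode, on the assumption that they are not meaningful for a standalone run outside a Streams instance, and it returns an empty dictionary when not running in Streams.

```python
def context_summary():
    """Return identifiers of the current Streams context, or {} outside Streams.

    The dictionary keys are arbitrary names chosen for this sketch.
    """
    try:
        import streamsx.ec as ec
    except ImportError:
        return {}
    if not ec.is_active():
        return {}
    info = {'job': ec.job_id(), 'pe': ec.pe_id(), 'standalone': ec.is_standalone()}
    if not info['standalone']:
        # Assumption: domain and instance are only relevant in distributed mode.
        info['domain'] = ec.domain_id()
        info['instance'] = ec.instance_id()
    return info
```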
streamsx.ec.get_application_directory()
Get the application directory.
- Returns
The application directory.
- Return type
str
New in version 1.7.
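A sketch of loading a bundled resource from the application directory at runtime. The class `LookupFilter`, the relative path `etc/keys.txt` and the `app_dir` override (used only for testing outside Streams) are hypothetical.

```python
import os
import tempfile


class LookupFilter:
    """Hypothetical filter that keeps tuples whose string form appears in a lookup file."""

    def __init__(self, relpath, app_dir=None):
        self.relpath = relpath
        self.app_dir = app_dir  # None means: ask streamsx.ec at runtime
        self.keys = None

    def __enter__(self):
        app_dir = self.app_dir
        if app_dir is None:
            import streamsx.ec
            app_dir = streamsx.ec.get_application_directory()
        with open(os.path.join(app_dir, self.relpath)) as f:
            self.keys = {line.strip() for line in f if line.strip()}

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __call__(self, tuple_):
        return str(tuple_) in self.keys


# Usage outside Streams: point app_dir at a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, 'etc'))
    with open(os.path.join(tmp, 'etc', 'keys.txt'), 'w') as f:
        f.write('alpha\nbeta\n')
    keep = LookupFilter(os.path.join('etc', 'keys.txt'), app_dir=tmp)
    with keep:
        results = [keep(v) for v in ['alpha', 'gamma', 'beta']]
```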
streamsx.ec.get_application_configuration(name)
Get a named application configuration.
An application configuration is a named set of securely stored properties where each key and its value in the property set is a string.
An application configuration object is used to store information that IBM Streams applications require, such as:
Database connection data
Credentials that your applications need to use to access external systems
Other data, such as the port numbers or URLs of external systems
- Parameters
name (str) – Name of the application configuration.
- Returns
Dictionary containing the property names and values for the application configuration.
- Return type
dict
- Raises
ValueError – Application configuration does not exist.
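A sketch of reading connection settings from an application configuration. Because every property value is a string, numeric values must be converted. The function `db_settings`, the configuration name `dbconn` and the property names `host`, `port` and `user` are hypothetical; `get_config` is injectable so the sketch can run outside Streams.

```python
def db_settings(config_name, get_config=None):
    """Return (host, port, user) from a hypothetical application configuration."""
    if get_config is None:
        import streamsx.ec
        get_config = streamsx.ec.get_application_configuration
    try:
        props = get_config(config_name)
    except ValueError as e:
        raise RuntimeError(f'application configuration {config_name!r} is missing') from e
    # Every property value is a string, so numeric values need converting.
    return props['host'], int(props['port']), props['user']


# Usage outside Streams with an in-memory stand-in for the configuration store.
_fake = {'dbconn': {'host': 'db.example.com', 'port': '5432', 'user': 'streams'}}


def fake_lookup(name):
    if name not in _fake:
        raise ValueError(name)
    return _fake[name]


settings = db_settings('dbconn', get_config=fake_lookup)
```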
streamsx.ec.get_submission_time_value(name, type_=None)
Gets the value of a submission time parameter.
Note
Submission parameters must be created with streamsx.topology.topology.Topology.create_submission_parameter() at topology declaration time before they can be accessed with this function.
Note
Submission time parameters that are defined within other toolkits used by this topology, or created with streamsx.spl.op.Expression in this topology, are not accessible with this function.
- Parameters
name (str) – The name of the submission time parameter.
type_ – Type of the parameter value. Supported values are str, int, float and bool. None assumes that the type of the parameter is str and needs no conversion. The type must be the same as used in streamsx.topology.topology.Topology.create_submission_parameter() or as deduced from the parameter's default value.
- Returns
The value of the submission time parameter. The return type is either str or the given type_.
- Raises
ValueError – The submission time parameter with the given name does not exist.
New in version 1.15.
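A sketch of a filter whose threshold is read from a submission time parameter when the callable starts running. It assumes a parameter named `threshold` was created at declaration time with Topology.create_submission_parameter(). The class `ThresholdFilter` and its `lookup` parameter are hypothetical; `lookup` only exists so the sketch can run outside Streams.

```python
class ThresholdFilter:
    """Hypothetical filter keeping values above a submission-time threshold."""

    def __init__(self, param_name, lookup=None):
        self.param_name = param_name
        self._lookup = lookup
        self.threshold = None

    def __enter__(self):
        lookup = self._lookup
        if lookup is None:
            import streamsx.ec
            lookup = streamsx.ec.get_submission_time_value
        # type_=float asks for the submitted value converted to a float.
        self.threshold = lookup(self.param_name, float)

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __call__(self, value):
        return value > self.threshold


# Stand-in lookup for use outside Streams (handles str/int/float only).
_params = {'threshold': '0.5'}


def fake_get(name, type_=None):
    if name not in _params:
        raise ValueError(name)
    value = _params[name]
    return value if type_ is None else type_(value)


f = ThresholdFilter('threshold', lookup=fake_get)
with f:
    kept = [v for v in (0.2, 0.7, 0.5) if f(v)]
```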
streamsx.ec.channel(obj)
Return the parallel region global channel number obj is executing in.
The channel number is in the range 0 to max_channels(obj) - 1. When the parallel region is not nested, this is the same value as local_channel(obj). If the parallel region is nested, the value is between 0 and (width*N - 1), where N is the number of times the parallel region has been replicated due to nesting.
- Parameters
obj – Instance of a class executing within Streams.
- Returns
Parallel region global channel number or -1 if not located in a parallel region.
- Return type
int
streamsx.ec.local_channel(obj)
Return the parallel region local channel number obj is executing in.
The channel number is in the range 0 to local_max_channels(obj) - 1.
- Parameters
obj – Instance of a class executing within Streams.
- Returns
Parallel region local channel number or -1 if not located in a parallel region.
- Return type
int
streamsx.ec.max_channels(obj)
Return the global maximum number of channels for the parallel region obj is executing in.
When the parallel region is not nested, this is the same value as local_max_channels(obj). If the parallel region is nested, the value is (width*N), where N is the number of times the parallel region has been replicated due to nesting.
- Parameters
obj – Instance of a class executing within Streams.
- Returns
Parallel region global maximum number of channels or 0 if not located in a parallel region.
- Return type
int
streamsx.ec.local_max_channels(obj)
Return the local maximum number of channels for the parallel region obj is executing in.
The maximum number of channels corresponds to the width of the region.
- Parameters
obj – Instance of a class executing within Streams.
- Returns
Parallel region local maximum number of channels or 0 if not located in a parallel region.
- Return type
int
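A sketch of using the global channel values to split work so that each channel of a parallel region handles a disjoint set of keys. It uses channel(obj) and max_channels(obj) rather than the local variants so the split stays disjoint when regions are nested. The sharding rule `owns` and the class `ShardedSource` are hypothetical, and the source is assumed to be placed in a parallel region.

```python
def owns(key, channel, max_channels):
    """Hypothetical sharding rule: should this channel handle integer `key`?

    channel and max_channels are expected to come from streamsx.ec.channel(obj)
    and streamsx.ec.max_channels(obj). Outside a parallel region those return
    -1 and 0, and then every key is handled.
    """
    if channel < 0 or max_channels <= 0:
        return True
    return key % max_channels == channel


class ShardedSource:
    """Hypothetical source callable; each channel emits a disjoint slice of ids."""

    def __init__(self, count):
        self.count = count
        self.channel = -1  # defaults mean "not in a parallel region"
        self.width = 0

    def __enter__(self):
        import streamsx.ec
        # Global values keep the split disjoint when regions are nested.
        self.channel = streamsx.ec.channel(self)
        self.width = streamsx.ec.max_channels(self)

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __call__(self):
        return (i for i in range(self.count) if owns(i, self.channel, self.width))
```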
class streamsx.ec.MetricKind
Bases: enum.Enum
Enumeration for the kind of a metric.
The kind of the metric only indicates the behavior of the value; it does not impose any semantics on the value. The kind is typically used by tooling applications.
Counter = 1
A counter metric observes a value that represents a count of an occurrence.
Gauge = 0
A gauge metric observes a value that is continuously variable with time.
Time = 2
A time metric represents a point in time or duration. The recommended unit of time is milliseconds, using the standard epoch of 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970, to represent a point in time.
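A sketch of computing a value for a Time metric in the recommended unit, milliseconds since the UTC epoch, from a timezone-aware datetime. The helper name `to_time_metric_value` is hypothetical. Integer timedelta division avoids float rounding and floors toward the earlier millisecond.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def to_time_metric_value(moment):
    """Milliseconds between the UTC epoch and a timezone-aware datetime."""
    if moment.tzinfo is None:
        raise ValueError('a timezone-aware datetime is required')
    # timedelta // timedelta yields an int (floor division).
    return (moment - EPOCH) // datetime.timedelta(milliseconds=1)


value = to_time_metric_value(
    datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc))
```

The result fits comfortably in the 64-bit signed integer that a custom metric holds.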
class streamsx.ec.CustomMetric(obj, name, description=None, kind=MetricKind.Counter, initialValue=0)
Bases: object
Create a custom metric.
A custom metric holds a 64-bit signed integer value that represents a Counter, Gauge or Time metric.
Custom metrics are exposed through the IBM Streams monitoring APIs.
Metric name is unique within the execution context of the callable obj. Attempts to create multiple metrics with the same name but different kinds raise an exception. Multiple creations of a metric with the same name and kind all refer to the same metric; only the first creation sets the initial value.
The metric's value is assigned through the value property and can be modified through += and -=. A CustomMetric can also be converted to an int.
- Parameters
obj – Instance of a class executing within Streams.
name (str) – Name of the custom metric.
kind (MetricKind) – Kind of the metric.
description (str) – Description of the metric.
initialValue – Initial value of the metric.
Examples:
Simple example of an instance passed as the callable to Stream.filter:

    import streamsx.ec as ec

    class MyF:
        def __init__(self, substring):
            self.substring = substring

        def __call__(self, tuple):
            if self.substring in str(tuple):
                self.my_metric += 1
            return True

        # Create the metric when it is running
        # in the Streams execution context
        def __enter__(self):
            self.my_metric = ec.CustomMetric(self, "count_" + self.substring)

        # must supply __exit__ if using __enter__
        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

        def __getstate__(self):
            # Remove metric from saved state.
            state = self.__dict__.copy()
            if 'my_metric' in state:
                del state['my_metric']
            return state

        def __setstate__(self, state):
            self.__dict__.update(state)
property value
Current value of the metric.
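A sketch of a Gauge metric, whose value is overwritten with each observation through the value property rather than incremented. The class `LatencyGauge`, the metric name `latencyMillis` and the assumed tuple field `sent_ms` (epoch milliseconds) are hypothetical. Outside Streams, a simple object with a `value` attribute stands in for the metric.

```python
import time
import types


class LatencyGauge:
    """Hypothetical map callable exposing the latest tuple latency as a Gauge.

    Assumes each tuple is a dict with a 'sent_ms' field holding epoch milliseconds.
    """

    def __init__(self):
        self.latency = None

    def __enter__(self):
        # Runtime only: create the metric inside the Streams execution context.
        import streamsx.ec as ec
        self.latency = ec.CustomMetric(self, 'latencyMillis',
                                       description='Latency of the most recent tuple',
                                       kind=ec.MetricKind.Gauge)

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __call__(self, tup):
        now_ms = int(time.time() * 1000)
        # A gauge is overwritten with each new observation rather than incremented.
        self.latency.value = now_ms - tup['sent_ms']
        return tup

    def __getstate__(self):
        # Do not pickle the metric; __enter__ recreates it at runtime.
        state = self.__dict__.copy()
        state['latency'] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)


# Outside Streams, a stand-in object with a `value` attribute replaces the metric.
gauge = LatencyGauge()
gauge.latency = types.SimpleNamespace(value=0)
gauge({'sent_ms': int(time.time() * 1000) - 50})
```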