streamsx.rest_primitives
Primitive objects for REST bindings.
Module contents
Classes
ActiveService | Domain or instance service.
ActiveVersion | Contains IBM Streams installation information.
ApplicationBundle | Application bundle tied to an instance.
ApplicationConfiguration | An application configuration.
BaseImage | A base image used for an Edge image build using the EDGE context type.
Domain | IBM Streams domain.
ExportedStream | Stream exported by a job.
Host | Resource in a Streams domain or instance.
ImportedStream | Stream imported by a job.
Installation | IBM Streams installation.
Instance | IBM Streams instance.
Job | A running Streams application.
JobGroup | A job group definition.
Metric | Streams custom or system metric.
Operator | An operator invocation within a job.
OperatorConnection | Connection between operators.
OperatorInputPort | Operator input port.
OperatorOutputPort | Operator output port.
PE | Processing element (PE) within a job.
PEConnection | Stream connection between two PEs.
PublishedTopic | Metadata for a published topic.
Resource | A resource available to an IBM Streams domain.
ResourceAllocation | A resource that is allocated to an IBM Streams instance.
ResourceTag | Resource tag defined in a Streams domain.
RestResource | HTTP REST resource identifier.
StreamingAnalyticsService | Streaming Analytics service running on IBM Cloud.
Toolkit | IBM Streams toolkit.
View | View on a stream.
ViewItem | A stream tuple in view.
-
class
streamsx.rest_primitives.
ActiveService
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
Domain or instance service.
-
resourceType
¶ Identifies the REST resource type, which is activeService.
- Type
str
-
leader
¶ If True, this service is the leader service; otherwise it is a standby service.
- Type
bool
-
processId
¶ Process ID of this service.
- Type
str
-
startTime
¶ Epoch time when this service started.
- Type
long
-
status
¶ Status of this service. Some possible values include stopped, running, failed, and unknown.
- Type
str
-
type
¶ Type of this service.
- Type
str
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> services = instances[0].get_active_services()
>>> print(services[0].resourceType)
activeService
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
ActiveVersion
(json_active_version)¶ Bases:
object
Contains IBM Streams installation information
-
architecture
¶ Hardware architecture on which product is installed.
- Type
str
-
build_version
¶ Product build ID.
- Type
str
-
edition_name
¶ Product edition.
- Type
str
-
full_product_version
¶ Full product version, including any hot fix.
- Type
str
-
minimum_os_base_version
¶ Minimum operating system version requirement.
- Type
str
-
minimum_os_patch_version
¶ Minimum operating system patch requirement.
- Type
str
-
product_name
¶ Product name.
- Type
str
-
product_version
¶ Product version.
- Type
str
-
-
class
streamsx.rest_primitives.
ApplicationBundle
(_delegator, instance, json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
Application bundle tied to an instance.
New in version 1.11.
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
ApplicationConfiguration
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
An application configuration.
Application configurations are used for secure storage and retrieval of name/value pairs.
An application configuration maintains a set of properties that an application can access at runtime. These are typically used to maintain connection endpoint and credentials for sources and sinks.
-
name
¶ Name of the configuration.
- Type
str
-
description
¶ Description for the configuration.
- Type
str
-
properties
¶ Property values stored for the configuration.
- Type
dict
-
creationTime
¶ Epoch time when this configuration was created.
- Type
long
-
lastModifiedTime
¶ Epoch time when this configuration was last modified.
- Type
long
-
delete
()¶ Delete this application configuration.
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
update
(properties=None, description=None)¶ Update this application configuration.
To create or update a property provide its key-value pair in properties.
To delete a property provide its key with the value None in properties.
- Parameters
properties (dict) – Property values to be updated. If None the properties are unchanged.
description (str) – Description for the configuration. If None the description is unchanged.
- Returns
self
- Return type
ApplicationConfiguration
-
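The create, update, and delete rules described for update can be illustrated without a Streams connection. The following is a minimal sketch that applies the same rules to a plain dict; the helper name apply_property_update is hypothetical and is not part of streamsx.

```python
def apply_property_update(current, updates=None):
    """Return the properties that result from the documented update rules.

    Hypothetical helper for illustration only; it does not call streamsx.
    """
    result = dict(current)
    if updates is None:
        # None means the properties are left unchanged.
        return result
    for key, value in updates.items():
        if value is None:
            # A None value deletes the property.
            result.pop(key, None)
        else:
            # Any other value creates or updates the property.
            result[key] = value
    return result


before = {"host": "db.example.com", "user": "alice"}
after = apply_property_update(before, {"user": None, "port": "5432"})
print(after)
```

With a real ApplicationConfiguration the equivalent call would pass the same dict as the properties argument of update.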
-
class
streamsx.rest_primitives.
BaseImage
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
A base image used for an Edge image build using the EDGE context type.
-
buildPool
¶ REST URL of the build pool that contains the image
- Type
str
-
id
¶ identifier in the form registry/prefix/imagename:tag
- Type
str
-
name
¶ the image name
- Type
str
-
prefix
¶ the image prefix
- Type
str
-
registry
¶ the registry where the image is stored
- Type
str
-
resourceType
¶ the REST resource type, which is image
- Type
str
-
restid
¶ identifier in the form registry/prefix/imagename:tag
- Type
str
-
tag
¶ the image tag
- Type
str
Example
>>> from streamsx.build import BuildService
>>> build_service = BuildService.of_endpoint()
>>> baseimages = build_service.get_base_images()
>>> print(type(baseimages[0]))
<class 'streamsx.rest_primitives.BaseImage'>
>>> print(baseimages[0].resourceType)
image
New in version 1.15.
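The id and restid attributes use the form registry/prefix/imagename:tag. As a rough sketch, such an identifier could be split into its parts as shown below. The helper parse_image_id and the sample identifier are hypothetical, and the code assumes the identifier always has a registry, a prefix, and a tag.

```python
from typing import NamedTuple


class ImageId(NamedTuple):
    registry: str
    prefix: str
    name: str
    tag: str


def parse_image_id(image_id):
    """Split 'registry/prefix/imagename:tag' into its four parts.

    Hypothetical helper; assumes all four parts are present.
    """
    # Split on "/" first so a port in the registry (host:5000) is kept intact.
    registry, prefix, rest = image_id.split("/", 2)
    # The tag follows the last ":" in the remaining text.
    name, tag = rest.rsplit(":", 1)
    return ImageId(registry, prefix, name, tag)


parsed = parse_image_id("registry.example.com:5000/streams/base:6.0")
print(parsed.name, parsed.tag)
```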
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
Domain
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
IBM Streams domain. A domain contains instances that support running Streams applications as jobs.
-
id
¶ Unique ID for this domain.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is domain.
- Type
str
-
creationTime
¶ Epoch time when this domain was created.
- Type
long
-
creationuser
¶ User ID that created this domain.
- Type
str
-
status
¶ Status of this domain. Some possible values include running, stopping, stopped, starting, removing, and unknown.
- Type
str
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> domains = sc.get_domains()
>>> print(domains[0].resourceType)
domain
-
get_active_services
()¶ Get the list of ActiveService elements associated with this domain.
- Returns
List of ActiveService elements associated with this domain.
- Return type
list(ActiveService)
-
get_hosts
()¶ Get the list of Host elements associated with this domain.
- Returns
List of Host elements associated with this domain.
- Return type
list(Host)
-
get_instances
()¶ Get the list of Instance elements associated with this domain.
- Returns
List of Instance elements associated with this domain.
- Return type
list(Instance)
-
get_resource_allocations
()¶ Get the list of ResourceAllocation elements associated with this domain.
- Returns
List of ResourceAllocation elements associated with this domain.
- Return type
list(ResourceAllocation)
-
get_resources
()¶ Get the list of Resource elements associated with this domain.
- Returns
List of Resource elements associated with this domain.
- Return type
list(Resource)
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
ExportedStream
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
Stream exported by a job.
-
resourceType
¶ Identifies the REST resource type, which is exportedStream.
- Type
str
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> exportedstreams = instances[0].get_exported_streams()
>>> print(exportedstreams[0].resourceType)
exportedStream
-
get_operator_output_port
()¶ Get the output port of this exported stream.
- Returns
Output port of this exported stream.
- Return type
OperatorOutputPort
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
Host
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
Resource in a Streams domain or instance.
-
name
¶ Configuration name for the IBM Streams resource.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is host.
- Type
str
-
ipAddress
¶ IP address for the IBM Streams resource.
- Type
str
-
processorCount
¶ Number of processors on the IBM Streams resource.
- Type
int
-
restrictedTags
¶ Set of resource tags that processing elements (PEs) must have to run on the IBM Streams resource.
- Type
list(str)
-
services
¶ Name and status of each domain service that is designated to run on the IBM Streams resource.
- Type
list(dict)
-
status
¶ Status of the IBM Streams resource.
- Type
str
-
tag
¶ Names of each tag that is assigned to the IBM Streams resource.
- Type
list(str)
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> domains = sc.get_domains()
>>> hosts = domains[0].get_hosts()
>>> print(hosts[0].resourceType)
host
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
ImportedStream
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
Stream imported by a job.
-
resourceType
¶ Identifies the REST resource type, which is importedStream.
- Type
str
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> importedstreams = instances[0].get_imported_streams()
>>> print(importedstreams[0].resourceType)
importedStream
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
Installation
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
IBM Streams installation.
-
resourceType
¶ Identifies the REST resource type, which is installation.
- Type
str
-
architecture
¶ Hardware architecture on which product is installed.
- Type
str
-
buildVersion
¶ Product build ID.
- Type
str
-
editionName
¶ Product edition.
- Type
str
-
fullProductVersion
¶ Full product version, including any hot fix.
- Type
str
-
minimumOSBaseVersion
¶ Minimum operating system version requirement.
- Type
str
-
minimumOSPatchVersion
¶ Minimum operating system patch requirement.
- Type
str
-
productName
¶ Product name.
- Type
str
-
productVersion
¶ Product version.
- Type
str
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
Instance
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
IBM Streams instance.
-
id
¶ Unique ID for this instance.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is instance.
- Type
str
-
creationTime
¶ Epoch time when this instance was created.
- Type
long
-
creationuser
¶ User ID that created this instance.
- Type
str
-
health
¶ Summarized status of the jobs in the instance. Some possible values include healthy, partiallyHealthy, partiallyUnhealthy, unhealthy, and unknown.
- Type
str
-
owner
¶ User ID that owns this instance.
- Type
str
-
startTime
¶ Epoch time when this instance was started.
- Type
long
-
status
¶ Status of this instance. Some possible values include running, failed, stopped, and unknown.
- Type
str
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> print(instances[0].resourceType)
instance
-
create_application_configuration
(name, properties, description=None)¶ Create an application configuration.
- Parameters
name (str) – Name of the application configuration.
properties (dict) – Property values to store in the configuration.
description (str, optional) – Description for the configuration.
-
get_active_services
()¶ Get the list of ActiveService elements associated with this instance.
- Returns
List of ActiveService elements associated with this instance.
- Return type
list(ActiveService)
-
get_application_configurations
(name=None)¶ Retrieves application configurations for this instance.
- Parameters
name (str, optional) – Only return application configurations containing property name that matches name. name can be a regular expression. If name is not supplied, then all application configurations are returned.
- Returns
A list of application configurations matching the given name.
- Return type
list(ApplicationConfiguration)
-
get_domain
()¶ Get the Streams domain that owns this instance.
- Returns
Streams domain owning this instance.
- Return type
Domain
-
get_exported_streams
()¶ Get the list of ExportedStream elements associated with this instance.
- Returns
List of ExportedStream elements associated with this instance.
- Return type
list(ExportedStream)
-
get_hosts
()¶ Get the list of Host elements associated with this instance.
- Returns
List of Host elements associated with this instance.
- Return type
list(Host)
-
get_imported_streams
()¶ Get the list of ImportedStream elements associated with this instance.
- Returns
List of ImportedStream elements associated with this instance.
- Return type
list(ImportedStream)
-
get_job
(id)¶ Retrieves a job matching the given id.
- Parameters
id (str) – Job id to match.
- Returns
Job matching the given id.
- Return type
Job
- Raises
ValueError – No resource matches the given id, or multiple resources match the given id.
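The exactly-one-match contract of get_job can be sketched with stand-in objects. The helper find_unique below is hypothetical and only mirrors the documented behavior (one match is returned, zero or several raise ValueError); it is not the library's implementation.

```python
from types import SimpleNamespace


def find_unique(resources, id):
    """Return the single resource whose id equals `id`.

    Hypothetical helper; raises ValueError for zero or multiple matches.
    """
    matches = [r for r in resources if r.id == id]
    if len(matches) != 1:
        raise ValueError(
            "expected exactly one resource with id {!r}, found {}".format(
                id, len(matches)))
    return matches[0]


# Stand-ins for Job objects; only the id and name attributes are modelled.
jobs = [SimpleNamespace(id="1", name="ingest"),
        SimpleNamespace(id="2", name="score")]
found = find_unique(jobs, "2")
print(found.name)
```

Callers that cannot guarantee the job still exists would typically wrap the call in a try/except ValueError block.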
-
get_job_groups
(name=None)¶ Retrieves job groups defined in this instance.
- Parameters
name (str, optional) – Only return job groups containing property name that matches name. name can be a regular expression. If name is not supplied, then all job groups are returned.
- Returns
A list of job groups matching the given name.
- Return type
list(JobGroup)
Only supported for Streams 5.0 and later.
New in version 1.13.13.
-
get_jobs
(name=None)¶ Retrieves jobs running in this instance.
- Parameters
name (str, optional) – Only return jobs containing property name that matches name. name can be a regular expression. If name is not supplied, then all jobs are returned.
- Returns
A list of jobs matching the given name.
- Return type
list(Job)
Retrieving a list of jobs whose name contains the string “temperatureApplication” could be performed as follows:
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instance = sc.get_instances()[0]
>>> jobs = instance.get_jobs(name=".*temperatureApplication.*")
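Several methods in this module accept a name that can be a regular expression. The sketch below shows how such a filter behaves on plain strings, under the assumption that the pattern is matched from the start of the name in the manner of Python's re.match; that anchoring is an assumption here, not something this page states. The helper filter_by_name is hypothetical.

```python
import re


def filter_by_name(names, pattern=None):
    """Keep the names that `pattern` matches from the start of the string.

    Hypothetical helper; a pattern of None keeps every name.
    """
    if pattern is None:
        return list(names)
    regex = re.compile(pattern)
    return [n for n in names if regex.match(n)]


names = ["temperatureSensor_1", "app.temperatureSensorFilter", "humiditySensor"]

# Leading and trailing ".*" give a "contains" match under start anchoring.
matched = filter_by_name(names, ".*temperatureSensor.*")

# Without the leading ".*" only names that start with the text are kept.
anchored = filter_by_name(names, "temperatureSensor")
print(matched, anchored)
```

Note that a bare shell-style wildcard such as '*temperatureSensor*' is not a valid Python regular expression, because a leading * has nothing to repeat.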
-
get_operator_connections
()¶ Get the list of OperatorConnection elements associated with this instance.
- Returns
List of OperatorConnection elements associated with this instance.
- Return type
list(OperatorConnection)
-
get_operators
(name=None)¶ Get the list of Operator elements associated with this instance.
- Parameters
name (str) – Only return operators matching name, where name can be a regular expression. If name is not supplied, then all operators for this instance are returned.
- Returns
List of Operator elements associated with this instance.
- Return type
list(Operator)
Retrieving a list of operators whose name contains the string “temperatureSensor” could be performed as follows:
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instance = sc.get_instances()[0]
>>> operators = instance.get_operators(name=".*temperatureSensor.*")
Changed in version 1.9: name parameter added.
-
get_pe_connections
()¶ Get the list of PEConnection elements associated with this instance.
- Returns
List of PEConnection elements associated with this instance.
- Return type
list(PEConnection)
-
get_pes
()¶ Get the list of PE elements associated with this instance.
- Returns
List of PE elements associated with this instance.
- Return type
list(PE)
-
get_published_topics
()¶ Get a list of published topics for this instance.
Streams applications publish streams to a topic that can be subscribed to by other applications. This allows a microservice approach where publishers and subscribers are independent of each other.
A published stream has a topic and a schema. It is recommended that a topic is only associated with a single schema.
Streams may be published and subscribed by applications regardless of the implementation language. For example a Python application can publish a stream of JSON tuples that are subscribed to by SPL and Java applications.
- Returns
List of currently published topics.
- Return type
list(PublishedTopic)
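The recommendation that a topic be associated with a single schema can be checked over a list of published topics. The following sketch uses stand-in objects and assumes each entry exposes topic and schema attributes; those attribute names, the helper topics_with_multiple_schemas, and the sample schemas are assumptions made for illustration.

```python
from collections import defaultdict
from types import SimpleNamespace


def topics_with_multiple_schemas(published):
    """Map each topic published with more than one schema to its schemas.

    Hypothetical helper; assumes entries have `topic` and `schema` attributes.
    """
    schemas = defaultdict(set)
    for entry in published:
        # str() keeps the sketch independent of the schema object's type.
        schemas[entry.topic].add(str(entry.schema))
    return {t: sorted(s) for t, s in schemas.items() if len(s) > 1}


# Stand-ins for PublishedTopic objects.
published = [
    SimpleNamespace(topic="sensors/temp", schema="tuple<int32 x>"),
    SimpleNamespace(topic="sensors/temp", schema="tuple<rstring y>"),
    SimpleNamespace(topic="alerts", schema="tuple<int32 x>"),
]
conflicts = topics_with_multiple_schemas(published)
print(conflicts)
```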
-
get_resource_allocations
()¶ Get the list of ResourceAllocation elements associated with this instance.
- Returns
List of ResourceAllocation elements associated with this instance.
- Return type
list(ResourceAllocation)
-
get_views
(name=None)¶ Get the list of View elements associated with this instance.
- Parameters
name (str, optional) – Returns view(s) matching name. name can be a regular expression. If name is not supplied, then all views associated with this instance are returned.
- Returns
List of views matching name.
- Return type
list(View)
Retrieving a list of views whose name contains the string “temperatureSensor” could be performed as follows:
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instance = sc.get_instances()[0]
>>> views = instance.get_views(name=".*temperatureSensor.*")
-
static
of_endpoint
(endpoint=None, service_name=None, username=None, password=None, verify=None)¶ Connect to a Cloud Pak for Data IBM Streams instance.
Two configurations are supported.
Integrated configuration
The Streams instance is defined using the Cloud Pak for Data deployment endpoint (URL) and the Streams service name.
The endpoint is passed in as endpoint, defaulting to the environment variable CP4D_URL. An example is https://cp4d_server:31843.
The Streams service name is passed in as service_name, defaulting to the environment variable STREAMS_INSTANCE_ID.
Standalone configuration
The Streams instance is defined using its Streams REST API endpoint, which is its SWS service.
The endpoint is passed in as endpoint, defaulting to the environment variable STREAMS_REST_URL. An example is https://streams_sws_service:34679.
No service name is specified, thus service_name should be passed as None or not set.
- Parameters
endpoint (str) – Endpoint defining the Streams instance.
service_name (str) – Streams instance name for an integrated configuration. This value is ignored for a standalone configuration.
username (str) – User name to authenticate as. Defaults to the environment variable STREAMS_USERNAME or the operating system identifier if not set.
password (str) – Password for authentication. Defaults to the environment variable STREAMS_PASSWORD or the operating system identifier if not set.
verify – SSL verification. Set to False to disable SSL verification. Defaults to SSL verification being enabled.
- Returns
Connection to Streams instance or None if insufficient configuration was provided.
- Return type
Instance
New in version 1.13.
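Before calling of_endpoint it can help to see which of the two configurations the environment would satisfy. The sketch below only inspects the environment variables named above. The helper describe_endpoint_config is hypothetical, and the order of checks (integrated first, then standalone) is an assumption rather than documented behavior.

```python
def describe_endpoint_config(env):
    """Report which documented configuration the given environment satisfies.

    Hypothetical helper; `env` is any mapping, for example os.environ.
    """
    cp4d_url = env.get("CP4D_URL")
    service_name = env.get("STREAMS_INSTANCE_ID")
    if cp4d_url and service_name:
        # Integrated: deployment endpoint plus Streams service name.
        return {"kind": "integrated", "endpoint": cp4d_url,
                "service_name": service_name}
    rest_url = env.get("STREAMS_REST_URL")
    if rest_url:
        # Standalone: Streams REST endpoint only, no service name.
        return {"kind": "standalone", "endpoint": rest_url,
                "service_name": None}
    # Neither set of variables is complete.
    return None


integrated = describe_endpoint_config(
    {"CP4D_URL": "https://cp4d_server:31843", "STREAMS_INSTANCE_ID": "streams1"})
standalone = describe_endpoint_config(
    {"STREAMS_REST_URL": "https://streams_sws_service:34679"})
print(integrated["kind"], standalone["kind"])
```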
-
static
of_service
(config)¶ Connect to an IBM Streams service instance running in Cloud Pak for Data.
The instance is specified in config. The configuration may be code injected from the list of services in a Jupyter notebook running in ICPD or manually created. The code that selects a service instance by name is:
# Two lines are code injected in a Jupyter notebook by selecting the service instance
from icpd_core import icpd_util
cfg = icpd_util.get_service_details(name='instanceName', instance_type='streams')
instance = Instance.of_service(cfg)
SSL host verification is disabled by setting SSL_VERIFY to False within config before calling this method:
cfg[ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
- Parameters
config (dict) – Configuration of IBM Streams service instance.
- Returns
Instance representing the IBM Streams service instance.
- Return type
Instance
Note
Only supported when running within the ICPD cluster, for example in a Jupyter notebook within an ICPD project.
New in version 1.12.
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
submit_job
(bundle, job_config=None)¶ Submit an application to be run in this instance.
- Parameters
bundle (str) – Path to a Streams application bundle (sab file) containing the application to be submitted.
job_config (JobConfig) – A job configuration overlay.
- Returns
Resulting job instance.
- Return type
Job
New in version 1.11.
-
upload_bundle
(bundle)¶ Upload a Streams application bundle (sab) to the instance.
Uploading a bundle allows job submission from the returned ApplicationBundle.
- Parameters
bundle (str) – Path to a Streams application bundle (sab file) containing the application to be uploaded.
- Returns
Application bundle representing the uploaded bundle.
- Return type
ApplicationBundle
Note
When an instance does not support uploading a bundle the returned ApplicationBundle represents the local file bundle tied to this instance. The returned object may still be used for job submission.
New in version 1.11.
-
-
class
streamsx.rest_primitives.
Job
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
A running Streams application.
-
id
¶ Job ID.
- Type
str
-
name
¶ Name of the job.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is job.
- Type
str
-
health
¶ Health indicator for the job. Some possible values for this property include healthy, partiallyHealthy, partiallyUnhealthy, unhealthy, and unknown.
- Type
str
-
applicationName
¶ Name of the streams processing application that this job is running.
- Type
str
-
jobGroup
¶ Streams 4.2/4.3 only. Identifies the job group to which this job belongs.
- Type
str
-
startedBy
¶ Identifies the user ID that started this job.
- Type
str
-
status
¶ Status of this job. Some possible values for this property include canceling, running, canceled, and unknown.
- Type
str
-
submitTime
¶ Epoch time when this job was submitted.
- Type
long
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> jobs = instances[0].get_jobs()
>>> print(jobs[0].health)
healthy
-
cancel
(force=False)¶ Cancel this job.
- Parameters
force (bool, optional) – Forcefully cancel this job.
- Returns
True if the job was cancelled, otherwise False if an error occurred.
- Return type
bool
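Because cancel reports failure through its return value, a caller may want to retry with force=True when a normal cancel fails. The following is a minimal sketch of that pattern using a stand-in class; FakeJob and cancel_with_fallback are hypothetical names, and only the cancel(force=False) signature is taken from this page.

```python
class FakeJob:
    """Hypothetical stand-in exposing only cancel(force=False) -> bool."""

    def __init__(self, graceful_ok):
        self.graceful_ok = graceful_ok
        self.calls = []

    def cancel(self, force=False):
        # Record each call so the retry order can be inspected.
        self.calls.append(force)
        return force or self.graceful_ok


def cancel_with_fallback(job):
    """Try a normal cancel first and force the cancel only if that fails."""
    if job.cancel():
        return True
    return job.cancel(force=True)


stubborn = FakeJob(graceful_ok=False)
cancelled = cancel_with_fallback(stubborn)
print(cancelled, stubborn.calls)
```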
-
get_domain
()¶ Get the Streams domain that owns this job.
- Returns
Streams domain that owns this job.
- Return type
Domain
-
get_hosts
()¶ Get the list of Host elements associated with this job.
- Returns
List of Host elements associated with this job.
- Return type
list(Host)
-
get_instance
()¶ Get the Streams instance that owns this job.
- Returns
Streams instance that owns this job.
- Return type
Instance
-
get_job_group
()¶ Get the job group associated with this job.
New in version 1.13.13.
-
get_operator_connections
()¶ Get the list of OperatorConnection elements associated with this job.
- Returns
List of OperatorConnection elements associated with this job.
- Return type
list(OperatorConnection)
-
get_operators
(name=None)¶ Get the list of Operator elements associated with this job.
- Parameters
name (str) – Only return operators matching name, where name can be a regular expression. If name is not supplied, then all operators for this job are returned.
- Returns
List of Operator elements associated with this job.
- Return type
list(Operator)
Retrieving a list of operators whose name contains the string “temperatureSensor” could be performed as follows:
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> job = instances[0].get_jobs()[0]
>>> operators = job.get_operators(name=".*temperatureSensor.*")
Changed in version 1.9: name parameter added.
-
get_pe_connections
()¶ Get the list of PEConnection elements associated with this job.
- Returns
List of PEConnection elements associated with this job.
- Return type
list(PEConnection)
-
get_pes
()¶ Get the list of PE elements associated with this job.
- Returns
List of PE elements associated with this job.
- Return type
list(PE)
-
get_resource_allocations
()¶ Get the list of ResourceAllocation elements associated with this job.
- Returns
List of ResourceAllocation elements associated with this job.
- Return type
list(ResourceAllocation)
-
get_views
(name=None)¶ Get the list of View elements associated with this job.
- Parameters
name (str, optional) – Returns view(s) matching name. name can be a regular expression. If name is not supplied, then all views associated with this job are returned.
- Returns
List of views matching name.
- Return type
list(View)
Retrieving a list of views whose name contains the string “temperatureSensor” could be performed as follows:
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> job = instances[0].get_jobs()[0]
>>> views = job.get_views(name=".*temperatureSensor.*")
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
retrieve_log_trace
(filename=None, dir=None)¶ Retrieves the application log and trace files of the job and saves them as a compressed tar file.
An existing file with the same name will be overwritten.
- Parameters
filename (str) – name of the created tar file. Defaults to job_<id>_<timestamp>.tar.gz where id is the job identifier and timestamp is the number of seconds since the Unix epoch, for example job_355_1511995995.tar.gz.
dir (str) – a valid directory in which to save the archive. Defaults to the current directory.
- Returns
the path to the created tar file, or None if retrieving a job’s logs is not supported in the version of IBM Streams to which the job is submitted.
- Return type
str
New in version 1.8.
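The default file name pattern job_<id>_<timestamp>.tar.gz can be reproduced as follows, which is useful when a script needs to predict or clean up archive names. This is a sketch of the documented pattern only; the helper default_log_trace_name and its now parameter are hypothetical.

```python
import time


def default_log_trace_name(job_id, now=None):
    """Build a name of the form job_<id>_<timestamp>.tar.gz.

    Hypothetical helper; `now` is seconds since the Unix epoch and
    defaults to the current time.
    """
    timestamp = int(time.time() if now is None else now)
    return "job_{}_{}.tar.gz".format(job_id, timestamp)


name = default_log_trace_name("355", now=1511995995)
print(name)
```

Since an existing file with the same name is overwritten, passing an explicit filename is the safer choice when several archives are collected within the same second.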
-
update_operators
(job_config)¶ Adjust a job configuration while the job is running.
- Parameters
job_config (JobConfig) – a job configuration overlay.
- Returns
JSON – The result of applying the new job configuration.
-
-
class
streamsx.rest_primitives.
Metric
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
Streams custom or system metric.
-
name
¶ Name of this metric.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is metric.
- Type
str
-
description
¶ Describes this metric.
- Type
str
-
lastTimeRetrieved
¶ Epoch time when the metric was most recently retrieved.
- Type
str
-
metricKind
¶ Kind of metric. Some possible values include counter, gauge, time and unknown.
- Type
str
-
metricType
¶ Type of metric. Some possible values include system, custom and unknown.
- Type
str
-
value
¶ Value for the metric.
- Type
int
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> operators = instances[0].get_operators()
>>> metrics = operators[0].get_metrics()
>>> print(metrics[0].resourceType)
metric
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
OperatorConnection
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
Connection between operators.
-
id
¶ Unique ID of this operator connection within the instance.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is operatorConnection.
- Type
str
-
required
¶ Indicates whether the connection is required.
- Type
bool
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> operatorconnections = instances[0].get_operator_connections()
>>> print(operatorconnections[0].resourceType)
operatorConnection
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
OperatorInputPort
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
Operator input port.
-
name
¶ Name of this input port.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is operatorInputPort.
- Type
str
-
indexWithinOperator
¶ Index of the input port within the operator.
- Type
int
New in version 1.9.
-
get_connections
()¶ Get the list of OperatorConnection elements associated with this port.
- Returns
List of OperatorConnection elements associated with this port.
- Return type
list(OperatorConnection)
New in version 1.13.
-
get_metrics
(name=None)¶ Get metrics for this input port.
- Parameters
name (str, optional) – Only return metrics matching name, where name can be a regular expression. If name is not supplied, then all metrics for this input port are returned.
- Returns
List of matching metrics.
- Return type
list(Metric)
Retrieving a list of metrics whose name contains the string “temperatureSensor” could be performed as follows:
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> operator = instances[0].get_operators()[0]
>>> input_port = operator.get_input_ports()[0]
>>> metrics = input_port.get_metrics(name='.*temperatureSensor.*')
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
OperatorOutputPort
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
Operator output port.
-
name
¶ Name of this output port.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is operatorOutputPort.
- Type
str
-
indexWithinOperator
¶ Index of the output port within the operator.
- Type
int
-
streamName
¶ Name of the stream that is associated with this output port.
- Type
str
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> exportedstreams = instances[0].get_exported_streams()
>>> operatoroutputport = exportedstreams[0].get_operator_output_port()
>>> print(operatoroutputport.resourceType)
operatorOutputPort
-
get_connections
()¶ Get the list of OperatorConnection elements associated with this port.
- Returns
List of OperatorConnection elements associated with this port.
- Return type
list(OperatorConnection)
New in version 1.13.
-
get_metrics
(name=None)¶ Get metrics for this output port.
- Parameters
name (str, optional) – Only return metrics matching name, where name can be a regular expression. If name is not supplied, then all metrics for this output port are returned.
- Returns
List of matching metrics.
- Return type
list(Metric)
Retrieving a list of metrics whose name contains the string “temperatureSensor” could be performed as follows:
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> exportedstreams = instances[0].get_exported_streams()
>>> operatoroutputport = exportedstreams[0].get_operator_output_port()
>>> metrics = operatoroutputport.get_metrics(name='.*temperatureSensor.*')
New in version 1.9.
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
Operator
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
An operator invocation within a job.
-
name
¶ Operator name.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is operator.
- Type
str
-
operatorKind
¶ SPL primitive operator type for this operator.
- Type
str
-
indexWithinJob
¶ Index of this operator within the job.
- Type
int
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> operators = instances[0].get_operators()
>>> print(operators[0].resourceType)
operator
-
get_host
()¶ Get the resource this operator is currently executing in.
If the operator is running on an externally managed resource None is returned.
- Returns
Resource this operator is running on.
- Return type
Host
New in version 1.9.
-
get_input_ports
()¶ Get list of input ports for this operator.
- Returns
Input ports for this operator.
- Return type
list(OperatorInputPort)
New in version 1.9.
-
get_job
()¶ Get the Streams job that owns this operator.
- Returns
Streams Job owning this operator.
- Return type
Job
-
get_metrics
(name=None)¶ Get metrics for this operator.
- Parameters
name (str, optional) – Only return metrics matching name, where name can be a regular expression. If name is not supplied, then all metrics for this operator are returned.
- Returns
List of matching metrics.
- Return type
list(Metric)
Retrieving a list of metrics whose name contains the string “temperatureSensor” could be performed as follows:
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> operator = instances[0].get_operators()[0]
>>> metrics = operator.get_metrics(name='.*temperatureSensor.*')
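Since name is described as a regular expression, a "contains" match is normally written with `.*` on either side rather than a shell-style `*`. The sketch below uses only Python's `re` module on a hypothetical list of metric names. It assumes matching behaves like `re.match`, which this page does not confirm, and `matching_names` is an illustrative helper, not part of streamsx.

```python
import re

# Hypothetical metric names, standing in for Metric.name values.
metric_names = ["temperatureSensor1", "nTuplesProcessed", "avg_temperatureSensor"]

def matching_names(pattern, names):
    """Return the names that the regular expression matches from the start."""
    compiled = re.compile(pattern)
    return [n for n in names if compiled.match(n)]

# '.*temperatureSensor.*' matches any name containing the substring.
selected = matching_names(r".*temperatureSensor.*", metric_names)
print(selected)

# A leading '*' is a glob idiom and is rejected by Python's regex compiler.
try:
    re.compile("*temperatureSensor*")
    glob_is_valid_regex = True
except re.error:
    glob_is_valid_regex = False
print(glob_is_valid_regex)
```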
-
get_output_ports
()¶ Get list of output ports for this operator.
- Returns
Output ports for this operator.
- Return type
list(OperatorOutputPort)
New in version 1.9.
-
get_pe
()¶ Get the Streams processing element this operator is executing in.
- Returns
Processing element for this operator.
- Return type
PE
New in version 1.9.
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
PEConnection
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
Stream connection between two PEs.
-
id
¶ PE connection ID.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is peConnection.
- Type
str
-
required
¶ Indicates whether this connection is required.
- Type
bool
-
status
¶ Status of this connection. Some possible values include connected, disconnected, and unknown.
- Type
str
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> peconnections = instances[0].get_pe_connections()
>>> print(peconnections[0].resourceType)
peConnection
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
PE
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
Processing element (PE) within a job. A processing element hosts one or more operators within a single job.
-
id
¶ PE ID.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is pe.
- Type
str
-
health
¶ Health indicator for this PE. Some possible values include healthy, partiallyHealthy, partiallyUnhealthy, unhealthy, and unknown.
- Type
str
-
indexWithinJob
¶ Index of the PE within the job.
- Type
int
-
launchCount
¶ Number of times this PE was started manually or automatically because of failures.
- Type
int
-
optionalConnections
¶ Status of optional connections for this PE. Some possible values include connected, disconnected, partiallyConnected, and unknown.
- Type
str
-
pendingTracingLevel
¶ Describes a pending change to the granularity of the trace information that is stored for this PE. Some possible values include off, error, debug and trace. The value is None if no change is pending.
- Type
str
-
processId
¶ Operating system process ID for this PE.
- Type
str
-
relocatable
¶ Indicates whether this PE can be relocated to a different resource.
- Type
bool
-
requiredConnections
¶ Status of the required connections for this PE. Some possible values include connected, disconnected, partiallyConnected, and unknown.
- Type
str
-
restartable
¶ Indicates whether this PE can be restarted.
- Type
bool
-
status
¶ Status of this PE.
- Type
str
-
statusReason
¶ Additional information for the status of this PE.
- Type
str
-
tracingLevel
¶ Granularity of the trace information. Some possible values include off, error, debug and trace.
- Type
str
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> pes = instances[0].get_pes()
>>> print(pes[0].resourceType)
pe
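The health and launchCount attributes lend themselves to a quick triage pass over a list of PEs. The following is a minimal sketch that uses `SimpleNamespace` stand-ins carrying the documented attribute names, since a live instance is not assumed here. `needs_attention` is a hypothetical helper, and the attribute values are made up for illustration.

```python
from types import SimpleNamespace

# Stand-in objects with the documented attributes; real PE objects
# would come from an instance, which is not available in this sketch.
pes = [
    SimpleNamespace(id="1", health="healthy", launchCount=1),
    SimpleNamespace(id="2", health="partiallyUnhealthy", launchCount=3),
    SimpleNamespace(id="3", health="unknown", launchCount=1),
]

def needs_attention(pe_list):
    """Return PEs whose health is anything other than 'healthy'."""
    return [pe for pe in pe_list if pe.health != "healthy"]

flagged = needs_attention(pes)
for pe in flagged:
    print(pe.id, pe.health, pe.launchCount)
```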
-
get_host
()¶ Get resource this processing element is currently executing in.
If the processing element is running on an externally managed resource, None is returned.
- Returns
Resource this processing element is running on.
- Return type
Resource
New in version 1.9.
-
get_job
()¶ Get the Streams job that owns this PE.
- Returns
Streams Job owning this PE.
- Return type
Job
-
get_metrics
(name=None)¶ Get metrics for this PE.
- Parameters
name (str, optional) – Only return metrics matching name, where name can be a regular expression. If name is not supplied, then all metrics for this PE are returned.
- Returns
List of matching metrics.
- Return type
list(Metric)
Retrieving a list of metrics whose name contains the string “temperatureSensor” could be performed as follows:
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> pe = instances[0].get_pes()[0]
>>> metrics = pe.get_metrics(name='.*temperatureSensor.*')
New in version 1.9.
-
get_resource
()¶ Get resource this processing element is currently executing in.
- Returns
Resource this processing element is running on.
- Return type
Resource
New in version 1.13.13.
-
get_resource_allocation
()¶ Get the ResourceAllocation element for the resource where this PE is running.
- Returns
Resource allocation used to access information about the resource where this PE is running.
- Return type
ResourceAllocation
New in version 1.9.
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
retrieve_console_log
(filename=None, dir=None)¶ Retrieves the application console log (standard out and error) files for this PE and saves them as a plain text file.
An existing file with the same name will be overwritten.
- Parameters
filename (str) – name of the created file. Defaults to pe_<id>_<timestamp>.stdouterr where id is the PE identifier and timestamp is the number of seconds since the Unix epoch, for example pe_83_1511995995.stdouterr.
dir (str) – a valid directory in which to save the file. Defaults to the current directory.
- Returns
The path to the created file, or None if retrieving a job’s logs is not supported in the version of Streams to which the job is submitted.
- Return type
str
New in version 1.9.
-
retrieve_trace
(filename=None, dir=None)¶ Retrieves the application trace files for this PE and saves them as a plain text file.
An existing file with the same name will be overwritten.
- Parameters
filename (str) – name of the created file. Defaults to pe_<id>_<timestamp>.trace where id is the PE identifier and timestamp is the number of seconds since the Unix epoch, for example pe_83_1511995995.trace.
dir (str) – a valid directory in which to save the file. Defaults to the current directory.
- Returns
The path to the created file, or None if retrieving a job’s logs is not supported in the version of Streams to which the job is submitted.
- Return type
str
New in version 1.9.
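The default file names described for retrieve_console_log and retrieve_trace follow the pattern pe_<id>_<timestamp>.<suffix>. The sketch below reproduces that pattern with a hypothetical helper, `default_pe_filename`. It is not the library's implementation, only an illustration of the naming rule stated above.

```python
import time

def default_pe_filename(pe_id, suffix, now=None):
    """Build pe_<id>_<timestamp>.<suffix> with a whole-second Unix timestamp."""
    timestamp = int(time.time() if now is None else now)
    return "pe_{}_{}.{}".format(pe_id, timestamp, suffix)

# A fixed timestamp is passed so the result is reproducible.
trace_name = default_pe_filename("83", "trace", now=1511995995)
console_name = default_pe_filename("83", "stdouterr", now=1511995995)
print(trace_name)
print(console_name)
```

Passing an explicit filename to either method avoids relying on the timestamp when a predictable path is needed.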
-
-
class
streamsx.rest_primitives.
PublishedTopic
(topic, schema)¶ Bases:
object
Metadata for a published topic.
-
topic
¶ Published topic
- Type
str
-
schema
¶ Schema of topic
- Type
str
-
-
class
streamsx.rest_primitives.
ResourceAllocation
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
A resource that is allocated to an IBM Streams instance.
-
resourceType
¶ Identifies the REST resource type, which is resourceAllocation.
- Type
str
-
applicationResource
¶ Indicates whether this resource is an application resource, which is used to run streams processing applications.
- Type
bool
-
schedulerStatus
¶ Indicates whether this resource is schedulable for the instance.
- Type
str
-
status
¶ Status of this resource for the instance.
- Type
str
Example
>>> from streamsx import rest >>> sc = rest.StreamingAnalyticsConnection() >>> instances = sc.get_instances() >>> allocations = instances.get_resource_allocations() >>> print(allocations[0].resourceType) resourceAllocation
-
get_jobs
(name=None)¶ Retrieves jobs running on this resource in its instance.
- Parameters
name (str, optional) – Only return jobs containing property name that matches name. name can be a regular expression. If name is not supplied, then all jobs are returned.
- Returns
A list of jobs matching the given name.
- Return type
list(Job)
Note
If applicationResource is False, an empty list is returned.
New in version 1.9.
-
get_pes
()¶ Get the list of PE elements running on this resource in its instance.
- Returns
List of PE elements running on this resource.
- Return type
list(PE)
Note
If applicationResource is False, an empty list is returned.
New in version 1.9.
-
get_resource
()¶ Get the Resource of the resource allocation.
- Returns
Resource for this allocation.
- Return type
Resource
New in version 1.9.
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
Resource
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
A resource available to an IBM Streams domain.
-
id
¶ Resource identifier.
- Type
str
-
displayName
¶ Resource display name.
- Type
str
-
ipAddress
¶ IP address.
- Type
str
-
status
¶ Resource status.
- Type
str
Tags assigned to resource.
- Type
list[str]
New in version 1.9.
-
get_metrics
(name=None)¶ Get metrics for this resource.
- Parameters
name (str, optional) – Only return metrics matching name, where name can be a regular expression. If name is not supplied, then all metrics for this resource are returned.
- Returns
List of matching metrics.
- Return type
list(Metric)
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
ResourceTag
(json_resource_tag)¶ Bases:
object
Resource tag defined in a Streams domain
-
definition_format_properties
¶ Indicates whether the resource definition consists of one or more properties.
- Type
bool
-
description
¶ Tag description.
- Type
str
-
name
¶ Tag name.
- Type
str
-
properties_definition
¶ Contains the properties of the resource definition. Only present if definition_format_properties is True.
- Type
list(str)
-
reserved
¶ If True, this tag is defined by IBM Streams, and cannot be modified.
- Type
bool
-
-
class
streamsx.rest_primitives.
RestResource
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
HTTP REST resource identifier.
-
name
¶ Resource name.
- Type
str
-
resource
¶ A string that identifies the URI for the resource.
- Type
str
Changed in version 1.9: Changed to RestResource from Resource.
-
get_resource
()¶ Make a request against this REST resource.
- Returns
JSON response.
- Return type
dict
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
StreamingAnalyticsService
(rest_client, credentials)¶ Bases:
object
Streaming Analytics service running on IBM Cloud.
-
cancel_job
(job_id=None, job_name=None)¶ Cancel a running job.
- Parameters
job_id (str, optional) – Identifier of job to be canceled.
job_name (str, optional) – Name of job to be canceled.
- Returns
JSON response for the job cancel operation.
- Return type
dict
-
get_instance_status
()¶ Get the status of the instance for this Streaming Analytics service.
- Returns
JSON response for the instance status operation.
- Return type
dict
-
start_instance
()¶ Start the instance for this Streaming Analytics service.
- Returns
JSON response for the instance start operation.
- Return type
dict
-
stop_instance
()¶ Stop the instance for this Streaming Analytics service.
- Returns
JSON response for the instance stop operation.
- Return type
dict
-
submit_job
(bundle, job_config=None)¶ Submit a Streams Application Bundle (sab file) to this Streaming Analytics service.
- Parameters
bundle (str) – path to a Streams application bundle (sab file) containing the application to be submitted
job_config (JobConfig) – a job configuration overlay
- Returns
JSON response from the service containing a ‘name’ field with the unique job name assigned to the submitted job, or ‘error_status’ and ‘description’ fields if submission was unsuccessful.
- Return type
dict
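Because submit_job reports failure through fields in the returned dict rather than a separate return type, callers typically branch on which keys are present. The following is a minimal sketch of that check. `describe_submission` is a hypothetical helper and the payloads are invented for illustration; only the key names 'name', 'error_status' and 'description' come from the text above.

```python
def describe_submission(response):
    """Summarize a submit_job-style response dict as a single string."""
    if "name" in response:
        return "submitted as {}".format(response["name"])
    # Fall back to the documented error fields; .get guards against a missing key.
    status = response.get("error_status", "unknown")
    detail = response.get("description", "no description")
    return "failed ({}): {}".format(status, detail)

# Hypothetical response payloads; real field values will differ.
ok = describe_submission({"name": "sample_job_0"})
bad = describe_submission({"error_status": 400, "description": "invalid bundle"})
print(ok)
print(bad)
```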
-
-
class
streamsx.rest_primitives.
Toolkit
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
IBM Streams toolkit.
-
id
¶ Unique ID for this toolkit.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is toolkit.
- Type
str
-
name
¶ The name of the toolkit.
- Type
str
-
version
¶ The version of the toolkit.
- Type
str
-
requiredProductVersion
¶ The earliest version of Streams required by the toolkit.
- Type
str
-
path
¶ The full path to the toolkit.
- Type
str
Example
>>> from streamsx.build import BuildService
>>> build_service = BuildService.of_endpoint()
>>> toolkits = build_service.get_toolkits()
>>> print(toolkits[0].resourceType)
toolkit
New in version 1.13.
-
class
Dependency
(name, version)¶ Bases:
object
The name, and range of versions, of a toolkit required by another toolkit.
-
name
¶ the name of the required toolkit
- Type
str
-
version
¶ the range of versions required of the toolkit
- Type
str
-
-
property
dependencies
¶ Find all the dependencies for this toolkit.
- Returns
List of dependencies of this toolkit. If this toolkit does not have any dependencies, this will be an empty list.
- Return type
list(Dependency)
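Each entry in dependencies carries a name and a version attribute, so the list can be turned into a lookup table keyed by toolkit name. The sketch below uses a namedtuple as a stand-in for Toolkit.Dependency. `dependency_map` is a hypothetical helper, and the toolkit name and version-range string are illustrative only, since the range syntax is not specified here.

```python
from collections import namedtuple

# Stand-in for Toolkit.Dependency, which exposes name and version.
Dependency = namedtuple("Dependency", ["name", "version"])

def dependency_map(dependencies):
    """Map each required toolkit name to its required version range."""
    return {dep.name: dep.version for dep in dependencies}

# Hypothetical values; the version-range syntax shown is only illustrative.
deps = [Dependency("com.example.toolkit", "[1.0.0,2.0.0)")]
required = dependency_map(deps)
no_deps = dependency_map([])  # a toolkit without dependencies yields an empty dict
print(required)
```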
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
ViewItem
(json_rep, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
A stream tuple in view.
-
collectionTime
¶ Epoch time when this viewItem is collected from the stream.
- Type
long
-
data
¶ Content of this viewItem.
- Type
dict
-
resourceType
¶ Identifies the REST resource type, which is viewItem.
- Type
str
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> views = instances[0].get_views()
>>> viewitems = views[0].get_view_items()
>>> print(viewitems[0].resourceType)
viewItem
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
-
class
streamsx.rest_primitives.
View
(json_view, rest_client)¶ Bases:
streamsx.rest_primitives._ResourceElement
View on a stream.
-
id
¶ A unique identifier for the view.
- Type
str
-
name
¶ View name.
- Type
str
-
description
¶ Description of the view.
- Type
str
-
resourceType
¶ Identifies the REST resource type, which is view.
- Type
str
-
activateOption
¶ Indicates when the view starts buffering data.
- Type
str
-
maximumTupleRate
¶ Maximum number of tuples that the view collects per second.
- Type
int
-
logicalOperatorName
¶ The logical name of the operator that contains the output port on which the view is created.
- Type
str
-
bufferCapacitySeconds
¶ Buffer size measured in seconds.
- Type
int
-
bufferCapacityTuples
¶ Buffer size measured in number of tuples.
- Type
int
-
bufferCapacityUnits
¶ Indicates whether the buffer capacity for the view is determined by seconds, tuples or unknown.
- Type
str
Example
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> views = instances[0].get_views()
>>> print(views[0].resourceType)
view
-
display
(duration=None, period=2)¶ Display a view within a Jupyter or IPython notebook.
Provides an easy mechanism to visualize data on a stream using a view.
Tuples are fetched from the view and displayed in a table within the notebook cell using a pandas.DataFrame. The table is continually updated with the latest tuples from the view.
This method calls start_data_fetch() and will call stop_data_fetch() when completed if duration is set.
- Parameters
duration (float) – Number of seconds to fetch and display tuples. If None then the display will be updated until stop_data_fetch() is called.
period (float) – Maximum update period.
Note
A view is a sampling of data on a stream so tuples that are on the stream may not appear in the view.
Note
Python modules ipywidgets and pandas must be installed in the notebook environment.
Warning
Behavior when called outside a notebook is undefined.
New in version 1.12.
-
fetch_tuples
(max_tuples=20, timeout=None)¶ Fetch a number of tuples from this view.
Fetching of data must have been started with start_data_fetch() before calling this method.
If timeout is None then the returned list will contain max_tuples tuples. Otherwise, if the timeout is reached, the list may contain fewer than max_tuples tuples.
- Parameters
max_tuples (int) – Maximum number of tuples to fetch.
timeout (float) – Maximum time to wait for max_tuples tuples.
- Returns
List of fetched tuples.
- Return type
list
New in version 1.12.
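The max_tuples and timeout semantics described for fetch_tuples can be illustrated against a plain `queue.Queue`, which is the type start_data_fetch() is documented to return. The sketch below is not the library's implementation. `take_tuples` is a hypothetical helper that mirrors the described behavior, and it assumes the timeout is measured in seconds.

```python
import queue
import time

def take_tuples(q, max_tuples=20, timeout=None):
    """Collect up to max_tuples items from q, giving up once timeout elapses."""
    deadline = None if timeout is None else time.monotonic() + timeout
    fetched = []
    while len(fetched) < max_tuples:
        if deadline is None:
            fetched.append(q.get())  # block until an item arrives
            continue
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            fetched.append(q.get(timeout=remaining))
        except queue.Empty:
            break
    return fetched

# Stand-in queue with three tuples already available.
q = queue.Queue()
for i in range(3):
    q.put({"id": i})

# Asking for five with a short timeout returns only the three available.
partial = take_tuples(q, max_tuples=5, timeout=0.1)
print(len(partial))
```

With timeout left as None the helper blocks until max_tuples items have arrived, which matches the guarantee stated above that the returned list then contains max_tuples tuples.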
-
get_domain
()¶ Get the Streams domain for the instance that owns this view.
- Returns
Streams domain for the instance owning this view.
- Return type
Domain
-
get_instance
()¶ Get the Streams instance that owns this view.
- Returns
Streams instance owning this view.
- Return type
Instance
-
get_job
()¶ Get the Streams job that owns this view.
- Returns
Streams Job owning this view.
- Return type
Job
-
get_view_items
()¶ Get a list of ViewItem elements associated with this view.
- Returns
List of ViewItem(s) associated with this view.
- Return type
list(ViewItem)
-
refresh
()¶ Refresh the resource and update the attributes to reflect the latest status.
-
start_data_fetch
()¶ Starts a thread that fetches data from the Streams view server.
Each item in the returned Queue represents a single tuple on the stream the view is attached to.
- Returns
Queue containing view data.
- Return type
queue.Queue
Note
This is a queue of the tuples converted to Python objects; it is not a queue of ViewItem objects.
-
stop_data_fetch
()¶ Stops the thread that fetches data from the Streams view server.
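Since start_data_fetch() starts a background thread, pairing it with stop_data_fetch() in a try/finally block is one way to make sure the thread is stopped even if reading from the queue fails. The sketch below shows that pattern with `FakeView`, a hypothetical stand-in that only mimics the two documented method names. `first_tuple` is likewise an illustrative helper.

```python
import queue

class FakeView:
    """Hypothetical stand-in exposing the two documented method names."""

    def __init__(self):
        self.fetching = False

    def start_data_fetch(self):
        self.fetching = True
        q = queue.Queue()
        q.put({"reading": 21.5})  # made-up tuple content
        return q

    def stop_data_fetch(self):
        self.fetching = False

def first_tuple(view, timeout=1.0):
    """Return the first tuple from the view, always stopping the fetch."""
    q = view.start_data_fetch()
    try:
        return q.get(timeout=timeout)
    finally:
        view.stop_data_fetch()

view = FakeView()
item = first_tuple(view)
print(item, view.fetching)
```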
-