streamsx.rest_primitives

Primitive objects for REST bindings.

Overview

Contains classes representing primitive Streams objects, such as Instance, Job, PE, etc.

Module contents

Classes

ActiveService

Domain or instance service.

ActiveVersion

Contains IBM Streams installation information

ApplicationBundle

Application bundle tied to an instance.

ApplicationConfiguration

An application configuration.

BaseImage

A base image used for an Edge image build using the EDGE context type.

Domain

IBM Streams domain.

ExportedStream

Stream exported stream by a job.

Host

Resource in a Streams domain or instance.

ImportedStream

Stream imported by a job.

Installation

IBM Streams installation.

Instance

IBM Streams instance.

Job

A running streams application.

JobGroup

A job group definition.

Metric

Streams custom or system metric.

Operator

An operator invocation within a job.

OperatorConnection

Connection between operators.

OperatorInputPort

Operator input port.

OperatorOutputPort

Operator output port.

PE

Processing element (PE) within a job.

PEConnection

Stream connection between two PEs.

PublishedTopic

Metadata for a published topic.

Resource

A resource available to a IBM Streams domain.

ResourceAllocation

A resource that is allocated to an IBM Streams instance.

ResourceTag

Resource tag defined in a Streams domain

RestResource

HTTP REST resource identifier.

StreamingAnalyticsService

Streaming Analytics service running on IBM Cloud.

Toolkit

IBM Streams toolkit.

View

View on a stream.

ViewItem

A stream tuple in view.

class streamsx.rest_primitives.ActiveService(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

Domain or instance service.

resourceType

Identifies the REST resource type, which is activeService.

Type

str

leader

If True, this service is a standby service.

Type

bool

processId

Process ID of this service.

Type

str

startTime

Epoch time when this service started.

Type

long

status

Status of this service. Some possible values include stopped, running, failed, and unknown.

Type

str

type

Type of this service.

Type

str

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> services = instances.get_active_services()
>>> print(services[0].resourceType)
activeService
refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.ActiveVersion(json_active_version)

Bases: object

Contains IBM Streams installation information

architecture

Hardware architecture on which product is installed.

Type

str

build_version

Product build ID.

Type

str

edition_name

Product edition.

Type

str

full_product_version

Full product version, including any hot fix.

Type

str

minimum_os_base_version

Minimum operating system version requirement.

Type

str

minimum_os_patch_version

Minimum operating system patch requirement.

Type

str

product_name

Product name.

Type

str

product_version

Product version.

Type

str

class streamsx.rest_primitives.ApplicationBundle(_delegator, instance, json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

Application bundle tied to an instance.

New in version 1.11.

refresh()

Refresh the resource and update the attributes to reflect the latest status.

submit_job(job_config=None)

Submit this Streams Application Bundle (sab file) to its associated instance.

Parameters

job_config (JobConfig) – a job configuration overlay

Returns

Resulting job instance.

Return type

Job

class streamsx.rest_primitives.ApplicationConfiguration(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

An application configuration.

Application configurations are used for secure storage and retrieval of name/value pairs.

An application configuration maintains a set of properties that an application can access at runtime. These are typically used to maintain connection endpoint and credentials for sources and sinks.

name

Name of the configuration.

Type

str

description

Description for the configuration.

Type

str

properties

Property values stored for the configuration.

Type

dict

creationTime

Epoch time when this configuraiton was created.

Type

long

lastModifiedTime

Epoch time when this configuration was last modified.

Type

long

delete()

Delete this application configuration.

refresh()

Refresh the resource and update the attributes to reflect the latest status.

update(properties=None, description=None)

Update this application configuration.

To create or update a property provide its key-value pair in properties.

To delete a property provide its key with the value None in properties.

Parameters
  • properties (dict) – Property values to be updated. If None the properties are unchanged.

  • description (str) – Description for the configuration. If None the description is unchanged.

Returns

self

Return type

ApplicationConfiguration

class streamsx.rest_primitives.BaseImage(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

A base image used for an Edge image build using the EDGE context type.

buildPool

REST URL of the build pool that contains the image

Type

str

id

identifier in the form registry/prefix/imagename:tag

Type

str

name

the image name

Type

str

prefix

the image prefix

Type

str

registry

the registry where the image is stored

Type

str

resourceType

the REST resource type, which is image

Type

str

restid

identifier in the form registry/prefix/imagename:tag

Type

str

tag

the image tag

Type

str

Example

>>> from streamsx.build import BuildService
>>> build_service = BuildService.of_endpoint()
>>> baseimages = build_service.get_base_images()
>>> print(type(baseimages[0]))
<class 'streamsx.rest_primitives.BaseImage'>
>>> print (baseimages[0].resourceType)
image

New in version 1.15.

refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.Domain(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

IBM Streams domain. A domain contains instances that support running Streams applications as jobs.

id

Unique ID for this domain.

Type

str

resourceType

Identifies the REST resource type, which is domain.

Type

str

creationTime

Epoch time when this domain was created.

Type

long

creationuser

User ID that created this domain.

Type

str

status

Status of this domain. Some possible values include running, stopping, stopped, starting, removing, and unknown.

Type

str

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> domains = sc.get_domains()
>>> print (domains[0].resourceType)
domain
get_active_services()

Get the list of ActiveService elements associated with this domain.

Returns

List of ActiveService elements associated with this domain.

Return type

list(ActiveService)

get_hosts()

Get the list of Host elements associated with this domain.

Returns

List of Host elements associated with this domain.

Return type

list(Host)

get_instances()

Get the list of Instance elements associated with this domain.

Returns

List of Instance elements associated with this domain.

Return type

list(Instance)

get_resource_allocations()

Get the list of ResourceAllocation elements associated with this domain.

Returns

List of ResourceAllocation elements associated with this domain.

Return type

list(ResourceAllocation)

get_resources()

Get the list of Resource elements associated with this domain.

Returns

List of Resource elements associated with this domain.

Return type

list(Resource)

refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.ExportedStream(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

Stream exported stream by a job.

resourceType

Identifies the REST resource type, which is exportedStream.

Type

str

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> exportedstreams = instances[0].get_exported_streams()
>>> print (exportedstreams[0].resourceType)
exportedStream
get_operator_output_port()

Get the output port of this exported stream.

Returns

Output port of this exported stream.

Return type

OperatorOutputPort

refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.Host(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

Resource in a Streams domain or instance.

name

Configuration name for the IBM Streams resource.

Type

str

resourceType

Identifies the REST resource type, which is host.

Type

str

ipAddress

IP address for the IBM Streams resource.

Type

str

processorCount

Number of processors on the IBM Streams resource.

Type

int

restrictedTags

Set of resource tags that processing elements (PEs) must have to run on the IBM Streams resource.

Type

list(str)

services

Name and status of each domain service that is designated to run on the IBM Streams resource.

Type

list(dict)

status

Status of the IBM Streams resource.

Type

str

tag

Names of each tag that is assigned to the IBM Streams resource.

Type

list(str)

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> domains = sc.get_domains()
>>> hosts = domains[0].get_hosts()
>>> print (hosts[0].resourceType)
host
refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.ImportedStream(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

Stream imported by a job.

resourceType

Identifies the REST resource type, which is importedStream.

Type

str

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> importedstreams = instances[0].get_imported_streams()
>>> print (importedstreams[0].resourceType)
importedStream
refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.Installation(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

IBM Streams installation.

resourceType

Identifies the REST resource type, which is installation.

Type

str

architecture

Hardware architecture on which product is installed.

Type

str

buildVersion

Product build ID.

Type

str

editionName

Product edition.

Type

str

fullProductVersion

Full product version, including any hot fix.

Type

str

minimumOSBaseVersion

Minimum operating system version requirement.

Type

str

minimumOSPatchVersion

Minimum operating system patch requirement.

Type

str

productName

Product name.

Type

str

productVersion

Product version.

Type

str

refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.Instance(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

IBM Streams instance.

id

Unique ID for this instance.

Type

str

resourceType

Identifies the REST resource type, which is instance.

Type

str

creationTime

Epoch time when this instance was created.

Type

long

creationuser

User ID that created this instance.

Type

str

health

Summarize status of the jobs in the instance. Some possible values include healthy, partiallyHealthy, partiallyUnhealthy, unhealthy, and unknown.

Type

str

owner

User ID that owns this instance.

Type

str

startTime

Epoch time when this instance was started.

Type

long

status

Status of this instance. Some possible values include running, failed, stopped, and unknown.

Type

str

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> print (instances[0].resourceType)
instance
create_application_configuration(name, properties, description=None)

Create an application configuration.

Parameters

name (str, optional) – Only return application configurations containing property name that matches name. name can be a

get_active_services()

Get the list of ActiveService elements associated with this instance.

Returns

List of ActiveService elements associated with this instance.

Return type

list(ActiveService)

get_application_configurations(name=None)

Retrieves application configurations for this instance.

Parameters

name (str, optional) – Only return application configurations containing property name that matches name. name can be a regular expression. If name is not supplied, then all application configurations are returned.

Returns

A list of application configurations matching the given name.

Return type

list(ApplicationConfiguration)

get_domain()

Get the Streams domain that owns this instance.

Returns

Streams domain owning this instance.

Return type

Domain

get_exported_streams()

Get the list of ExportedStream elements associated with this instance.

Returns

List of ExportedStream elements associated with this instance.

Return type

list(ExportedStream)

get_hosts()

Get the list of Host element associated with this instance.

Returns

List of Host element associated with this instance.

Return type

list(Host)

get_imported_streams()

Get the list of ImportedStream elements associated with this instance.

Returns

List of ImportedStream elements associated with this instance.

Return type

list(ImportedStream)

get_job(id)

Retrieves a job matching the given id

Parameters

id (str) – Job id to match.

Returns

Job matching the given id

Return type

Job

Raises

ValueError – No resource matches given id or multiple resources matching given id

get_job_groups(name=None)

Retrieves job groups defined in this instance.

Parameters

name (str, optional) – Only return job groups containing property name that matches name. name can be a regular expression. If name is not supplied, then all job groups are returned.

Returns

A list of job groups matching the given name.

Return type

list(JobGroup)

Only supported for Streams 5.0 and later.

New in version 1.13.13.

get_jobs(name=None)

Retrieves jobs running in this instance.

Parameters

name (str, optional) – Only return jobs containing property name that matches name. name can be a regular expression. If name is not supplied, then all jobs are returned.

Returns

A list of jobs matching the given name.

Return type

list(Job)

Retrieving a list of jobs whose name contains the string “temperatureSensor” could be performed as followed .. rubric:: Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instance = sc.get_instances()[0]
>>> jobs = instance.get_jobs(name=".*temperatureApplication*")
get_operator_connections()

Get the list of OperatorConnection elements associated with this instance.

Returns

List of OperatorConnection elements associated with this instance.

Return type

list(OperatorConnection)

get_operators(name=None)

Get the list of Operator elements associated with this instance.

Parameters

name (str) – Only return operators matching name, where name can be a regular expression. If name is not supplied, then all operators for this instance are returned.

Returns

List of Operator elements associated with this instance.

Return type

list(Operator)

Retrieving a list of operators whose name contains the string “temperatureSensor” could be performed as followed .. rubric:: Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instance = sc.get_instances()[0]
>>> operators = instance.get_operators(name=".*temperatureSensor*")

Changed in version 1.9: name parameter added.

get_pe_connections()

Get the list of PEConnection elements associated with this instance.

Returns

List of PEConnection elements associated with this instance.

Return type

list(PEConnection)

get_pes()

Get the list of PE elements associated with this instance resource.

Returns

List of PE elements associated with this instance.

Return type

list(PE)

get_published_topics()

Get a list of published topics for this instance.

Streams applications publish streams to a a topic that can be subscribed to by other applications. This allows a microservice approach where publishers and subscribers are independent of each other.

A published stream has a topic and a schema. It is recommended that a topic is only associated with a single schema.

Streams may be published and subscribed by applications regardless of the implementation language. For example a Python application can publish a stream of JSON tuples that are subscribed to by SPL and Java applications.

Returns

List of currently published topics.

Return type

list(PublishedTopic)

get_resource_allocations()

Get the list of ResourceAllocation elements associated with this instance.

Returns

List of ResourceAllocation elements associated with this instance.

Return type

list(ResourceAllocation)

get_views(name=None)

Get the list of View elements associated with this instance.

Parameters
  • name (str, optional) – Returns view(s) matching name. name can be a regular expression. If name

  • not supplied, then all views associated with this instance are returned. (is) –

Returns

List of views matching name.

Return type

list(streamsx.rest_primitives.View)

Retrieving a list of views whose name contains the string “temperatureSensor” could be performed as followed .. rubric:: Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instance = sc.get_instances()[0]
>>> view = instance.get_views(name=".*temperatureSensor*")
static of_endpoint(endpoint=None, service_name=None, username=None, password=None, verify=None)

Connect to a Cloud Pak for Data IBM Streams instance.

Two configurations are supported.

Integrated configuration

The Streams instance is defined using the Cloud Pak for Data deployment endpoint (URL) and the Streams service name.

The endpoint is passed in as endpoint defaulting the the environment variable CP4D_URL. An example is https://cp4d_server:31843.

The Streams service name is passed in as service_name defaulting to the environment variable STREAMS_INSTANCE_ID.

Standalone configuration

The Streams instance is defined using its Streams REST api endpoint, which is its SWS service.

The endpoint is passed in as endpoint defaulting the the environment variable STREAMS_REST_URL. An example is https://streams_sws_service:34679.

No service name is specified thus service_name should be passed as None or not set.

Parameters
  • endpoint (str) – Endpoint defining the Streams instance.

  • service_name (str) – Streams instance name for a integrated configuration. This value is ignored for a standalone configuration.

  • username (str) – User name to authenticate as. Defaults to the environment variable STREAMS_USERNAME or the operating system identifier if not set.

  • password (str) – Password for authentication. Defaults to the environment variable STREAMS_PASSWORD or the operating system identifier if not set.

  • verify – SSL verification. Set to False to disable SSL verification. Defaults to SSL verification being enabled.

Returns

Connection to Streams instance or None if insufficient configuration was provided.

Return type

Instance

New in version 1.13.

static of_service(config)

Connect to an IBM Streams service instance running in Cloud Pak for Data.

The instance is specified in config. The configuration may be code injected from the list of services in a Jupyter notebook running in ICPD or manually created. The code that selects a service instance by name is:

# Two lines are code injected in a Jupyter notebook by selecting the service instance
from icpd_core import ipcd_util
cfg = icpd_util.get_service_details(name='instanceName', instance_type='streams')

instance = Instance.of_service(cfg)

SSL host verification is disabled by setting SSL_VERIFY to False within config before calling this method:

cfg[ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
Parameters

config (dict) – Configuration of IBM Streams service instance.

Returns

Instance representing for IBM Streams service instance.

Return type

Instance

Note

Only supported when running within the ICPD cluster, for example in a Jupyter notebook within a ICPD project.

New in version 1.12.

refresh()

Refresh the resource and update the attributes to reflect the latest status.

submit_job(bundle, job_config=None)

Submit a application to be run in this instance.

Parameters
  • bundle (str) – path to a Streams application bundle (sab file) containing the application to be submitted

  • job_config (JobConfig) – a job configuration overlay

Returns

Resulting job instance.

Return type

Job

New in version 1.11.

upload_bundle(bundle)

Upload a Streams application bundle (sab) to the instance.

Uploading a bundle allows job submission from the returned ApplicationBundle.

Parameters

bundle (str) – path to a Streams application bundle (sab file) containing the application to be uploaded.

Returns

Application bundle representing the uploaded bundle.

Return type

ApplicationBundle

Note

When an instance does not support uploading a bundle the returned ApplicationBundle represents the local file bundle tied to this instance. The returned object may still be used for job submission.

New in version 1.11.

class streamsx.rest_primitives.Job(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

A running streams application.

id

job ID.

Type

str

name

Name of the job.

Type

str

resourceType

Identifies the REST resource type, which is job.

Type

str

health

Health indicator for the job. Some possible values for this property include healthy, partiallyHealthy, partiallyUnhealthy, unhealthy, and unknown.

Type

str

applicationName

Name of the streams processing application that this job is running.

Type

str

jobGroup

Streams 4.2/4.3 only. Identifies the job group to which this job belongs.

Type

str

startedBy

Identifies the user ID that started this job.

Type

str

status

Status of this job. Some possible values for this property include canceling, running, canceled, and unknown.

Type

str

submitTime

Epoch time when this job was submitted.

Type

long

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> jobs = instances[0].get_jobs()
>>> print (jobs[0].health)
healthy
cancel(force=False)

Cancel this job.

Parameters

force (bool, optional) – Forcefully cancel this job.

Returns

True if the job was cancelled, otherwise False if an error occurred.

Return type

bool

get_domain()

Get the Streams domain that owns this job.

Returns

Streams domain that owns this job.

Return type

Domain

get_hosts()

Get the list of Host elements associated with this job.

Returns

List of Host elements associated with this job.

Return type

list(Host)

get_instance()

Get the Streams instance that owns this job.

Returns

Streams instance that owns this job.

Return type

Instance

get_job_group()

Get the job group associated with this job.

New in version 1.13.13.

get_operator_connections()

Get the list of OperatorConnection elements associated with this job.

Returns

List of OperatorConnection elements associated with this job.

Return type

list(OperatorConnection)

get_operators(name=None)

Get the list of Operator elements associated with this job.

Parameters

name (str) – Only return operators matching name, where name can be a regular expression. If name is not supplied, then all operators for this job are returned.

Returns

List of Operator elements associated with this job.

Return type

list(Operator)

Retrieving a list of operators whose name contains the string “temperatureSensor” could be performed as followed .. rubric:: Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> job = instances[0].get_jobs()[0]
>>> operators = job.get_operators(name=".*temperatureSensor*")

Changed in version 1.9: name parameter added.

get_pe_connections()

Get the list of PEConnection elements associated with this job.

Returns

List of PEConnection elements associated with this job.

Return type

list(PEConnection)

get_pes()

Get the list of PE elements associated with this job.

Returns

List of PE elements associated with this job.

Return type

list(PE)

get_resource_allocations()

Get the list of ResourceAllocation elements associated with this job.

Returns

List of ResourceAllocation elements associated with this job.

Return type

list(ResourceAllocation)

get_views(name=None)

Get the list of View elements associated with this job.

Parameters
  • name (str, optional) – Returns view(s) matching name. name can be a regular expression. If name

  • not supplied, then all views associated with this instance are returned. (is) –

Returns

List of views matching name.

Return type

list(streamsx.rest_primitives.View)

Retrieving a list of views that contain the string “temperatureSensor” could be performed as followed .. rubric:: Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> job = instances[0].get_jobs()[0]
>>> views = job.get_views(name = ".*temperatureSensor*")
refresh()

Refresh the resource and update the attributes to reflect the latest status.

retrieve_log_trace(filename=None, dir=None)

Retrieves the application log and trace files of the job and saves them as a compressed tar file.

An existing file with the same name will be overwritten.

Parameters
  • filename (str) – name of the created tar file. Defaults to job_<id>_<timestamp>.tar.gz where id is the job identifier and timestamp is the number of seconds since the Unix epoch, for example job_355_1511995995.tar.gz.

  • dir (str) – a valid directory in which to save the archive. Defaults to the current directory.

Returns

the path to the created tar file, or None if retrieving a job’s logs is not supported in the version of IBM Streams to which the job is submitted.

Return type

str

New in version 1.8.

update_operators(job_config)

Adjust a job configuration while the job is running

Parameters

{JobConfig} -- a job configuration overlay (job_config) –

Returns

[JSON] – The result of applying the new jobConfig?

class streamsx.rest_primitives.Metric(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

Streams custom or system metric.

name

Name of this metric.

Type

str

resourceType

Identifies the REST resource type, which is metric.

Type

str

description

Describes this metric.

Type

str

lastTimeRetrieved

Epoch time when the metric was most recently retrieved.

Type

str

metricKind

Kind of metric. Some possible values include counter, gauge, time and unknown.

Type

str

metricType

Type of metric. Some possible values include system, custom and unknown.

Type

str

value

Value for the metric.

Type

int

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> operators = instances[0].get_operators()
>>> metrics = operators[0].get_metrics()
>>> print (metrics[0].resourceType)
metric
refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.OperatorConnection(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

Connection between operators.

id

Unique ID of this operator connection within the instance.

Type

str

resourceType

Identifies the REST resource type, which is operator.

Type

str

required

Indicates whether the connection is required.

Type

bool

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> operatorconnections = instances[0].get_operator_connections()
>>> print (operatorconnections[0].resourceType)
operatorConnection
refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.OperatorInputPort(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

Operator input port.

name

Name of this input port.

Type

str

resourceType

Identifies the REST resource type, which is operatorInputPort.

Type

str

indexWithinOperator

Index of the input port within the operator.

Type

int

New in version 1.9.

get_connections()

Get the list of OperatorConnection elements associated with this port.

Returns

List of OperatorConnection elements associated with this port.

Return type

list(OperatorConnection)

New in version 1.13.

get_metrics(name=None)

Get metrics for this input port.

Parameters

name (str, optional) – Only return metrics matching name, where name can be a regular expression. If name is not supplied, then all metrics for this input port are returned.

Returns

List of matching metrics.

Return type

list(Metric)

Retrieving a list of metrics whose name contains the string “temperatureSensor” could be performed as followed .. rubric:: Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> operator = instances[0].get_operators()[0]
>>> input_port = operator.get_input_ports()[0]
>>> metrics = input_port.get_metrics(name='*temperatureSensor*')
refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.OperatorOutputPort(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

Operator output port.

name

Name of this output port.

Type

str

resourceType

Identifies the REST resource type, which is operatorOutputPort.

Type

str

indexWithinOperator

Index of the output port within the operator.

Type

int

streamName

Name of the stream that is associated with this output port.

Type

str

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> exportedstreams = instances[0].get_exported_streams()
>>> operatoroutputport = exportedstreams[0].get_operator_output_port()
>>> print (operatoroutputport.resourceType)
operatorOutputPort
get_connections()

Get the list of OperatorConnection elements associated with this port.

Returns

List of OperatorConnection elements associated with this port.

Return type

list(OperatorConnection)

New in version 1.13.

get_metrics(name=None)

Get metrics for this output port.

Parameters

name (str, optional) – Only return metrics matching name, where name can be a regular expression. If name is not supplied, then all metrics for this output port are returned.

Returns

List of matching metrics.

Return type

list(Metric)

Retrieving a list of metrics whose name contains the string “temperatureSensor” could be performed as followed .. rubric:: Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> exportedstreams = instances[0].get_exported_streams()
>>> operatoroutputport = exportedstreams[0].get_operator_output_port()
>>> operatoroutputport.get_metrics(name='*temperatureSensor*')

New in version 1.9.

refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.Operator(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

An operator invocation within a job.

name

Operator name.

Type

str

resourceType

Identifies the REST resource type, which is operator.

Type

str

operatorKind

SPL primitive operator type for this operator.

Type

str

indexWithinJob

Index of this operator within the job.

Type

int

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> operators = instances[0].get_operators()
>>> print (operators[0].resourceType)
operator
get_host()
Get resource this operator is currently executing in.

If the operator is running on an externally managed resource None is returned.

Returns

Resource this operator is running on.

Return type

Host

New in version 1.9.

get_input_ports()

Get list of input ports for this operator.

Returns

Input ports for this operator.

Return type

list(OperatorInputPort)

New in version 1.9.

get_job()

Get the Streams job that owns this operator.

Returns

Streams Job owning this operator.

Return type

Job

get_metrics(name=None)

Get metrics for this operator.

Parameters

name (str, optional) – Only return metrics matching name, where name can be a regular expression. If name is not supplied, then all metrics for this operator are returned.

Returns

List of matching metrics.

Return type

list(Metric)

Retrieving a list of metrics whose name contains the string “temperatureSensor” could be performed as followed .. rubric:: Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> operator = instances[0].get_operators()[0]
>>> metrics = op.get_metrics(name='*temperatureSensor*')
get_output_ports()

Get list of output ports for this operator.

Returns

Output ports for this operator.

Return type

list(OperatorOutputPort)

New in version 1.9.

get_pe()

Get the Streams processing element this operator is executing in.

Returns

Processing element for this operator.

Return type

PE

New in version 1.9.

refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.PEConnection(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

Stream connection between two PEs.

id

PE connection ID.

Type

str

resourceType

Identifies the REST resource type, which is peConnection.

Type

str

required

Indicates whether this connection is required.

Type

bool

status

Status of this connection. Some possible values include connected, disconnected, and unknown.

Type

str

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> peconnections = instances.get_pe_connections()
>>> print(peconnections[0].resourceType)
peConnection
refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.PE(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

Processing element (PE) within a job. A processing element hosts one or more operators within a single job.

id

PE ID.

Type

str

resourceType

Identifies the REST resource type, which is pe.

Type

str

health

Health indicator for this PE. Some possible values include healthy, partiallyHealthy, partiallyUnhealthy, unhealthy, and unknown.

Type

str

indexWithinJob

Index of the PE within the job.

Type

int

launchCount

Number of times this PE was started manually or automatically because of failures.

Type

int

optionalConnections

Status of optional connections for this PE. Some possible values include connected, disconnected, partiallyConnected, and unknown.

Type

str

pendingTracingLevel

Describes a pending change to the granularity of the trace information that is stored for this PE. Some possible values include off, error, debug and trace. The value is None, if no change is pending.

Type

str

processId

Operating system process ID for this PE.

Type

str

relocatable

Indicates whether this PE can be relocated to a different resource.

Type

bool

requiredConnections

Status of the required connections for this PE. Some possible values include connected, disconnected, partiallyConnected, and unknown.

Type

str

restartable

Indicates whether this PE can be restarted.

Type

bool

status

Status of this PE.

Type

str

statusReason

Additional information for the status of this PE.

Type

str

tracingLevel

Granularity of the trace information. Some possible values include off, error, debug and trace.

Type

str

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> pes = instances.get_pes()
>>> print(pes[0].resourceType)
pe
get_host()
Get resource this processing element is currently executing in.

If the processing element is running on an externally managed resource None is returned.

Returns

Resource this processing element is running on.

Return type

Host

New in version 1.9.

get_job()

Get the Streams job that owns this PE.

Returns

Streams Job owning this PE.

Return type

Job

get_metrics(name=None)

Get metrics for this PE.

Parameters

name (str, optional) – Only return metrics matching name, where name can be a regular expression. If name is not supplied, then all metrics for this PE are returned.

Returns

List of matching metrics.

Return type

list(Metric)

Retrieving a list of metrics whose name contains the string “temperatureSensor” could be performed as followed .. rubric:: Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> pe = instances.get_pes()[0]
>>> metrics = pe.get_metrics(name='*temperatureSensor*')

New in version 1.9.

get_resource()

Get resource this processing element is currently executing in.

Returns

Resource this processing element is running on.

Return type

Host

New in version 1.13.13.

get_resource_allocation()

Get the ResourceAllocation element tance.

Returns

Resource allocation used to access information about the resource where this PE is running.

Return type

ResourceAllocation

New in version 1.9.

refresh()

Refresh the resource and update the attributes to reflect the latest status.

retrieve_console_log(filename=None, dir=None)

Retrieves the application console log (standard out and error) files for this PE and saves them as a plain text file.

An existing file with the same name will be overwritten.

Parameters
  • filename (str) – name of the created file. Defaults to pe_<id>_<timestamp>.stdouterr where id is the PE identifier and timestamp is the number of seconds since the Unix epoch, for example pe_83_1511995995.trace.

  • dir (str) – a valid directory in which to save the file. Defaults to the current directory.

Returns

the path to the created file, or None if retrieving a job’s logs is not supported in the version of streams to which the job is submitted.

Return type

str

New in version 1.9.

retrieve_trace(filename=None, dir=None)

Retrieves the application trace files for this PE and saves them as a plain text file.

An existing file with the same name will be overwritten.

Parameters
  • filename (str) – name of the created file. Defaults to pe_<id>_<timestamp>.trace where id is the PE identifier and timestamp is the number of seconds since the Unix epoch, for example pe_83_1511995995.trace.

  • dir (str) – a valid directory in which to save the file. Defaults to the current directory.

Returns

the path to the created file, or None if retrieving a job’s logs is not supported in the version of streams to which the job is submitted.

Return type

str

New in version 1.9.

class streamsx.rest_primitives.PublishedTopic(topic, schema)

Bases: object

Metadata for a published topic.

topic

Published topic

Type

str

schema

Schema of topic

Type

str

class streamsx.rest_primitives.ResourceAllocation(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

A resource that is allocated to an IBM Streams instance.

resourceType

Identifies the REST resource type, which is resourceAllocation.

Type

str

applicationResource

Indicates whether this resource is an application resource, which is used to run streams processing applications.

Type

bool

schedulerStatus

Indicates whether this resource is schedulable for the instance.

Type

str

status

Status of this resource for the instance.

Type

str

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> allocations = instances.get_resource_allocations()
>>> print(allocations[0].resourceType)
resourceAllocation
get_jobs(name=None)

Retrieves jobs running on this resource in its instance.

Parameters

name (str, optional) – Only return jobs containing property name that matches name. name can be a regular expression. If name is not supplied, then all jobs are returned.

Returns

A list of jobs matching the given name.

Return type

list(Job)

Note

If applicationResource is False an empty list is returned.

New in version 1.9.

get_pes()

Get the list of PE running on this resource in its instance.

Returns

List of PE running on this resource.

Return type

list(PE)

Note

If applicationResource is False an empty list is returned.

New in version 1.9.

get_resource()

Get the Resource of the resource allocation.

Returns

Resource for this allocation.

Return type

Resource

New in version 1.9.

refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.Resource(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

A resource available to a IBM Streams domain.

id

Resource identifier.

Type

str

displayName

Resource display name.

Type

str

ipAddress

IP address.

Type

str

status

Resource status.

Type

str

tags

Tags assigned to resource.

Type

list[str]

New in version 1.9.

get_metrics(name=None)

Get metrics for this resource.

Parameters

name (str, optional) – Only return metrics matching name, where name can be a regular expression. If name is not supplied, then all metrics for this resource are returned.

Returns

List of matching metrics.

Return type

list(Metric)

refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.ResourceTag(json_resource_tag)

Bases: object

Resource tag defined in a Streams domain

definition_format_properties

Indicates whether the resource definition consists of one or more properties.

Type

bool

description

Tag description.

Type

str

name

Tag name.

Type

str

properties_definition

Contains the properties of the resource definition. Only present if definition_format_properties is True.

Type

list(str)

reserved

If True, this tag is defined by IBM Streams, and cannot be modified.

Type

bool

class streamsx.rest_primitives.RestResource(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

HTTP REST resource identifier.

name

Resource name.

Type

str

resource

A string that identifies the URI for the resource.

Type

str

Changed in version 1.9: Changed to RestResource from Resource.

get_resource()

Make a request against this REST resource. :returns: JSON response. :rtype: dict

refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.StreamingAnalyticsService(rest_client, credentials)

Bases: object

Streaming Analytics service running on IBM Cloud.

cancel_job(job_id=None, job_name=None)

Cancel a running job.

Parameters
  • job_id (str, optional) – Identifier of job to be canceled.

  • job_name (str, optional) – Name of job to be canceled.

Returns

JSON response for the job cancel operation.

Return type

dict

get_instance_status()

Get the status the instance for this Streaming Analytics service.

Returns

JSON response for the instance status operation.

Return type

dict

start_instance()

Start the instance for this Streaming Analytics service.

Returns

JSON response for the instance start operation.

Return type

dict

stop_instance()

Stop the instance for this Streaming Analytics service.

Returns

JSON response for the instance start operation.

Return type

dict

submit_job(bundle, job_config=None)

Submit a Streams Application Bundle (sab file) to this Streaming Analytics service.

Parameters
  • bundle (str) – path to a Streams application bundle (sab file) containing the application to be submitted

  • job_config (JobConfig) – a job configuration overlay

Returns

JSON response from service containing ‘name’ field with unique

job name assigned to submitted job, or, ‘error_status’ and ‘description’ fields if submission was unsuccessful.

Return type

dict

class streamsx.rest_primitives.Toolkit(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

IBM Streams toolkit.

id

Unique ID for this instance.

Type

str

resourceType

Identifies the REST resource type, which is toolkit.

Type

str

name

The name of the toolkit.

Type

str

version

The version of the toolkit.

Type

str

requiredProductVersion

The earliest version of Streams required by the toolkit.

Type

str

path

The full path to the toolkit.

Type

str

Example

>>> from streamsx.build import BuildService
>>> build_service = BuildService.of_endpoint()
>>> toolkits = build_service.get_toolkits()
>>> print (toolkits[0].resourceType)
toolkit

New in version 1.13.

class Dependency(name, version)

Bases: object

The name, and range of versions, of a toolkit required by another toolkit.

name

the name of the required toolkit

Type

str

version

the range of versions required of the toolkit

Type

str

property dependencies

Find all the dependencies for this toolkit.

Returns

List of dependencies of this toolkit. If this toolkit does not have any dependencies, this will be an empty list.

Return type

list(Dependency)

refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.ViewItem(json_rep, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

A stream tuple in view.

collectionTime

Epoch time when this viewItem is collected from the stream.

Type

long

data

Content of this viewItem.

Type

dict

resourceType

Identifies the REST resource type, which is viewItem.

Type

str

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> views = instances[0].get_views()
>>> viewitems = views[0].get_view_items()
>>> print (viewitems[0].resourceType)
viewItem
refresh()

Refresh the resource and update the attributes to reflect the latest status.

class streamsx.rest_primitives.View(json_view, rest_client)

Bases: streamsx.rest_primitives._ResourceElement

View on a stream.

id

An unique identifier for the view.

Type

str

name

View name.

Type

str

description

Description of the view.

Type

str

resourceType

Identifies the REST resource type, which is view.

Type

str

activateOption

Indicate when the view starts buffering data.

Type

str

maximumTupleRate

Maximum Number of tuples at which the view collects per second.

Type

int

logicalOperatorName

The logical name of the operator that contains the output port on which the view is created.

Type

str

bufferCapacitySeconds

Buffer size measured in seconds.

Type

int

bufferCapacityTuples

Buffer size measured in number of tuples.

Type

int

bufferCapacityUnits

Indicates whether the buffer capacity for the view is determined by seconds, tuples or unknown.

Type

str

Example

>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection()
>>> instances = sc.get_instances()
>>> views = instances[0].get_views()
>>> print (views[0].resourceType)
view
display(duration=None, period=2)

Display a view within a Jupyter or IPython notebook.

Provides an easy mechanism to visualize data on a stream using a view.

Tuples are fetched from the view and displayed in a table within the notebook cell using a pandas.DataFrame. The table is continually updated with the latest tuples from the view.

This method calls start_data_fetch() and will call stop_data_fetch() when completed if duration is set.

Parameters
  • duration (float) – Number of seconds to fetch and display tuples. If None then the display will be updated until stop_data_fetch() is called.

  • period (float) – Maximum update period.

Note

A view is a sampling of data on a stream so tuples that are on the stream may not appear in the view.

Note

Python modules ipywidgets and pandas must be installed in the notebook environment.

Warning

Behavior when called outside a notebook is undefined.

New in version 1.12.

fetch_tuples(max_tuples=20, timeout=None)

Fetch a number of tuples from this view.

Fetching of data must have been started with start_data_fetch() before calling this method.

If timeout is None then the returned list will contain max_tuples tuples. Otherwise if the timeout is reached the list may contain less than max_tuples tuples.

Parameters
  • max_tuples (int) – Maximum number of tuples to fetch.

  • timeout (float) – Maximum time to wait for max_tuples tuples.

Returns

List of fetched tuples.

Return type

list

New in version 1.12.

get_domain()

Get the Streams domain for the instance that owns this view.

Returns

Streams domain for the instance owning this view.

Return type

Domain

get_instance()

Get the Streams instance that owns this view.

Returns

Streams instance owning this view.

Return type

Instance

get_job()

Get the Streams job that owns this view.

Returns

Streams Job owning this view.

Return type

Job

get_view_items()

Get a list of ViewItem elements associated with this view.

Returns

List of ViewItem(s) associated with this view.

Return type

list(ViewItem)

refresh()

Refresh the resource and update the attributes to reflect the latest status.

start_data_fetch()

Starts a thread that fetches data from the Streams view server.

Each item in the returned Queue represents a single tuple on the stream the view is attached to.

Returns

Queue containing view data.

Return type

queue.Queue

Note

This is a queue of the tuples coverted to Python objects, it is not a queue of ViewItem objects.

stop_data_fetch()

Stops the thread that fetches data from the Streams view server.