The Python REST API
Depending on the problem at hand, a developer might choose to create an IBM Streams application in a particular programming language. To this end, the ‘streamsx.topology’ project supports APIs in Java, Scala, Python, and IBM Streams Processing Language (SPL). Regardless of the language used to develop and submit the application, you will need to monitor the application while it is running. Monitoring lets you observe runtime information about the application or its environment, for example:
- Whether a job is running, stopped, or even exists.
- Whether the instance that contains a job is running or stopped, that is, whether new jobs can be submitted to it.
- Metrics of a stream, such as flow rate or congestion.
- Individual tuples on a stream, potentially for purposes of data visualization.
This information is exposed through the Python REST API in the streamsx.rest module. The REST API is not strictly read-only: you can also use it to cancel remote jobs. This guide walks through some of the most common use cases for the API and aims to give a general understanding of the types of applications that can be written with it.
Connecting with Streams in IBM Cloud Pak for Data (Streams 5)
External connection to Cloud Pak for Data
Connect to an IBM Streams service instance running in Cloud Pak for Data from a Python script running outside Cloud Pak for Data.
of_endpoint() is the entry point to using the Streams REST API bindings; it returns an Instance.
Sample code to connect and collect all metrics of all running jobs:
try:
    from streamsx.rest_primitives import Instance
except ImportError:
    # streamsx < 1.15.1
    from streamsx.rest import Instance

# Streams instance object (SSL certificate verification disabled)
instance = Instance.of_endpoint(verify=False)

# Get the list of all running jobs in the Streams instance
job_list = instance.get_jobs()
for j in job_list:
    print("JobId: " + str(j.id) + " Name: " + j.name)
    # Loop over each operator of the job
    for op in j.get_operators():
        print("Operator name: " + op.name + " kind: " + op.operatorKind)
        # List all metrics of the operator
        for m in op.get_metrics():
            print("Metric " + m.name + ": " + str(m.value))
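Printing is useful for a quick look, but further processing is easier if the values are collected into a data structure. The following minimal sketch uses only the calls and fields that appear in the sample above: get_jobs(), get_operators(), get_metrics(), and the name and value attributes. The function name collect_metrics is hypothetical. Because it relies on duck typing, you can also exercise it with stand-in objects when no Streams instance is available.

```python
def collect_metrics(instance):
    """Return a dict mapping (job name, operator name, metric name) to the metric value.

    Only the attributes used in the sample above are accessed, so any object
    exposing get_jobs(), get_operators() and get_metrics() in the same shape works.
    """
    metrics = {}
    for job in instance.get_jobs():
        for op in job.get_operators():
            for metric in op.get_metrics():
                # Keying by job name assumes names are unique in the instance;
                # use job.id instead if that is not guaranteed.
                metrics[(job.name, op.name, metric.name)] = metric.value
    return metrics


# Usage against a live service instance (not run here):
# metrics = collect_metrics(Instance.of_endpoint(verify=False))
# for (job_name, op_name, metric_name), value in sorted(metrics.items()):
#     print(job_name, op_name, metric_name, value)
```

A flat dictionary keyed by a tuple is easy to diff between two polling rounds or to load into a table for plotting.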
Integrated configuration within project
Connect to an IBM Streams service instance running in Cloud Pak for Data inside a notebook running in Cloud Pak for Data.
of_service() is the entry point to using the Streams REST API bindings; it returns an Instance.
Sample code to retrieve the Instance object:
try:
    from streamsx.rest_primitives import Instance
except ImportError:
    # streamsx < 1.15.1
    from streamsx.rest import Instance
from streamsx.topology import context

# Select the service instance
from icpd_core import icpd_util
cfg = icpd_util.get_service_details(name='instanceName', instance_type='streams')

# Disable SSL certificate verification if necessary
cfg[context.ConfigParams.SSL_VERIFY] = False

# Streams instance object
instance = Instance.of_service(cfg)
The sample notebook Streams-EventStoreSample.ipynb demonstrates how to connect to a Streams job and retrieve its operator metrics when both the job and the notebook are running in Cloud Pak for Data.
Connecting with Streams (4.2, 4.3)
Connecting with the StreamsConnection class - IBM Streams On-premises (4.2, 4.3)
The primary abstraction in the Python REST API is the StreamsConnection class. Every application that uses the REST API must first create an instance of this class.
The StreamsConnection instance retrieves runtime information about your application over HTTP. To connect to a Streams installation, you must provide the REST API URL. You can get the URL from your Streams installation by running:
streamtool geturl --api
This command prints a URL such as https://10.51.4.141:8443/streams/rest/resources
Use that URL and a valid Streams user name and password to create an instance of the StreamsConnection class.
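from streamsx import rest
url = "https://10.51.4.141:8443/streams/rest/resources"  # output of streamtool geturl --api
sc = rest.StreamsConnection(username="streamsadmin", password="passw0rd", resource_url=url)
sc.session.verify = False  # disable SSL certificate verification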
If the URL is omitted, the StreamsConnection instance connects to a local Streams installation.
By default, SSL certificate verification is enabled. To disable it, set sc.session.verify = False immediately after you create your StreamsConnection instance.
Connecting to the IBM Streaming Analytics service on IBM Cloud
In this case, the first step is to instantiate a subclass of StreamsConnection called StreamingAnalyticsConnection. Instead of a user name and password, the constructor takes the path to a VCAP file and the name of the Streaming Analytics service:
from streamsx import rest
sc = rest.StreamingAnalyticsConnection("/home/streamsadmin/vcap.json", "Streaming Analytics-be")
sc.session.verify = False
Retrieving resource elements
The StreamsConnection object represents the root of a tree of resource elements. Each node in the tree is a resource that can be queried to retrieve its state. The StreamsConnection object exposes several methods for obtaining a resource element:
sc.get_instance()
sc.get_instances()
sc.get_resources()
sc.get_domain()
sc.get_domains()
sc.get_installations()
When invoked, each of these methods retrieves up-to-date information about a resource in the form of a Python object. For example, an IBM Streams instance is the container in which jobs are executed. The get_instance method retrieves the resource element that holds current information about an instance, including the instance’s owner, its status, and the time it was started.
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection("/home/streamsadmin/vcap.json", "Streaming Analytics-be")
>>> instance = sc.get_instance(id="MyStreamsInstance")
>>> print(instance.owner, instance.status, instance.startTime)
streamsdomainowner running 1492194564662
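The startTime value is printed as a raw integer. Its magnitude suggests a count of milliseconds since the Unix epoch. The sketch below assumes that interpretation and converts the value into a timezone-aware datetime. The helper name start_time_to_datetime is hypothetical. Adding a timedelta to the epoch keeps the arithmetic in integers, so no precision is lost to floating-point division.

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as an aware UTC datetime
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def start_time_to_datetime(start_time_ms):
    """Convert an epoch-milliseconds timestamp (assumed format) to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=start_time_ms)


print(start_time_to_datetime(1492194564662).isoformat())
# 2017-04-14T18:29:24.662000+00:00
```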
The instance.status field reflects whether the instance is running. An instance whose status is stopped cannot run jobs. The following code prints all of the information that can be obtained from the instance resource.
>>> from streamsx import rest
>>> sc = rest.StreamingAnalyticsConnection("/home/streamsadmin/vcap.json", "Streaming Analytics-be")
>>> instance = sc.get_instances()[0]
>>> print(instance.json_rep)
'activeServices': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/activeservices',
'activeVersion': {'architecture': 'x86_64',
'buildVersion': '20170323145113',
'editionName': 'IBM Streams',
'fullProductVersion': '4.2.1.0',
'minimumOSBaseVersion': '6',
'minimumOSPatchVersion': '6',
'minimumOSVersion': 'Red Hat Enterprise Linux '
'Server release 6.6 '
'(Santiago)',
'productName': 'IBM Streams',
'productVersion': '4.2.1.0'},
'activeViews': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/activeviews',
'configuredViews': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/configuredviews',
'creationTime': 1485540669058,
'creationUser': 'streamsdomainowner',
'domain': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/domains/standard1',
'exportedStreams': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/exportedstreams',
'health': 'healthy',
'hosts': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/hosts',
'id': 'dd5f603a-7fb1-4e9b-861c-e15542e2d423',
'importedStreams': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/importedstreams',
'jobs': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/jobs',
'operatorConnections': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/operatorconnections',
'operators': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/operators',
'owner': 'streamsdomainowner',
'peConnections': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/peconnections',
'pes': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/pes',
'resourceAllocations': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/resourceallocations',
'resourceType': 'instance',
'restid': 'dd5f603a-7fb1-4e9b-861c-e15542e2d423',
'self': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423',
'startTime': 1492194564662,
'startedBy': 'streamsdomainowner',
'status': 'running',
'views': 'https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/dd5f603a-7fb1-4e9b-861c-e15542e2d423/views'
You can find a complete reference for the types of resources and their fields in the IBM Streams documentation in IBM Knowledge Center.
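Many fields in json_rep are URLs that point at child resources, such as jobs, pes, and views. The following sketch picks those links out of the dictionary. Based only on the listing above, it assumes that a child link is a string starting with the resource's own self URL followed by a slash. Links that point elsewhere, such as the parent domain, are therefore excluded. The function name child_links is hypothetical.

```python
def child_links(json_rep):
    """Return {field: url} for the fields of a resource that link to child resources.

    Assumption: a child link is a string that starts with the resource's own
    'self' URL followed by '/'. The 'self' field itself is not included.
    """
    prefix = json_rep["self"].rstrip("/") + "/"
    return {
        key: value
        for key, value in json_rep.items()
        if isinstance(value, str) and value.startswith(prefix)
    }


# A trimmed-down copy of the instance listing above
base = ("https://streams-console-c6d1.ng.bluemix.net/streams/rest/instances/"
        "dd5f603a-7fb1-4e9b-861c-e15542e2d423")
sample = {
    "self": base,
    "jobs": base + "/jobs",
    "views": base + "/views",
    "domain": "https://streams-console-c6d1.ng.bluemix.net/streams/rest/domains/standard1",
    "status": "running",
    "startTime": 1492194564662,
}
print(sorted(child_links(sample)))  # ['jobs', 'views']
```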
Resource elements are arranged in a hierarchy. You can discover the child elements of the instance resource by inspecting its methods, for example with tab completion in the interactive interpreter:
>>> instance.<Tab>
instance.get_active_services(
instance.get_domain(
instance.get_exported_streams(
instance.get_hosts(
instance.get_imported_streams(
instance.get_job(
instance.get_jobs(
instance.get_operator_connections(
instance.get_operators(
instance.get_pe_connections(
instance.get_pes(
instance.get_published_topics(
instance.get_resource_allocations(
instance.get_views(
instance.refresh(
The presence of the get_jobs method indicates that the job resource is a child of the instance resource. In turn, the operator resource is a child of the job resource. The following script finds the names of all operators that are associated with an instance:
>>> from streamsx import rest
>>> sc = rest.StreamsConnection(username='streamsadmin', password='passw0rd')
>>> instance = sc.get_instances()[0]
>>> for job in instance.get_jobs():
...     for operator in job.get_operators():
...         print(operator.name)
...
identity
list_2
neural_net_model
Op.PublishTopic.TopicProperties
periodicSource
print_flush
Canceling jobs
The Python REST API is not strictly read-only; you can also use it to cancel jobs. This functionality is exposed through the job.cancel() method. To cancel a job, do the following:
>>> from streamsx import rest
>>> sc = rest.StreamsConnection(username='streamsadmin', password='passw0rd')
>>> instance = sc.get_instances()[0]
>>> job = instance.get_jobs()[0]
>>> if job.cancel():
...     print("The job was successfully canceled.")
... else:
...     print("Error canceling job.")
...
The job was successfully canceled.
Canceling a remote job frees the instance resources that the job was using.
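When several jobs need to be stopped, for example every job left behind by a test run, you can apply the same cancel() call selectively. The sketch below assumes, as the example above suggests, that cancel() returns a truthy value on success. The helper name cancel_matching and the test_ name prefix in the usage comment are hypothetical.

```python
def cancel_matching(jobs, predicate):
    """Cancel every job for which predicate(job) is true.

    Returns two lists of job names: jobs whose cancel() call reported success,
    and jobs whose cancel() call reported failure. Other jobs are not touched.
    """
    canceled, failed = [], []
    for job in jobs:
        if not predicate(job):
            continue
        if job.cancel():
            canceled.append(job.name)
        else:
            failed.append(job.name)
    return canceled, failed


# Usage against a live instance (not run here):
# canceled, failed = cancel_matching(instance.get_jobs(),
#                                    lambda job: job.name.startswith("test_"))
```

Returning the failures separately makes it easy to retry them or report them, instead of stopping at the first error.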
Accessing the tuples of a view
Streaming applications process unbounded amounts of data in real time. Users often want access to the data stream for visualization or additional monitoring. To this end, you can use the Python REST API to retrieve the tuples of any stream that is created with a view. In the Python Application API, you create a view on a stream by calling the view method:
>>> from streamsx.topology import Topology
>>> top = Topology("myApplication")
>>> strm = top.source(["Hello", "world!"])
>>> strm.view(name="myView")
The last line of code, strm.view(), marks the strm stream as viewable, which means that you can access its tuples with the Python REST API. When the application is submitted, a view resource element is created as a child of the job and instance resources. To find all views associated with an instance, call the instance.get_views() method:
>>> from streamsx import rest
>>> sc = rest.StreamsConnection(username='streamsadmin', password='passw0rd')
>>> instance = sc.get_instances()[0]
>>> views = instance.get_views()
>>> print(len(views))
2
To retrieve only the views associated with a particular job, call the job.get_views() method:
>>> from streamsx import rest
>>> sc = rest.StreamsConnection(username='streamsadmin', password='passw0rd')
>>> instance = sc.get_instances()[0]
>>> job = instance.get_jobs()[0]
>>> views = job.get_views()
>>> print(len(views))
1
Each view can be created with a name. In the previous example, you created a view called myView. You can locate the myView view by iterating through the list of views and checking whether the name matches:
>>> myView = None
>>> for view in views:
...     if view.name == "myView":
...         myView = view
...
>>> if myView:
...     print("Successfully located myView")
... else:
...     print("Could not locate myView")
...
Successfully located myView
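You can write the same search more compactly with next() and a default value, which returns the first matching view or None. This is a minimal sketch. The helper name find_view is hypothetical, and it relies only on the name attribute used above.

```python
def find_view(views, name):
    """Return the first view whose name equals `name`, or None if none matches."""
    # next() with a default replaces the explicit loop and flag variable
    return next((view for view in views if view.name == name), None)


# Usage against a live job (not run here):
# myView = find_view(job.get_views(), "myView")
```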
After you locate the correct view, you obtain its data by calling the view.start_data_fetch() method. This method returns a queue whose contents are updated to reflect the contents of the remote stream. A background thread keeps populating the queue until the view.stop_data_fetch() method is invoked. The following example shows how this works in practice:
>>> view = myView  # the view located in the previous step
>>> queue = view.start_data_fetch()
>>> try:
...     for item in iter(queue.get, None):
...         print(item)
... except KeyboardInterrupt:
...     view.stop_data_fetch()
...
Hello
world!
^C
>>>
Going line by line, queue = view.start_data_fetch() begins fetching stream data from the remote view into the returned queue object. Next, for item in iter(queue.get, None) creates an iterator that calls queue.get repeatedly. It would stop only if queue.get returned None, so in practice it keeps blocking and yielding new tuples, and print(item) writes each one to the screen. In this example, data is consumed until the user sends an interrupt with Control-C, but you are free to decide when and how to stop consuming data. Finally, when the user sends the interrupt, view.stop_data_fetch() is invoked. This terminates the background thread, and data is no longer retrieved from the remote view.
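Waiting for Control-C suits interactive use. In a script, you may prefer to collect a bounded number of tuples and always stop the background thread afterwards. The sketch below assumes that the object returned by start_data_fetch() supports the standard queue.Queue interface, in particular get(timeout=...) raising queue.Empty. The function fetch_tuples and its parameters are hypothetical names.

```python
import queue


def fetch_tuples(view, max_items=10, timeout=5.0):
    """Collect up to max_items tuples from a view, then stop the background fetch.

    Stops early if no tuple arrives within `timeout` seconds. The finally
    clause ensures stop_data_fetch() runs even if reading raises an exception.
    """
    items = []
    q = view.start_data_fetch()
    try:
        while len(items) < max_items:
            try:
                items.append(q.get(timeout=timeout))
            except queue.Empty:
                break  # no new tuple within the timeout
    finally:
        view.stop_data_fetch()
    return items


# Usage against a live view (not run here):
# for item in fetch_tuples(myView, max_items=100, timeout=10.0):
#     print(item)
```

Using try/finally instead of catching only KeyboardInterrupt means the background thread is stopped on every exit path.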
The ability to obtain live stream data from a running job has proved valuable for real-time data visualization. For example, the stream might send temperature readings from an engine to be analyzed by a mechanic. High temperature readings can be a signal to limit the engine’s maximum RPMs. Jupyter notebooks provide an ideal platform for performing this kind of visualization.