streamsx.service
Streams Jobs as a Cloud Pak for Data Service
Overview
A streams-application service can be used to insert data into and retrieve data from a Streams job.
When you add one or more callables using EndpointSource or EndpointSink to your topology application and submit the application to run as a job, a streams-application service is created.
Exchanging data with the job is done by using a REST API.
The streams-application service instances are listed on the Services > Instances page of the Cloud Pak for Data web client. Selecting a service entry in the list opens the REST API documentation for the service.
See also
- Streams Jobs as a Service
Resources for Streams developers in the IBM Community.
Module contents
Classes
EndpointSink | Creates a service endpoint to consume data from that stream via REST.
EndpointSource | Creates a service endpoint that produces a stream of data received via REST.
- class streamsx.service.EndpointSink(buffer_size=None, consuming_reads=None, service_documentation=None, endpoint_documentation=None)
Bases: streamsx.topology.composite.ForEach
Creates a service endpoint to consume data from that stream via REST.
With this sink, the Streams job is enabled as a Cloud Pak for Data service and emits job data through the REST API.
Use an instance of this class in for_each() to terminate a stream:

    from streamsx.service import EndpointSink
    stream.for_each(EndpointSink())
Simple example without service or endpoint documentation:

    from streamsx.topology.topology import Topology
    from streamsx.topology.context import submit, ContextTypes, ConfigParams, JobConfig
    from streamsx.service import EndpointSink
    from typing import Iterable, NamedTuple
    import itertools, random

    class SampleSourceSchema(NamedTuple):
        id: str
        num: int

    # Callable of the Source
    class SampleSource(object):
        def __call__(self) -> Iterable[SampleSourceSchema]:
            for num in itertools.count(1):
                output_event = SampleSourceSchema(
                    id = str(num),
                    num = random.randint(0,num)
                )
                yield output_event

    topo = Topology('endpoint_sink_sample')
    stream1 = topo.source(SampleSource())
    stream1.for_each(EndpointSink(buffer_size=50000))

Example with service and endpoint documentation:

    service_documentation={
        'title': 'streamsx-sample-endpoint-sink',
        'description': 'NUMBER GENERATOR',
        'version': '1.0.0',
        'externalDocsUrl': 'https://mycompany.com/numgen/doc',
        'externalDocsDescription': 'Number generator documentation'
    }

    tags = dict()
    tag1 = {
        'Output': {
            'description': 'Output tag description',
            'externalDocs': {
                'url': 'https://mycompany.com/numgen/input/doc',
                'description': 'Output tag external doc description'
            }
        }
    }
    tags.update(tag1)
    service_documentation['tags'] = tags

    endpoint_documentation = dict()
    endpoint_documentation['summary'] = 'Sample endpoint sink'
    endpoint_documentation['tags'] = ['Output']
    endpoint_documentation['description'] = 'Streams job emits some data with random numbers'

    doc_attr = dict()
    descr = {'id': {'description': 'IDENTIFIER (incremented by one per tuple)'}}
    doc_attr.update(descr)
    descr = {'num': {'description': 'RANDOM NUMBER'}}
    doc_attr.update(descr)
    endpoint_documentation['attributeDescriptions'] = doc_attr

    stream1 = topo.source(SampleSource())
    stream1.for_each(EndpointSink(
        buffer_size=50000,
        service_documentation=service_documentation,
        endpoint_documentation=endpoint_documentation), name='cpd_endpoint_sink')

New in version 1.18.
- buffer_size
Size of the buffer. If the buffer capacity is reached, older tuples are removed to make room for the newer tuples. A warning is returned on an API request if the requested start time is before the oldest tuple in the buffer. The default buffer size is 1000.
- Type
int
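
The buffer behavior described above can be illustrated with a small stand-alone sketch. This is not the streamsx implementation: the BoundedBuffer class, its read_from method, and the integer timestamps are hypothetical names introduced only to show how a fixed-size buffer drops the oldest tuples and how a start time older than the oldest buffered tuple could be flagged.

```python
from collections import deque


class BoundedBuffer:
    """Hypothetical stand-in for the endpoint buffer (illustration only)."""

    def __init__(self, buffer_size=1000):
        # A deque with maxlen discards the oldest entry once it is full.
        self._items = deque(maxlen=buffer_size)

    def append(self, timestamp, tup):
        self._items.append((timestamp, tup))

    def read_from(self, start):
        """Return (tuples at or after start, warning flag)."""
        # Warn when the caller asks for data older than what is still buffered.
        warning = bool(self._items) and start < self._items[0][0]
        tuples = [tup for ts, tup in self._items if ts >= start]
        return tuples, warning


buf = BoundedBuffer(buffer_size=3)
for ts in range(1, 6):
    buf.append(ts, {"id": str(ts), "num": ts})

old_read, old_warning = buf.read_from(1)      # tuples 1 and 2 were already dropped
recent_read, recent_warning = buf.read_from(4)
```

With a buffer size of 3 and five appended tuples, only the three newest remain, so a read starting at timestamp 1 sets the warning flag while a read starting at timestamp 4 does not.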
- consuming_reads
Indicates whether tuples should be removed from the endpoint buffer after they have been returned on a REST API call. The default value is false.
- Type
boolean
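
To make the difference between the two read modes concrete, here is a minimal sketch that models the buffer as a plain deque. The read_buffer function is a hypothetical helper, not part of streamsx, and it only mirrors the semantics described for consuming_reads.

```python
from collections import deque


def read_buffer(buffer, consuming_reads=False):
    """Return the buffered tuples; remove them when consuming_reads is set."""
    result = list(buffer)
    if consuming_reads:
        # Consuming read: tuples that were returned are dropped from the buffer.
        buffer.clear()
    return result


keep = deque([1, 2, 3])
drain = deque([1, 2, 3])

first_keep = read_buffer(keep)
second_keep = read_buffer(keep)                 # same tuples are returned again

first_drain = read_buffer(drain, consuming_reads=True)
second_drain = read_buffer(drain, consuming_reads=True)  # nothing left to return
```

Without consuming reads, repeated requests see the same tuples until they are pushed out of the buffer; with consuming reads, each tuple is returned at most once.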
- service_documentation
Content to describe the service. This is set once per application only. Apply a dict containing one or more of the keys: 'title', 'version', 'description', 'externalDocsUrl', 'externalDocsDescription', 'tags':

    d = {
        'title': <string value>,
        'version': <string value>,
        'description': <string value>,
        'externalDocsUrl': <string value>,
        'externalDocsDescription': <string value>,
        'tags': {<key>: {'description': <string value>, 'externalDocs': {'url': <string value>, 'description': <string value>}}, ...}
    }

- Type
dict
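
Because the dictionary keys are free-form strings, a misspelled key is easy to miss. The following sketch checks a service_documentation dict against the keys listed above before it is passed to the endpoint. The check_service_documentation helper is hypothetical, and whether the library itself rejects unknown keys is not stated here, so treat this as an optional safeguard.

```python
# Keys listed for service_documentation in this reference.
SERVICE_DOC_KEYS = {
    "title",
    "version",
    "description",
    "externalDocsUrl",
    "externalDocsDescription",
    "tags",
}


def check_service_documentation(doc):
    """Raise ValueError if doc contains keys outside the documented set."""
    unknown = sorted(set(doc) - SERVICE_DOC_KEYS)
    if unknown:
        raise ValueError("unknown service_documentation keys: " + ", ".join(unknown))
    return doc


service_documentation = check_service_documentation({
    "title": "streamsx-sample-endpoint-sink",
    "description": "NUMBER GENERATOR",
    "version": "1.0.0",
})

try:
    # 'externalDocsURL' differs from the documented 'externalDocsUrl'.
    check_service_documentation({"externalDocsURL": "https://mycompany.com/numgen/doc"})
    typo_detected = False
except ValueError:
    typo_detected = True
```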
- endpoint_documentation
Additional content to be included for an API endpoint to describe the endpoint sink. Apply a dict containing one or more of the keys: 'summary', 'tags', 'description', 'attributeDescriptions':

    d = {
        'summary': <string value>,
        'tags': <array of strings>,
        'description': <string value>,
        'attributeDescriptions': {<key>: {'description': <string value>}, ...}
    }

- Type
dict
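
When the stream schema is a NamedTuple, the 'attributeDescriptions' entry can be derived from the schema fields so that every attribute is documented. This is a sketch only: attribute_descriptions is a hypothetical helper, and the schema class repeats the one from the sink example above.

```python
from typing import NamedTuple


class SampleSourceSchema(NamedTuple):
    id: str
    num: int


def attribute_descriptions(schema, descriptions):
    """Build the 'attributeDescriptions' dict, one entry per schema field."""
    missing = [field for field in schema._fields if field not in descriptions]
    if missing:
        raise KeyError("no description for: " + ", ".join(missing))
    return {field: {"description": descriptions[field]} for field in schema._fields}


endpoint_documentation = {
    "summary": "Sample endpoint sink",
    "tags": ["Output"],
    "description": "Streams job emits some data with random numbers",
    "attributeDescriptions": attribute_descriptions(
        SampleSourceSchema,
        {
            "id": "IDENTIFIER (incremented by one per tuple)",
            "num": "RANDOM NUMBER",
        },
    ),
}
```

The resulting dict has the same shape as the hand-built doc_attr in the example with service and endpoint documentation.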
- Returns
streamsx.topology.topology.Sink: Stream termination.
- populate(topology, stream, name, **options)
Populate the topology with this composite for each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

    sink = input_stream.for_each(myForEachComposite)

- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
name – Name passed into for_each.
**options – Future options passed to for_each.
- Returns
Termination for this composite transformation of stream.
- Return type
-
- class streamsx.service.EndpointSource(schema, buffer_size=None, endpoint_documentation=None, service_documentation=None)
Bases: streamsx.topology.composite.Source
Creates a service endpoint that produces a stream of data received via REST.
With this source, the Streams job is enabled as a Cloud Pak for Data service and receives data through the REST API.
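Use an instance of this class in source() to create a stream (the schema argument is required):

    from streamsx.topology.schema import CommonSchema
    from streamsx.service import EndpointSource
    topo.source(EndpointSource(schema=CommonSchema.Json))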
Example endpoint that receives data in JSON format:

    from streamsx.topology.schema import CommonSchema
    from streamsx.service import EndpointSource
    s = topo.source(EndpointSource(schema=CommonSchema.Json), name='cpd_endpoint_src_json')

Example with structured Stream schema, service and endpoint documentation:

    from streamsx.topology.topology import Topology
    from streamsx.service import EndpointSource

    topo = Topology('endpoint_source_sample')

    service_documentation={
        'title': 'streamsx-sample-endpoint-source',
        'description': 'Streams job as service receives data',
        'version': '1.0.0'
    }

    endpoint_documentation = {
        'summary': 'Sample endpoint source',
        'description': 'CPD job endpoint injects some data'
    }

    schema = 'tuple<rstring id, int64 number>'

    s = topo.source(EndpointSource(
        schema=schema,
        buffer_size=200000,
        service_documentation=service_documentation,
        endpoint_documentation=endpoint_documentation), name='cpd_endpoint_src')

New in version 1.18.
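
Before sending data to an endpoint source with a structured schema, a client may want to check that a record carries every attribute named in the schema string. The sketch below does this for flat schema strings such as the one in the example above. Both helpers are hypothetical, nested or parameterized SPL types are not handled, and the assumption that a record is a JSON object keyed by attribute name should be checked against the REST API documentation generated for the service.

```python
import json


def attribute_names(spl_schema):
    """Extract attribute names from a flat 'tuple<type name, ...>' string."""
    text = spl_schema.strip()
    if not (text.startswith("tuple<") and text.endswith(">")):
        raise ValueError("expected a 'tuple<...>' schema string")
    body = text[len("tuple<"):-1]
    # Each attribute is written as "<type> <name>"; keep the name part.
    return [part.split()[-1] for part in body.split(",")]


def missing_attributes(spl_schema, payload_json):
    """Return the schema attributes that are absent from a JSON record."""
    record = json.loads(payload_json)
    return [name for name in attribute_names(spl_schema) if name not in record]


schema = 'tuple<rstring id, int64 number>'
names = attribute_names(schema)

complete = missing_attributes(schema, json.dumps({"id": "a1", "number": 42}))
incomplete = missing_attributes(schema, json.dumps({"id": "a1"}))
```

An empty result means the record names every schema attribute; otherwise the list shows which attributes still need values.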
- schema
Schema of the source stream.
- Type
- buffer_size
Size of the buffer.
- Type
int
- service_documentation
Content to describe the service. This is set once per application only. Apply a dict containing one or more of the keys: 'title', 'version', 'description', 'externalDocsUrl', 'externalDocsDescription', 'tags':

    d = {
        'title': <string value>,
        'version': <string value>,
        'description': <string value>,
        'externalDocsUrl': <string value>,
        'externalDocsDescription': <string value>,
        'tags': {<key>: {'description': <string value>, 'externalDocs': {'url': <string value>, 'description': <string value>}}, ...}
    }

- Type
dict
- endpoint_documentation
Additional content to be included for an API endpoint to describe the endpoint source. Apply a dict containing one or more of the keys: 'summary', 'tags', 'description', 'attributeDescriptions':

    d = {
        'summary': <string value>,
        'tags': <array of strings>,
        'description': <string value>,
        'attributeDescriptions': {<key>: {'description': <string value>}, ...}
    }

- Type
dict
- Returns
Stream.
- populate(topology, name, **options)
Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

    topo = Topology()
    source_stream = topo.source(mySourceComposite)

- Parameters
topology – Topology containing the source.
name – Name passed into source.
**options – Future options passed to source.
- Returns
Single stream representing the source.
- Return type
-