streamsx.topology.schema¶
Schemas for streams.
Overview¶
A stream represents an unbounded flow of tuples with a declared schema so that each tuple on the stream complies with the schema. A stream’s schema may be one of:
Structured schemas¶
A structured schema is a sequence of attributes, and an attribute is a named value of a specific type. For example a stream of sensor readings can be represented as a schema with three attributes sensor_id, ts and reading with types of int64, int64 and float64 respectively.
This schema can be declared a number of ways:
Python 3.6:
class SensorReading(typing.NamedTuple):
    sensor_id: int
    ts: int
    reading: float
sensors = raw_readings.map(parse_sensor, schema=SensorReading)
Python 3:
sensors = raw_readings.map(parse_sensor,
    schema='tuple<int64 sensor_id, int64 ts, float64 reading>')
The supported types are defined by IBM Streams and are listed in StreamSchema.
Nested structured schemas¶
A structured schema can contain nested structures that are defined separately.
Python 3.6:
class Sensor(typing.NamedTuple):
    manufacturer: str
    sensor_id: int
class SensorReading(typing.NamedTuple):
    sensor: Sensor
    ts: int
    reading: float
sensors = raw_readings.map(parse_sensor, schema=SensorReading)
Python 3:
sensors = raw_readings.map(parse_sensor,
    schema='tuple<tuple<rstring manufacturer, int64 sensor_id> sensor, int64 ts, float64 reading>')
Both schema definitions are equivalent.
Structured schemas provide type-safety and efficient network serialization when compared to passing a dict using Python streams.
Streams with structured schemas can be interchanged with any IBM Streams application using publish() and subscribe() maintaining type safety.
Defining a stream’s schema¶
Every stream within a Topology has defined schema. The schema may be defined explictly (for example map() or subscribe()) or implicity (for example filter() produces a stream with the same schema as its input stream).
Explictly defining a stream’s schema is flexible and various types of values are accepted as the schema.
Builtin types as aliases for common schema types:
Values of the enumeration
CommonSchema
An instance of
typing.NamedTuple(Python 3)
An instance of
StreamSchema
A string of the format
tuple<...>defining the attribute names and types. SeeStreamSchemafor details on the format and types supported.
A string containing a namespace qualified SPL stream type (e.g.
com.ibm.streams.geospatial::FlightPathEncounterTypes.Observation3D)
Module contents¶
Functions
| Is schema an common schema. | 
Classes
| Common stream schemas for interoperability within Streams applications. | |
| Defines a schema for a structured stream. | 
- 
streamsx.topology.schema.is_common(schema)¶
- Is schema an common schema. - Parameters
- schema – Scheme to test. 
- Returns
- Trueif schema is a common schema, otherwise- False.
- Return type
- bool 
 
- 
class streamsx.topology.schema.StreamSchema(schema)¶
- Bases: - object- Defines a schema for a structured stream. - On a structured stream a tuple is a sequence of attributes, and an attribute is a named value of a specific type. - The supported types are defined by IBM Streams and include such types as int8, int16, rstring and list<float32>. - A structured schema can be defined using a - typing.NamedTuplein Python 3, a string with the syntax- tuple<type name [,...]>or an instance of this class.- typing.NamedTuple: - A - typing.NamedTuplecan be used to define a structured schema with the field names and types mapping to the structured schema attribute names and types.- Python types are mapped to IBM Streams types as follows: - Python type - IBM Streams type - str- rstring- bool- boolean- int- int64- float- float64- decimal.Decimal- decimal128- complex- complex64- bytes- blob- streamsx.spl.types.Timestamp- timestamp- datetime.datetime- timestamp- typing.List[T]- list<T>- typing.Set[T]- set<T>- typing.Mapping[K,V]- map<K,V>- typing.Optional[T]- optional<T>- Note - Tuples on a stream with a schema defined by a - typing.NamedTupleinstance are passed into callables as instance of a named tuple with the the correct field names and types unless the named tuple contains nested named tuples at any nesting depth. When passed as named tuple, there is no guarantee to be the same class instance as the one used to declare the schema.- Tuple string: - A string of the format tuple<type name [,…]> can be used to define a structured schema, where type is an IBM Streams type. - Example: - tuple<rstring id, timestamp ts, float64 value> - represents a schema with three attributes suitable for a sensor reading. - IBM Streams types: - Type - Description - Python representation - Conversion from Python - boolean- True or False - bool- bool(value)- int8- 8-bit signed integer - int- int(value)truncated to 8 bits- int16- 16-bit signed integer - int- int(value)truncated to 16 bits- int32- 32-bit signed integer - int- int(value)truncated to 32 bits- int64- 64-bit signed integer - int- int(value)- uint8- 8-bit unsigned integer - int- uint16- 16-bit unsigned integer - int- uint32- 32-bit unsigned integer - int- uint64- 64-bit unsigned integer - int- float32- 32-bit binary floating point - float- float(value)truncated to 32 bits- float64- 64-bit binary floating point - float- float(value)- decimal32- 32-bit decimal floating point - decimal.Decimal- decimal.Decimal(value)normalized to IEEE 754 decimal32- decimal64- 64-bit decimal floating point - decimal.Decimal- decimal.Decimal(value)normalized to IEEE 754 decimal64- decimal128- 128-bit decimal floating point - decimal.Decimal- decimal.Decimal(value)normalized to IEEE 754 decimal128- complex32- complex with float32 values - complex- complex(value)with real and imaginary values truncated to 32 bits- complex64- complex with float64 values - complex- complex(value)- timestamp- Nanosecond timestamp - rstring- UTF-8 string - str- str(value)- rstring[N]- Bounded UTF-8 string - str- str(value)- ustring- UTF-16 string - str- str(value)- blob- Sequence of bytes - memoryview- list<T>- List with elements of type T - list- list<T>[N]- Bounded list - list- set<T>- Set with elements of type T - set- set<T>[N]- Bounded set - set- map<K,V>- Map with typed keys and values - dict- map<K,V>[N]- Bounded map, limted to N pairs - dict- optional<T>- Optional value of type T - Value of type T, or None - Value of for type - T- enum{id [,...]}- Enumeration - Not supported - Not supported - xml- XML value - Not supported - Not supported - tuple<type name [, ...]>- Nested tuple - dict- dict- Note - Type optional<T> requires IBM Streams 4.3 or later. - Note - Conversion to or from Python: - Type set<T> is restricted to primitive types 
- Type map<K,V> is restricted to primitive types for the key type K 
 - Python representation is how an attribute value in a structured schema is passed into a Python function. - Conversion from Python indicates how a value from Python is converted to an attribute value in a structured schema. For example a value - vassigned to- float64attribute is converted as though- float(v)is called first, thus- vmay be a- float,- intor any type that has a- __float__method.- When a type is not supported in Python it can only be used in a schema used for streams produced and consumed by invocation of SPL operators. - A StreamSchema can be created by passing a string of the form - tuple<...>or by passing the name of an SPL type from an SPL toolkit, for example- com.ibm.streamsx.transportation.vehicle::VehicleLocation.- Attribute names must start with an ASCII letter or underscore, followed by ASCII letters, digits, or underscores. - When a tuple on a structured stream is passed into a Python callable it is converted to a - dict,- tupleor named tuple object containing all attributes of the stream tuple. See- style(),- as_dict()and- as_tuple()for details.- Note - When a tuple on a structured stream, that contains nested tuples, is passed into a Python callable it is always converted to a - dictobject containing all attributes of the stream tuple.- When a Python object is submitted to a structured stream, for example as the return from the function invoked in a - map()with the schema parameter set, it must be:- A Python - dict. Attributes are set by name using value in the dict for the name. If a value does not exist (the name does not exist as a key) or is set to None then the attribute has its default value, zero, false, empty list or string etc.
- A Python - tupleor named tuple. Attributes are set by position, with the first attribute being the value at index 0 in the Python tuple. If a value does not exist (the tuple has less values than the structured schema) or is set to None then the attribute has its default value, zero, false, empty list or string etc.
 - Parameters
- schema (str) – Schema definition. Either a schema definition or the name of an SPL type. 
 - New in version 1.16: Support for nested tuples (conversion to SPL from Python or conversion to Python from SPL) - 
as_dict()¶
- Create a structured schema that will pass stream tuples into callables as - dictinstances. This allows a return to the default calling style for a structured schema.- If this instance represents a common schema then it will be returned without modification. Stream tuples with common schemas are always passed according to their definition. - Returns
- Schema passing stream tuples as - dictif allowed.
- Return type
 - New in version 1.8. 
 - 
as_tuple(named=None)¶
- Create a structured schema that will pass stream tuples into callables as - tupleinstances.- If this instance represents a common schema then it will be returned without modification. Stream tuples with common schemas are always passed according to their definition. - Passing as tuple - When named evaluates to - Falsethen each stream tuple will be passed as a- tuple. For example with a structured schema of- tuple<rstring id, float64 value>a value is passed as- ('TempSensor', 27.4)and access to the first attribute is- t[0]and the second as- t[1]where- trepresents the passed value..- Passing as named tuple - When named is - Trueor a- strthen each stream tuple will be passed as a named tuple. For example with a structured schema of- tuple<rstring id, float64 value>a value is passed as- ('TempSensor', 27.4)and access to the first attribute is- t.id(or- t[0]) and the second as- t.value(- t[1]) where- trepresents the passed value.- Warning - If an schema’s attribute name is not a valid Python identifier or starts with an underscore then it will be renamed as positional name - _n. For example, with the schema- tuple<int32 a, int32 def, int32 id>the field names are- a,- _1,- _2.- The value of named is used as the name of the named tuple class with - StreamTupleused when named is- True.- It is not guaranteed that the class of the namedtuple is the same for all callables processing tuples with the same structured schema, only that the tuple is a named tuple with the correct field names. - Parameters
- named – Pass stream tuples as a named tuple. If not set then stream tuples are passed as instances of - tuple.
- Returns
- Schema passing stream tuples as - tupleif allowed.
- Return type
 - New in version 1.8. - New in version 1.9: Addition of named parameter. 
 - 
extend(schema)¶
- Extend a structured schema by another. - For example extending - tuple<rstring id, timestamp ts, float64 value>with- tuple<float32 score>results in- tuple<rstring id, timestamp ts, float64 value, float32 score>.- Parameters
- schema (StreamSchema) – Schema to extend this schema by. 
- Returns
- New schema that is an extension of this schema. 
- Return type
 
 - 
schema()¶
- Private method. May be removed at any time. 
 - 
property style¶
- Style stream tuples will be passed into a callable. - For the common schemas the style is fixed: - CommonSchema.Python-- object- Stream tuples are arbitrary objects.
- CommonSchema.String-- str- Stream tuples are unicode strings.
- CommonSchema.Json-- dict- Stream tuples are a- dictthat represents the JSON object.
 - For a structured schema the supported styles are: - dict- Stream tuples are passed as a- dictwith the key being the attribute name and and the value the attribute value. This is the default.- E.g. with a schema of - tuple<rstring id, float32 value>a value is passed as- {'id':'TempSensor', 'value':20.3}.
 
- tuple- Stream tuples are passed as a- tuplewith the value being the attributes value in order. A schema is set to pass stream tuples as tuples using- as_tuple().- E.g. with a schema of - tuple<rstring id, float32 value>a value is passed as- ('TempSensor', 20.3).
 
- namedtuple- Stream tuples are passed as a named tuple (see- collections.namedtuple) with the value being the attributes value in order. Field names correspond to the attribute names of the schema. A schema is set to pass stream tuples as named tuples using- as_tuple()setting the named parameter.
 - Returns
- Class of tuples that will be passed into callables. 
- Return type
- type 
 - New in version 1.8. - New in version 1.9: Support for namedtuple. 
 
- 
class streamsx.topology.schema.CommonSchema¶
- Bases: - enum.Enum- Common stream schemas for interoperability within Streams applications. - Streams application can publish streams that are subscribed to by other applications. Use of common schemas allow streams connections regardless of the application implementation language. - Python applications publish streams using - publish()and subscribe using- subscribe().- 
Binary= <streamsx.topology.schema.StreamSchema object>¶
- Stream where each tuple is a binary object (sequence of bytes). - Warning - Binary is not yet supported for Python applications. 
 - 
Json= <streamsx.topology.schema.StreamSchema object>¶
- Stream where each tuple is logically a JSON object. - Json can be used as a natural interchange format between Streams applications implemented in different programming languages. All languages supported by Streams support publishing and subscribing to JSON streams. - A Python callable receives each tuple as a dict as though it was created from - json.loads(json_formatted_str)where json_formatted_str is the JSON formatted representation of tuple.- Python objects that are to be converted to JSON objects must be supported by JSONEncoder. If the object is not a dict then it will be converted to a JSON object with a single key payload containing the value. 
 - 
Python= <streamsx.topology.schema.StreamSchema object>¶
- Stream where each tuple is a Python object. Each object must be picklable to allow execution in a distributed environment where streams can connect processes running on the same or different resources. - Python streams can only be used by Python applications. 
 - 
String= <streamsx.topology.schema.StreamSchema object>¶
- Stream where each tuple is a string. - String can be used as a natural interchange format between Streams applications implemented in different programming languages. All languages supported by Streams support publishing and subscribing to string streams. - A Python callable receives each tuple as a str object. - Python objects are converted to strings using - str(obj).
 - 
XML= <streamsx.topology.schema.StreamSchema object>¶
- Stream where each tuple is an XML document. - Warning - XML is not yet supported for Python applications. 
 - 
extend(schema)¶
- Extend a structured schema by another. - Parameters
- schema (StreamSchema) – Schema to extend this schema by. 
- Returns
- New schema that is an extension of this schema. 
- Return type
 
 - 
schema()¶
- Private method. May be removed at any time. 
 
-