streamsx.topology.schema

Schemas for streams.

Overview

A stream represents an unbounded flow of tuples with a declared schema so that each tuple on the stream complies with the schema. A stream’s schema may be one of:

  • StreamSchema structured schema - a tuple is a sequence of attributes, and an attribute is a named value of a specific type.

  • Json - a tuple is a JSON object.

  • String - a tuple is a string.

  • Python - a tuple is any Python object, effectively an untyped stream.

Structured schemas

A structured schema is a sequence of attributes, and an attribute is a named value of a specific type. For example, a stream of sensor readings can be represented as a schema with three attributes sensor_id, ts and reading with types of int64, int64 and float64 respectively.

This schema can be declared in a number of ways:

Python 3.6:

import typing

class SensorReading(typing.NamedTuple):
    sensor_id: int
    ts: int
    reading: float

# raw_readings is an existing stream; parse_sensor is a user-supplied callable.
sensors = raw_readings.map(parse_sensor, schema=SensorReading)

Python 3:

sensors = raw_readings.map(parse_sensor,
    schema='tuple<int64 sensor_id, int64 ts, float64 reading>')
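The map() calls above rely on a parse_sensor callable that the examples leave undefined. The sketch below is one hypothetical version, assuming each raw reading is a comma-separated string such as "7,1546300800000,21.5". The input format and the parse_sensor_as_dict name are assumptions, not part of streamsx. Returning a SensorReading sets the attributes by position; returning a dict with matching keys sets them by name.

```python
import typing


class SensorReading(typing.NamedTuple):
    sensor_id: int
    ts: int
    reading: float


def parse_sensor(raw: str) -> SensorReading:
    # Hypothetical input format: "sensor_id,ts,reading" as comma-separated text.
    sensor_id, ts, reading = raw.split(",")
    # Returning a named tuple sets the schema attributes by position.
    return SensorReading(int(sensor_id), int(ts), float(reading))


def parse_sensor_as_dict(raw: str) -> dict:
    # Equivalent alternative: returning a dict sets the schema attributes by name.
    sensor_id, ts, reading = raw.split(",")
    return {"sensor_id": int(sensor_id), "ts": int(ts), "reading": float(reading)}
```

For example, parse_sensor("7,1546300800000,21.5") returns SensorReading(sensor_id=7, ts=1546300800000, reading=21.5).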

The supported types are defined by IBM Streams and are listed in StreamSchema.

Nested structured schemas

A structured schema can contain nested structures that are defined separately.

Python 3.6:

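import typing

class Sensor(typing.NamedTuple):
    manufacturer: str
    sensor_id: int

class SensorReading(typing.NamedTuple):
    sensor: Sensor    # nested structure
    ts: int
    reading: float

# raw_readings is an existing stream; parse_sensor is a user-supplied callable.
sensors = raw_readings.map(parse_sensor, schema=SensorReading)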

Python 3:

sensors = raw_readings.map(parse_sensor,
    schema='tuple<tuple<rstring manufacturer, int64 sensor_id> sensor, int64 ts, float64 reading>')
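With the nested schema, a hypothetical parse_sensor builds the inner Sensor value first and places it in the sensor attribute. This sketch assumes a raw format of "manufacturer,sensor_id,ts,reading" and shows only the Python side, without a Topology.

```python
import typing


class Sensor(typing.NamedTuple):
    manufacturer: str
    sensor_id: int


class SensorReading(typing.NamedTuple):
    sensor: Sensor
    ts: int
    reading: float


def parse_sensor(raw: str) -> SensorReading:
    # Hypothetical input format: "manufacturer,sensor_id,ts,reading".
    manufacturer, sensor_id, ts, reading = raw.split(",")
    # The inner named tuple supplies the nested tuple<rstring manufacturer, int64 sensor_id> attribute.
    sensor = Sensor(manufacturer.strip(), int(sensor_id))
    return SensorReading(sensor, int(ts), float(reading))
```

Downstream Python callables receive such nested tuples as dicts, as described under StreamSchema.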

Both schema definitions are equivalent.

Structured schemas provide type-safety and efficient network serialization when compared to passing a dict using Python streams.

Streams with structured schemas can be interchanged with any IBM Streams application using publish() and subscribe(), maintaining type safety.

Defining a stream’s schema

Every stream within a Topology has a defined schema. The schema may be defined explicitly (for example by map() or subscribe()) or implicitly (for example filter() produces a stream with the same schema as its input stream).

Explicitly defining a stream’s schema is flexible: various types of values are accepted as the schema.

  • Builtin types as aliases for common schema types:

  • Values of the enumeration CommonSchema

  • A class defined with typing.NamedTuple (Python 3)

  • An instance of StreamSchema

  • A string of the format tuple<...> defining the attribute names and types. See StreamSchema for details on the format and types supported.

  • A string containing a namespace qualified SPL stream type (e.g. com.ibm.streams.geospatial::FlightPathEncounterTypes.Observation3D)

Module contents

Functions

is_common

Is schema a common schema.

Classes

CommonSchema

Common stream schemas for interoperability within Streams applications.

StreamSchema

Defines a schema for a structured stream.

streamsx.topology.schema.is_common(schema)

Is schema a common schema.

Parameters

schema – Schema to test.

Returns

True if schema is a common schema, otherwise False.

Return type

bool

class streamsx.topology.schema.StreamSchema(schema)

Bases: object

Defines a schema for a structured stream.

On a structured stream a tuple is a sequence of attributes, and an attribute is a named value of a specific type.

The supported types are defined by IBM Streams and include such types as int8, int16, rstring and list<float32>.

A structured schema can be defined using a typing.NamedTuple in Python 3, a string with the syntax tuple<type name [,...]> or an instance of this class.

typing.NamedTuple:

A typing.NamedTuple can be used to define a structured schema with the field names and types mapping to the structured schema attribute names and types.

Python types are mapped to IBM Streams types as follows:

Python type                     IBM Streams type
------------------------------  ----------------
str                             rstring
bool                            boolean
int                             int64
float                           float64
decimal.Decimal                 decimal128
complex                         complex64
bytes                           blob
streamsx.spl.types.Timestamp    timestamp
datetime.datetime               timestamp
typing.List[T]                  list<T>
typing.Set[T]                   set<T>
typing.Mapping[K,V]             map<K,V>
typing.Optional[T]              optional<T>
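To make the mapping concrete, the sketch below derives a tuple<...> string from a typing.NamedTuple class by applying the table above. It is a hypothetical helper (spl_type and tuple_schema are not streamsx functions), it omits streamsx.spl.types.Timestamp so that it runs without streamsx installed, and it needs Python 3.8 or later for typing.get_origin and typing.get_args. Applied to the nested SensorReading from the earlier example, it yields the same string as that example's tuple<...> form.

```python
import collections.abc
import datetime
import decimal
import typing

# Hypothetical helper, not a streamsx API: the scalar rows of the table above.
# streamsx.spl.types.Timestamp is omitted so the sketch runs without streamsx.
_SCALARS = {
    str: "rstring",
    bool: "boolean",
    int: "int64",
    float: "float64",
    decimal.Decimal: "decimal128",
    complex: "complex64",
    bytes: "blob",
    datetime.datetime: "timestamp",
}


def spl_type(py_type) -> str:
    """Return the IBM Streams type for a Python type annotation."""
    if py_type in _SCALARS:
        return _SCALARS[py_type]
    origin = typing.get_origin(py_type)
    args = typing.get_args(py_type)
    if origin is list:
        return f"list<{spl_type(args[0])}>"
    if origin is set:
        return f"set<{spl_type(args[0])}>"
    if origin is collections.abc.Mapping:
        return f"map<{spl_type(args[0])},{spl_type(args[1])}>"
    if origin is typing.Union and len(args) == 2 and type(None) in args:
        # typing.Optional[T] is typing.Union[T, None].
        inner = args[0] if args[1] is type(None) else args[1]
        return f"optional<{spl_type(inner)}>"
    if isinstance(py_type, type) and issubclass(py_type, tuple) and hasattr(py_type, "_fields"):
        # A nested typing.NamedTuple becomes a nested tuple<...>.
        return tuple_schema(py_type)
    raise TypeError(f"no IBM Streams mapping for {py_type!r}")


def tuple_schema(named_tuple_cls) -> str:
    """Build a tuple<...> schema string from a typing.NamedTuple class."""
    hints = typing.get_type_hints(named_tuple_cls)
    attributes = ", ".join(
        f"{spl_type(hints[name])} {name}" for name in named_tuple_cls._fields
    )
    return f"tuple<{attributes}>"


# The nested schema from the earlier example.
class Sensor(typing.NamedTuple):
    manufacturer: str
    sensor_id: int


class SensorReading(typing.NamedTuple):
    sensor: Sensor
    ts: int
    reading: float
```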

Note

Tuples on a stream with a schema defined by a typing.NamedTuple class are passed into callables as instances of a named tuple with the correct field names and types, unless the named tuple contains nested named tuples at any nesting depth. When passed as a named tuple, the instance is not guaranteed to be of the same class as the one used to declare the schema.

Tuple string:

A string of the format tuple<type name [,…]> can be used to define a structured schema, where type is an IBM Streams type.

Example:

tuple<rstring id, timestamp ts, float64 value>

represents a schema with three attributes suitable for a sensor reading.

IBM Streams types:

Type                      Description                      Python representation     Conversion from Python
------------------------  -------------------------------  ------------------------  ----------------------
boolean                   True or False                    bool                      bool(value)
int8                      8-bit signed integer             int                       int(value) truncated to 8 bits
int16                     16-bit signed integer            int                       int(value) truncated to 16 bits
int32                     32-bit signed integer            int                       int(value) truncated to 32 bits
int64                     64-bit signed integer            int                       int(value)
uint8                     8-bit unsigned integer           int
uint16                    16-bit unsigned integer          int
uint32                    32-bit unsigned integer          int
uint64                    64-bit unsigned integer          int
float32                   32-bit binary floating point     float                     float(value) truncated to 32 bits
float64                   64-bit binary floating point     float                     float(value)
decimal32                 32-bit decimal floating point    decimal.Decimal           decimal.Decimal(value) normalized to IEEE 754 decimal32
decimal64                 64-bit decimal floating point    decimal.Decimal           decimal.Decimal(value) normalized to IEEE 754 decimal64
decimal128                128-bit decimal floating point   decimal.Decimal           decimal.Decimal(value) normalized to IEEE 754 decimal128
complex32                 complex with float32 values      complex                   complex(value) with real and imaginary values truncated to 32 bits
complex64                 complex with float64 values      complex                   complex(value)
timestamp                 Nanosecond timestamp             Timestamp
rstring                   UTF-8 string                     str                       str(value)
rstring[N]                Bounded UTF-8 string             str                       str(value)
ustring                   UTF-16 string                    str                       str(value)
blob                      Sequence of bytes                memoryview
list<T>                   List with elements of type T     list
list<T>[N]                Bounded list                     list
set<T>                    Set with elements of type T      set
set<T>[N]                 Bounded set                      set
map<K,V>                  Map with typed keys and values   dict
map<K,V>[N]               Bounded map, limited to N pairs  dict
optional<T>               Optional value of type T         Value of type T, or None  Conversion for type T
enum{id [,...]}           Enumeration                      Not supported             Not supported
xml                       XML value                        Not supported             Not supported
tuple<type name [, ...]>  Nested tuple                     dict                      dict

Note

Type optional<T> requires IBM Streams 4.3 or later.

Note

Conversion to or from Python:

  • Type set<T> is restricted to primitive types

  • Type map<K,V> is restricted to primitive types for the key type K

Python representation is how an attribute value in a structured schema is passed into a Python function.

Conversion from Python indicates how a value from Python is converted to an attribute value in a structured schema. For example, a value v assigned to a float64 attribute is converted as though float(v) were called first, so v may be a float, an int or any type that has a __float__ method.
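The conversion column can be approximated in plain Python. The sketch below assumes that "truncated to N bits" for integers keeps the low-order bits as a two's-complement value (as a C cast would), that float32 narrowing rounds to the nearest binary32 value, and that decimal normalization rounds to the IEEE 754 precision and exponent range of the target type. These are illustrations of the table's wording, not the IBM Streams runtime code. The helper names and the Celsius class are hypothetical.

```python
import decimal
import struct

# Assumed IEEE 754 decimal interchange parameters (precision, Emax, Emin).
DECIMAL_CONTEXTS = {
    "decimal32": decimal.Context(prec=7, Emax=96, Emin=-95, clamp=1),
    "decimal64": decimal.Context(prec=16, Emax=384, Emin=-383, clamp=1),
    "decimal128": decimal.Context(prec=34, Emax=6144, Emin=-6143, clamp=1),
}


def to_int(value, bits: int) -> int:
    """int(value) truncated to a signed integer of the given width."""
    # Assumption: truncation keeps the low-order bits and reads them as two's complement.
    mask = (1 << bits) - 1
    v = int(value) & mask
    return v - (1 << bits) if v >= (1 << (bits - 1)) else v


def to_float32(value) -> float:
    """float(value) narrowed to IEEE 754 binary32 precision."""
    # struct rounds to the nearest representable float32 value.
    return struct.unpack("f", struct.pack("f", float(value)))[0]


def to_decimal(value, spl_type: str) -> decimal.Decimal:
    """decimal.Decimal(value) rounded to the precision of the given SPL decimal type."""
    return DECIMAL_CONTEXTS[spl_type].create_decimal(decimal.Decimal(value))


class Celsius:
    """Hypothetical value type: __float__ makes it assignable to a float64 attribute."""

    def __init__(self, degrees):
        self.degrees = degrees

    def __float__(self):
        return float(self.degrees)
```

Under these assumptions, to_int(300, 8) is 44 and to_decimal("3.14159265", "decimal32") is Decimal('3.141593').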

When a type is not supported in Python, it can only be used in a schema for streams that are produced and consumed by SPL operator invocations.

A StreamSchema can be created by passing a string of the form tuple<...> or by passing the name of an SPL type from an SPL toolkit, for example com.ibm.streamsx.transportation.vehicle::VehicleLocation.

Attribute names must start with an ASCII letter or underscore, followed by ASCII letters, digits, or underscores.
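The naming rule maps directly onto a regular expression. This is a hypothetical helper written from the sentence above, not a streamsx function.

```python
import re

# An ASCII letter or underscore, followed by ASCII letters, digits or underscores.
_ATTRIBUTE_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_attribute_name(name: str) -> bool:
    """Return True if name satisfies the attribute naming rule."""
    # fullmatch requires the whole string to match, so trailing characters are rejected.
    return _ATTRIBUTE_NAME.fullmatch(name) is not None
```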

When a tuple on a structured stream is passed into a Python callable it is converted to a dict, tuple or named tuple object containing all attributes of the stream tuple. See style(), as_dict() and as_tuple() for details.

Note

When a tuple on a structured stream that contains nested tuples is passed into a Python callable, it is always converted to a dict object containing all attributes of the stream tuple.
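For example, with the nested SensorReading schema from the overview, a callable receives the sensor attribute as an inner dict. The describe callable and the attribute values below are hypothetical.

```python
def describe(reading: dict) -> str:
    # Nested tuple attributes arrive as nested dicts, so access is by key at each level.
    sensor = reading["sensor"]
    return f"{sensor['manufacturer']}#{sensor['sensor_id']} read {reading['reading']} at {reading['ts']}"


# Hypothetical stream tuple as a callable would receive it.
example = {"sensor": {"manufacturer": "Acme", "sensor_id": 7}, "ts": 1546300800000, "reading": 21.5}
```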

When a Python object is submitted to a structured stream, for example as the return from the function invoked in a map() with the schema parameter set, it must be:

  • A Python dict. Attributes are set by name using the value in the dict for that name. If a value does not exist (the name is not a key in the dict) or is set to None, then the attribute has its default value (zero, false, an empty list or string, etc.).

  • A Python tuple or named tuple. Attributes are set by position, with the first attribute being the value at index 0 in the Python tuple. If a value does not exist (the tuple has fewer values than the structured schema) or is set to None, then the attribute has its default value (zero, false, an empty list or string, etc.).
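The two submission rules can be emulated as follows. The sketch is a hypothetical illustration: to_attributes and the (name, default) description of tuple<rstring id, float64 value, list<int32> samples> are assumptions made for the example, not the streamsx implementation.

```python
import copy
from typing import Any, List, Sequence, Tuple

# Hypothetical description of tuple<rstring id, float64 value, list<int32> samples>
# as (attribute name, default value) pairs.
SCHEMA: List[Tuple[str, Any]] = [("id", ""), ("value", 0.0), ("samples", [])]


def to_attributes(obj, schema: Sequence[Tuple[str, Any]]) -> dict:
    """Emulate how a returned dict or tuple fills the attributes of a structured schema."""
    attributes = {}
    for index, (name, default) in enumerate(schema):
        if isinstance(obj, dict):
            # dict: attributes are set by name; a missing key means no value.
            value = obj.get(name)
        else:
            # tuple or named tuple: set by position; a short tuple leaves the rest unset.
            value = obj[index] if index < len(obj) else None
        # No value, or None, gives the attribute its default (copied so defaults are never shared).
        attributes[name] = copy.copy(default) if value is None else value
    return attributes
```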

Parameters

schema (str) – Schema definition. Either a schema definition or the name of an SPL type.

New in version 1.16: Support for nested tuples (conversion to SPL from Python or conversion to Python from SPL)

as_dict()

Create a structured schema that will pass stream tuples into callables as dict instances. This allows a return to the default calling style for a structured schema.

If this instance represents a common schema then it will be returned without modification. Stream tuples with common schemas are always passed according to their definition.

Returns

Schema passing stream tuples as dict if allowed.

Return type

StreamSchema

New in version 1.8.

as_tuple(named=None)

Create a structured schema that will pass stream tuples into callables as tuple instances.

If this instance represents a common schema then it will be returned without modification. Stream tuples with common schemas are always passed according to their definition.

Passing as tuple

When named evaluates to False then each stream tuple will be passed as a tuple. For example, with a structured schema of tuple<rstring id, float64 value> a value is passed as ('TempSensor', 27.4); the first attribute is accessed as t[0] and the second as t[1], where t represents the passed value.

Passing as named tuple

When named is True or a str then each stream tuple will be passed as a named tuple. For example, with a structured schema of tuple<rstring id, float64 value> a value is passed as ('TempSensor', 27.4); the first attribute is accessed as t.id (or t[0]) and the second as t.value (or t[1]), where t represents the passed value.

Warning

If a schema’s attribute name is not a valid Python identifier or starts with an underscore, then it is renamed to the positional name _n. For example, with the schema tuple<int32 a, int32 def, int32 id> the field names are a, _1, _2.

When named is a str, its value is used as the name of the named tuple class; when named is True, the class name StreamTuple is used.

It is not guaranteed that the class of the namedtuple is the same for all callables processing tuples with the same structured schema, only that the tuple is a named tuple with the correct field names.
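A callable written for tuple<rstring id, float64 value> can use positional access with either tuple style, while access by field name needs the named tuple style. The values below are hypothetical stand-ins built with collections.namedtuple, not tuples received from a running stream.

```python
import collections

# Hypothetical stand-ins for what a callable receives for tuple<rstring id, float64 value>.
as_plain_tuple = ("TempSensor", 27.4)
StreamTuple = collections.namedtuple("StreamTuple", ["id", "value"])
as_named_tuple = StreamTuple("TempSensor", 27.4)


def is_hot_positional(t) -> bool:
    # Works with as_tuple() and with as_tuple(named=True): index access.
    return t[1] > 25.0


def is_hot_named(t) -> bool:
    # Requires as_tuple(named=True): access by field name.
    return t.value > 25.0
```

Positional access is the more portable choice, since it keeps working if the schema is later switched between the two tuple styles.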

Parameters

named – Pass stream tuples as a named tuple. If not set then stream tuples are passed as instances of tuple.

Returns

Schema passing stream tuples as tuple if allowed.

Return type

StreamSchema

New in version 1.8.

New in version 1.9: Addition of named parameter.

extend(schema)

Extend a structured schema by another.

For example extending tuple<rstring id, timestamp ts, float64 value> with tuple<float32 score> results in tuple<rstring id, timestamp ts, float64 value, float32 score>.
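At the level of schema strings, the extension in this example amounts to concatenating the attribute lists. The helper below is a hypothetical, string-only sketch; StreamSchema.extend itself operates on StreamSchema instances.

```python
def extend_tuple_schema(base: str, extra: str) -> str:
    """Append the attributes of one tuple<...> string to another (string-level sketch)."""

    def body(schema: str) -> str:
        schema = schema.strip()
        if not (schema.startswith("tuple<") and schema.endswith(">")):
            raise ValueError(f"not a tuple<...> schema: {schema!r}")
        # Strip only the outer tuple< and >, so nested types such as list<float32> stay intact.
        return schema[len("tuple<"):-1].strip()

    return f"tuple<{body(base)}, {body(extra)}>"
```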

Parameters

schema (StreamSchema) – Schema to extend this schema by.

Returns

New schema that is an extension of this schema.

Return type

StreamSchema

schema()

Private method. May be removed at any time.

property style

Style in which stream tuples will be passed into a callable.

For the common schemas the style is fixed:

  • CommonSchema.Python - object - Stream tuples are arbitrary objects.

  • CommonSchema.String - str - Stream tuples are unicode strings.

  • CommonSchema.Json - dict - Stream tuples are a dict that represents the JSON object.

For a structured schema the supported styles are:

  • dict - Stream tuples are passed as a dict with the key being the attribute name and the value the attribute value. This is the default.

    • E.g. with a schema of tuple<rstring id, float32 value> a value is passed as {'id':'TempSensor', 'value':20.3}.

  • tuple - Stream tuples are passed as a tuple containing the attribute values in order. A schema is set to pass stream tuples as tuples using as_tuple().

    • E.g. with a schema of tuple<rstring id, float32 value> a value is passed as ('TempSensor', 20.3).

  • namedtuple - Stream tuples are passed as a named tuple (see collections.namedtuple) containing the attribute values in order. Field names correspond to the attribute names of the schema. A schema is set to pass stream tuples as named tuples using as_tuple() with the named parameter set.
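Because as_dict() and as_tuple() change the style, a callable that may see more than one style can normalize its input. The helper below is a hypothetical sketch; attribute names must be supplied for the plain tuple style because a tuple carries no names.

```python
import collections


def as_attribute_dict(t, names=None) -> dict:
    """Normalize a structured stream tuple passed in any style to a dict."""
    if isinstance(t, dict):
        return dict(t)
    if hasattr(t, "_asdict"):
        # namedtuple style: field names are available on the instance.
        return dict(t._asdict())
    if names is None:
        # Plain tuple style carries no attribute names, so they must be supplied.
        raise ValueError("attribute names are required for a plain tuple")
    return dict(zip(names, t))
```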

Returns

Class of tuples that will be passed into callables.

Return type

type

New in version 1.8.

New in version 1.9: Support for namedtuple.

class streamsx.topology.schema.CommonSchema

Bases: enum.Enum

Common stream schemas for interoperability within Streams applications.

Streams applications can publish streams that are subscribed to by other applications. Use of common schemas allows streams to be connected regardless of the application implementation language.

Python applications publish streams using publish() and subscribe using subscribe().

  • Python - Stream contains Python objects.

  • Json - Stream contains JSON objects.

  • String - Stream contains strings.

  • Binary - Stream contains binary tuples.

  • XML - Stream contains XML documents.

Binary = <streamsx.topology.schema.StreamSchema object>

Stream where each tuple is a binary object (sequence of bytes).

Warning

Binary is not yet supported for Python applications.

Json = <streamsx.topology.schema.StreamSchema object>

Stream where each tuple is logically a JSON object.

Json can be used as a natural interchange format between Streams applications implemented in different programming languages. All languages supported by Streams support publishing and subscribing to JSON streams.

A Python callable receives each tuple as a dict as though it was created from json.loads(json_formatted_str), where json_formatted_str is the JSON formatted representation of the tuple.

Python objects that are to be converted to JSON objects must be supported by JSONEncoder. If the object is not a dict then it will be converted to a JSON object with a single key payload containing the value.
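The conversion described above can be emulated with the standard json module. This hypothetical helper wraps non-dict values under payload and round-trips through JSON text, so it shows what a downstream Python callable would see; it is not the streamsx implementation.

```python
import json


def to_json_tuple(obj) -> dict:
    """Emulate the Json common schema conversion of a Python value (a sketch)."""
    if not isinstance(obj, dict):
        # Non-dict values are wrapped in a JSON object under the key "payload".
        obj = {"payload": obj}
    # json.dumps uses JSONEncoder and raises TypeError for unsupported types.
    # The round trip shows what a callable receives, e.g. tuples become lists.
    return json.loads(json.dumps(obj))
```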

Python = <streamsx.topology.schema.StreamSchema object>

Stream where each tuple is a Python object. Each object must be picklable to allow execution in a distributed environment where streams can connect processes running on the same or different resources.

Python streams can only be used by Python applications.
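Picklability can be checked before a Python object is submitted to a Python stream. The helper below is a hypothetical pre-flight check using the standard pickle module. A successful round trip in one process does not guarantee that the object's class is importable on other resources.

```python
import pickle
import threading


def is_picklable(obj) -> bool:
    """Return True if obj survives a pickle round trip in this process."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        # Locks, open sockets and lambdas are typical objects that cannot be pickled.
        return False
```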

String = <streamsx.topology.schema.StreamSchema object>

Stream where each tuple is a string.

String can be used as a natural interchange format between Streams applications implemented in different programming languages. All languages supported by Streams support publishing and subscribing to string streams.

A Python callable receives each tuple as a str object.

Python objects are converted to strings using str(obj).

XML = <streamsx.topology.schema.StreamSchema object>

Stream where each tuple is an XML document.

Warning

XML is not yet supported for Python applications.

extend(schema)

Extend a structured schema by another.

Parameters

schema (StreamSchema) – Schema to extend this schema by.

Returns

New schema that is an extension of this schema.

Return type

StreamSchema

schema()

Private method. May be removed at any time.