Package com.ibm.streamsx.topology

Java application API for IBM Streams.


  • Interface Summary
    Interface          Description
    TopologyElement    Any element in a Topology.
    TSink              A handle to the sink of a TStream.
    TStream<T>         A TStream is a declaration of a continuous sequence of tuples.
    TWindow<T,K>       Declares a window of tuples for a TStream.
  • Class Summary
    Class              Description
    Topology           A declaration of a topology of streaming data.
  • Enum Summary
    Enum               Description
    TStream.Routing    Enumeration for routing tuples to parallel channels.

Package com.ibm.streamsx.topology Description

Java application API for IBM Streams. This API is used to generate streaming topologies for execution by IBM Streams 4.0.1 or later. An application creates an instance of Topology and uses it to build a topology (or graph) of streams, represented by TStream instances. This topology is the declaration of the application; it may then be:
  • Submitted to an IBM Streams instance.
  • Executed as an IBM Streams standalone application.
  • Compiled into an IBM Streams application bundle (.sab file).
  • Executed within the local JVM (not all topologies are supported).
A TStream represents a continuous stream of tuples of a specific type T. Streams are transformed into new streams through functional transformations, each defined by implementing the single method of a functional interface such as Function.

A transformation consists of an instance of a functional interface, known as the functional logic, and the way that logic is applied to tuples on the stream. The TStream method to which the functional logic is passed, and which creates the new stream, declares how the logic is applied. For example, filter applies a Predicate to decide which tuples are kept, while transform applies a Function to map each tuple to a new tuple.
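The sketch below illustrates the split between functional logic and the method that applies it. It does not use the real Topology or TStream classes: MiniStream, TuplePredicate and TupleFunction are hypothetical stand-ins that only mirror the single-method shape of the functional interfaces, and the logic is applied eagerly to an in-memory list rather than to a continuous stream.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins mirroring the single-method shape of the API's
// functional interfaces; these are NOT the com.ibm.streamsx.topology types.
interface TuplePredicate<T> extends Serializable {
    boolean test(T tuple);
}

interface TupleFunction<T, R> extends Serializable {
    R apply(T tuple);
}

// Hypothetical, eager, in-memory model of a stream: each method receives the
// functional logic and decides how it is applied to every tuple.
final class MiniStream<T> {
    private final List<T> tuples;

    MiniStream(List<T> tuples) {
        this.tuples = tuples;
    }

    // filter: keep only the tuples for which the predicate returns true.
    MiniStream<T> filter(TuplePredicate<T> logic) {
        List<T> out = new ArrayList<>();
        for (T t : tuples) {
            if (logic.test(t)) {
                out.add(t);
            }
        }
        return new MiniStream<>(out);
    }

    // transform: map each tuple to exactly one new tuple.
    <U> MiniStream<U> transform(TupleFunction<T, U> logic) {
        List<U> out = new ArrayList<>();
        for (T t : tuples) {
            out.add(logic.apply(t));
        }
        return new MiniStream<>(out);
    }

    List<T> tuples() {
        return tuples;
    }
}

class FilterTransformSketch {
    static List<String> run() {
        MiniStream<String> words = new MiniStream<>(List.of("hello", "", "streams"));
        return words
                .filter(s -> !s.isEmpty())       // predicate logic
                .transform(s -> s.toUpperCase()) // function logic
                .tuples();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [HELLO, STREAMS]
    }
}
```

The same lambda could be passed to a different method and would then be applied differently; the method, not the logic, defines the transformation's semantics.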

Functional Logic Semantics

Declaring Functional Logic

When declaring (building) the topology, all functional logic is an instance of a Java interface, such as Predicate. These objects are always serialized and deserialized before they are executed against tuples on streams, regardless of the type of the StreamsContext.
  • If the instance is declared using an anonymous class, it should be declared in a static context (e.g. a static method). An anonymous class declared in a non-static context also captures a reference to the object that declares it; that object (the code declaring the topology) is typically not serializable and is not needed by the anonymous class at execution time.
    • The code declaring the topology passes parameters into an anonymous class through local variables or method parameters that are final (or, since Java 8, effectively final), so that the anonymous class can capture them.
    • The code within an anonymous class must not reference any static fields set up by the code declaring the topology, because that code is not executed when the topology itself is executed. For example, a method in an anonymous class that references a static file name set while declaring the topology will not see that value at runtime.
  • When the functional logic is deserialized before being executed at runtime, its constructor is not called. Thus any transient fields must be initialized either on first use or in the Java serialization hooks readObject() or readResolve(). The sample RegexGrep has an example of this.
  • A single deserialized instance of the functional logic is called for all tuples on each parallel channel; when the stream is not parallelized, a single instance is called for all tuples.
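The rules above can be exercised with plain Java serialization. In this sketch LinePredicate and DeclareLogicSketch are hypothetical names (not part of the API), and roundTrip only simulates the serialize/deserialize step that happens before execution. The anonymous class is declared in a static method, captures a final parameter, and lazily initializes a transient field because no constructor runs on deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.regex.Pattern;

// Hypothetical stand-in for a single-method functional interface such as Predicate.
interface LinePredicate extends Serializable {
    boolean test(String line);
}

class DeclareLogicSketch {

    // Declared in a static method, so the anonymous class captures only the
    // final parameter 'regex' and no reference to an enclosing object.
    static LinePredicate grep(final String regex) {
        return new LinePredicate() {
            private static final long serialVersionUID = 1L;

            // Not serialized. After deserialization no constructor of this
            // class runs, so the field starts out as null again.
            private transient Pattern compiled;

            @Override
            public boolean test(String line) {
                if (compiled == null) {
                    compiled = Pattern.compile(regex); // initialize on first use
                }
                return compiled.matcher(line).find();
            }
        };
    }

    // Simulates what happens to functional logic before execution:
    // it is serialized and then deserialized into a new instance.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T logic) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(logic);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LinePredicate runtimeCopy = roundTrip(grep("err(or)?"));
        System.out.println(runtimeCopy.test("an error occurred")); // true
        System.out.println(runtimeCopy.test("all good"));          // false
    }
}
```

Had grep been an instance method of a non-serializable builder class, writeObject would likely fail with a NotSerializableException because the anonymous class would also capture that builder.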

Synchronization of Functional Logic

  • Calls to a functional logic instance are not executed concurrently.
  • Each call to a functional logic instance happens-before any subsequent call to it.

Thus functional logic does not need synchronization to protect its state from concurrent calls or to ensure visibility of that state between calls.

Stateful Functional Logic

A functional logic instance lives for the lifetime of its container (embedded JVM, standalone process or processing element in distributed mode), so it may maintain state across invocations of its method. For example, a de-duplicating Predicate may maintain a collection of previously seen tuples to filter out duplicates.
In distributed mode, if a processing element (PE) restarts, any state is lost and a new functional logic instance is created, set to its initial deserialized state.
For future compatibility:
  • any state that should not be persisted on a checkpoint must be marked as transient, such as connections to external systems.
  • any non-changing instance fields should be marked as final.
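A de-duplicating predicate of the kind described above might look like the following sketch. StatefulPredicate, Deduplicate and DedupSketch are hypothetical names used only for illustration, and the duplicatesDropped counter is an assumed example of state that need not be persisted. The set is an ordinary HashSet because calls to one instance are never concurrent.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a single-method functional interface.
interface StatefulPredicate<T> extends Serializable {
    boolean test(T tuple);
}

// De-duplicating functional logic that keeps state across calls.
final class Deduplicate<T> implements StatefulPredicate<T> {
    private static final long serialVersionUID = 1L;

    // Tuples seen so far. The reference never changes, so it is final; the
    // set is plain (unsynchronized) because calls are never concurrent.
    private final Set<T> seen = new HashSet<>();

    // Hypothetical diagnostic counter that need not be persisted, so it is
    // marked transient (it is 0 whenever the instance is deserialized).
    private transient long duplicatesDropped;

    @Override
    public boolean test(T tuple) {
        if (seen.add(tuple)) {
            return true; // first occurrence: keep it
        }
        duplicatesDropped++;
        return false; // already seen: filter it out
    }

    long duplicatesDropped() {
        return duplicatesDropped;
    }
}

class DedupSketch {
    static List<String> run(List<String> input) {
        Deduplicate<String> logic = new Deduplicate<>();
        List<String> kept = new ArrayList<>();
        for (String tuple : input) {
            if (logic.test(tuple)) {
                kept.add(tuple);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "a", "c", "b"))); // [a, b, c]
    }
}
```

This set grows without bound; a production version would likely need to bound it, for example by age or size.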

Pass by reference semantics for tuples

Where possible, tuples are passed by reference from one stream to another. In general, a tuple (as a Java object) is returned from one functional logic instance and its reference is passed into one or more downstream functional logic instances. For example, with the flow:
s.transform(trans1).filter(filt).transform(trans2)
a tuple t returned by trans1.apply() may be passed directly to filt.test(t) and, if that returns true, to trans2.apply(t).
When a tuple is not, or cannot be, passed by reference (for example, when functional logic instances execute on different hosts in a distributed environment), the tuple object is passed using serialization and deserialization.
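The difference between the two delivery modes can be made visible with a mutable tuple. In this sketch ReferenceVsCopySketch and its Reading class are hypothetical; passByReference models delivery within one JVM, and passBySerialization models delivery across hosts using standard Java serialization. A downstream change is seen upstream only in the by-reference case, which is why the behavior of a topology that mutates tuples can depend on how it is deployed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class ReferenceVsCopySketch {

    // Hypothetical mutable tuple type, used only to make identity visible.
    static final class Reading implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<Integer> values = new ArrayList<>();
    }

    // Same JVM: downstream logic receives the very object upstream returned.
    static Object passByReference(Object tuple) {
        return tuple;
    }

    // Different hosts: the tuple crosses the boundary as bytes and a new
    // object is reconstructed on the receiving side.
    static Object passBySerialization(Object tuple) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(tuple);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Reading t = new Reading();
        t.values.add(1);

        Reading byRef = (Reading) passByReference(t);
        Reading byCopy = (Reading) passBySerialization(t);

        // Only the by-reference mutation is visible through the original t.
        byRef.values.add(2);
        byCopy.values.add(3);
        System.out.println(t.values);      // [1, 2]
        System.out.println(byCopy.values); // [1, 3]
    }
}
```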

Ensuring consistent behavior

To ensure consistent behavior regardless of how the topology is executed, these recommendations should be followed:
  • Where possible, TStream tuple types should be immutable, so that downstream functional logic cannot modify a tuple through its reference. A reference to a tuple may then be safely passed to multiple downstream functional logic instances.
  • Functional logic should discard any reference to a mutable object it returns, so that any change made to that tuple downstream is not seen by the upstream functional logic.
  • Functional logic should discard any reference to a mutable argument when its method returns.
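One way to follow the first recommendation is sketched below: a tuple type with only final fields and no setters, where "modification" returns a new instance. SensorReading and ImmutableTupleSketch are hypothetical names chosen for illustration; the conversion from Celsius to Fahrenheit simply stands in for any transform-style logic.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical immutable tuple type: all fields final, no setters, and
// "modification" returns a new instance instead of changing this one.
final class SensorReading implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String sensorId;
    private final double value;

    SensorReading(String sensorId, double value) {
        this.sensorId = Objects.requireNonNull(sensorId);
        this.value = value;
    }

    String sensorId() { return sensorId; }

    double value() { return value; }

    // Returns a new tuple; the original stays valid and unchanged.
    SensorReading withValue(double newValue) {
        return new SensorReading(sensorId, newValue);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SensorReading)) return false;
        SensorReading other = (SensorReading) o;
        return sensorId.equals(other.sensorId) && Double.compare(value, other.value) == 0;
    }

    @Override
    public int hashCode() { return Objects.hash(sensorId, value); }

    @Override
    public String toString() { return sensorId + "=" + value; }
}

class ImmutableTupleSketch {
    // Transform-style logic: derive a new tuple without touching its input.
    static SensorReading toFahrenheit(SensorReading celsius) {
        return celsius.withValue(celsius.value() * 9.0 / 5.0 + 32.0);
    }

    public static void main(String[] args) {
        SensorReading in = new SensorReading("s1", 100.0);
        SensorReading out = toFahrenheit(in);
        System.out.println(in + " -> " + out); // s1=100.0 -> s1=212.0
    }
}
```

Because no functional logic can change a SensorReading after it is created, it does not matter whether downstream logic receives the original reference or a deserialized copy.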

Included Libraries

This API requires the IBM Streams Java Operator API which results in these libraries in the class path:
streamsx.topology 2.1 @ IBMStreams GitHub