com.ibm.streamsx.topology.spl

Interface SPLStream

  • All Superinterfaces:
    Placeable<TStream<com.ibm.streams.operator.Tuple>>, SPLInput, TopologyElement, TStream<com.ibm.streams.operator.Tuple>


    public interface SPLStream
    extends TStream<com.ibm.streams.operator.Tuple>, SPLInput
    An SPLStream is a declaration of a continuous sequence of tuples with a defined SPL schema. An SPLStream is a TStream<Tuple> and thus may be handled using any functional logic, where each tuple will be an instance of com.ibm.streams.operator.Tuple.
    • Method Detail

      • getSchema

        com.ibm.streams.operator.StreamSchema getSchema()
        SPL schema of this stream.
        Returns:
        SPL schema of this stream.
      • toJSON

        TStream<com.ibm.json.java.JSONObject> toJSON()
        Transform SPL tuples into JSON. Each tuple from this stream is converted into a JSON representation.
        • If getSchema() returns JSONSchemas.JSON then each tuple is taken as serialized JSON and deserialized. If the serialized JSON is an array, then a JSON object is created with a single attribute payload containing the deserialized value.
        • Otherwise the tuple is converted to JSON using the encoding provided by the SPL Java Operator API com.ibm.streams.operator.encoding.JSONEncoding.
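        The array-wrapping rule in the first case can be illustrated with a plain-Java sketch. It does not use the Streams or JSON4J classes: java.util.Map and java.util.List stand in for a deserialized JSON object and array, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the "payload" wrapping rule; not part of streamsx.topology.
class JsonPayloadWrapSketch {

    /** Returns a JSON-object-like map: arrays are wrapped under "payload", objects pass through. */
    static Map<String, Object> asObject(Object deserialized) {
        if (deserialized instanceof List) {
            // A top-level array becomes the single attribute "payload" of a new object.
            Map<String, Object> wrapper = new LinkedHashMap<>();
            wrapper.put("payload", deserialized);
            return wrapper;
        }
        // Assumption: anything that is not an array is already an object.
        @SuppressWarnings("unchecked")
        Map<String, Object> object = (Map<String, Object>) deserialized;
        return object;
    }

    public static void main(String[] args) {
        System.out.println(asObject(List.of(1, 2, 3)));
        System.out.println(asObject(Map.of("id", 7)));
    }
}
```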
        Returns:
        A stream with each tuple as a JSONObject.
      • convert

        <T> TStream<T> convert(Function<com.ibm.streams.operator.Tuple,T> convertor)
        Convert SPL tuples into Java objects. This call is equivalent to transform(convertor).
        Parameters:
        convertor - Function that converts each SPL tuple into a Java object of type T.
        Returns:
        Stream containing tuples of type T transformed from this stream's SPL tuples.
        See Also:
        TStream.transform(Function)
      • toTupleString

        TStream<java.lang.String> toTupleString()
        Create a stream that converts each input tuple on this stream to its SPL character representation.
        Returns:
        Stream containing SPL character representations of this stream's SPL tuples.
      • toStringStream

        TStream<java.lang.String> toStringStream()
        Create a TStream<String> from this stream. This SPLStream must have a schema of SPLSchemas.STRING.
        Returns:
        This stream declared as a TStream<String>.
        Throws:
        java.lang.IllegalStateException - Stream does not have a valid schema for TStream<String>.
        See Also:
        SPLStreams.stringToSPLStream(TStream)
      • endLowLatency

        SPLStream endLowLatency()
        Return a TStream that is no longer guaranteed to run in the same process as the calling stream. For example, in the following topology:
        
         ---source---.lowLatency()---filter---transform---.endLowLatency()---filter2
         
        It is guaranteed that the filter and transform operations will run in the same process, but it is not guaranteed that the transform and filter2 operations will run in the same process.

        Only applies for distributed contexts.
        Specified by:
        endLowLatency in interface TStream<com.ibm.streams.operator.Tuple>
        Returns:
        A stream that is not guaranteed to run in the same process as the calling stream.
      • filter

        SPLStream filter(Predicate<com.ibm.streams.operator.Tuple> filter)
        Declare a new stream that filters tuples from this stream. Each tuple t on this stream will appear in the returned stream if filter.test(t) returns true. If filter.test(t) returns false then t will not appear in the returned stream.

        Examples of filtering out all empty strings from stream s of type String:

         
         // Java 8 - Using lambda expression
         TStream<String> s = ...
         TStream<String> filtered = s.filter(t -> !t.isEmpty());
                     
         // Java 7 - Using anonymous class
         TStream<String> s = ...
         TStream<String> filtered = s.filter(new Predicate<String>() {
                     @Override
                     public boolean test(String t) {
                         return !t.isEmpty();
                     }} );
         
         

        Specified by:
        filter in interface TStream<com.ibm.streams.operator.Tuple>
        Parameters:
        filter - Filtering logic to be executed against each tuple.
        Returns:
        Filtered stream
        See Also:
        TStream.split(int, ToIntFunction)
      • lowLatency

        SPLStream lowLatency()
        Return a stream that is guaranteed to run in the same process as the calling TStream. All streams that are created from the returned stream are also guaranteed to run in the same process until TStream.endLowLatency() is called.

        For example, for the following topology:
        
         ---source---.lowLatency()---filter---transform---.endLowLatency()---
         
        It is guaranteed that the filter and transform operations will run in the same process.

        Only applies for distributed contexts.
        Specified by:
        lowLatency in interface TStream<com.ibm.streams.operator.Tuple>
        Returns:
        A stream that is guaranteed to run in the same process as the calling stream.
      • isolate

        SPLStream isolate()
        Return a stream whose immediate subsequent processing will execute in a separate operating system process from this stream's processing.
        For the following Topology:
        
         -->transform1-->.isolate()-->transform2-->transform3-->.isolate()-->sink
         
        It is guaranteed that:
        • transform1 and transform2 will execute in separate processes.
        • transform3 and sink will execute in separate processes.
        If multiple transformations (t1, t2, t3) are applied to a stream returned from isolate() then it is guaranteed that each of them will execute in a separate operating system process from this stream, but no guarantees are made about where t1, t2, t3 are placed in relation to each other.
        Only applies for distributed contexts.
        Specified by:
        isolate in interface TStream<com.ibm.streams.operator.Tuple>
        Returns:
        A stream that runs in a separate process from this stream.
      • modify

        SPLStream modify(UnaryOperator<com.ibm.streams.operator.Tuple> modifier)
        Declare a new stream that modifies each tuple from this stream into one (or zero) tuple of the same type T. For each tuple t on this stream, the returned stream will contain a tuple that is the result of modifier.apply(t) when the return is not null. The function may return the same reference as its input t or a different object of the same type. If modifier.apply(t) returns null then no tuple is submitted to the returned stream for t.

        Example of modifying a stream of String values by adding the suffix 'extra'.

         
         TStream<String> strings = ...
         TStream<String> modifiedStrings = strings.modify(new UnaryOperator<String>() {
                     @Override
                     public String apply(String tuple) {
                         return tuple.concat("extra");
                     }});
         
         

        This method is equivalent to transform(Function<T,T> modifier).
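        The null-dropping rule described above can be modelled without the Streams runtime. The following is a minimal sketch over a java.util.List, using java.util.function.UnaryOperator as a stand-in for the topology's own UnaryOperator interface; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of modify() semantics; not part of streamsx.topology.
class ModifySemanticsSketch {

    /** Applies modifier to each tuple and submits the result unless it is null. */
    static <T> List<T> modify(List<T> input, UnaryOperator<T> modifier) {
        List<T> output = new ArrayList<>();
        for (T tuple : input) {
            T result = modifier.apply(tuple);
            if (result != null) {
                output.add(result); // a null result means no tuple is submitted for this input
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Empty strings are dropped (null); others get the suffix "extra".
        List<String> out = modify(List.of("a", "", "b"),
                s -> s.isEmpty() ? null : s.concat("extra"));
        System.out.println(out); // prints [aextra, bextra]
    }
}
```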

        Specified by:
        modify in interface TStream<com.ibm.streams.operator.Tuple>
        Parameters:
        modifier - Modifier logic to be executed against each tuple.
        Returns:
        Stream that will contain tuples of type T modified from this stream's tuples.
      • sample

        SPLStream sample(double fraction)
        Return a stream that is a random sample of this stream.
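        The sampling strategy is not specified here. One plausible reading, shown only as an assumption, is that each tuple is kept independently with probability fraction. The plain-Java sketch below models that reading; the class and method names are hypothetical and the seeded Random is used only to make runs repeatable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical model of per-tuple random sampling; the real operator may differ.
class SampleSemanticsSketch {

    /** Keeps each tuple independently with probability fraction (assumed strategy). */
    static <T> List<T> sample(List<T> tuples, double fraction, Random random) {
        List<T> kept = new ArrayList<>();
        for (T tuple : tuples) {
            // nextDouble() is uniform in [0, 1), so 0.0 keeps nothing and 1.0 keeps everything.
            if (random.nextDouble() < fraction) {
                kept.add(tuple);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> tuples = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            tuples.add(i);
        }
        List<Integer> sampled = sample(tuples, 0.1, new Random(42));
        System.out.println("kept " + sampled.size() + " of " + tuples.size());
    }
}
```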
        Specified by:
        sample in interface TStream<com.ibm.streams.operator.Tuple>
        Parameters:
        fraction - Fraction of the data to return.
        Returns:
        Stream containing a random sample of this stream.
      • throttle

        SPLStream throttle(long delay,
                           java.util.concurrent.TimeUnit unit)
        Throttle a stream by ensuring any tuple is submitted with at least delay from the previous tuple.
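        The effect on timing can be sketched in plain Java. The model below assumes tuples are submitted in arrival order and as early as the throttle allows; it computes submission times from arrival times and does not use the Streams API. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical timing model for throttle(); not part of streamsx.topology.
class ThrottleTimingSketch {

    /** Returns submission times so that consecutive submissions are at least delayMillis apart. */
    static long[] submitTimes(long[] arrivalMillis, long delayMillis) {
        long[] submit = new long[arrivalMillis.length];
        for (int i = 0; i < arrivalMillis.length; i++) {
            // The first tuple is never held back; later ones wait for the previous submission plus delay.
            long earliest = (i == 0) ? arrivalMillis[i] : submit[i - 1] + delayMillis;
            submit[i] = Math.max(arrivalMillis[i], earliest);
        }
        return submit;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 10, 20, 500};
        System.out.println(Arrays.toString(submitTimes(arrivals, 100))); // prints [0, 100, 200, 500]
    }
}
```

        In this model a burst of tuples is spread out, while a tuple that arrives after a long gap (the one at 500 ms) passes through without added delay.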
        Specified by:
        throttle in interface TStream<com.ibm.streams.operator.Tuple>
        Parameters:
        delay - Maximum amount to delay a tuple.
        unit - Unit of delay.
        Returns:
        Stream containing all tuples on this stream, but throttled.
      • parallel

        SPLStream parallel(int width)
        Parallelizes the stream into a fixed number of parallel channels using round-robin distribution.
        Tuples are routed to the parallel channels in a round-robin fashion.
        Subsequent transformations on the returned stream will be executed in width channels until TStream.endParallel() is called or the stream terminates.
        See TStream.parallel(Supplier, Routing) for more information.
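        Round-robin distribution can be pictured with a small plain-Java model. It assumes routing starts at channel 0 and follows arrival order strictly, which the text above does not guarantee; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of round-robin routing; not part of streamsx.topology.
class RoundRobinRoutingSketch {

    /** Distributes tuples over width channels, one tuple per channel in turn. */
    static <T> List<List<T>> route(List<T> tuples, int width) {
        List<List<T>> channels = new ArrayList<>();
        for (int c = 0; c < width; c++) {
            channels.add(new ArrayList<>());
        }
        for (int i = 0; i < tuples.size(); i++) {
            channels.get(i % width).add(tuples.get(i)); // tuple i goes to channel i mod width
        }
        return channels;
    }

    public static void main(String[] args) {
        System.out.println(route(List.of("a", "b", "c", "d", "e", "f", "g"), 3));
        // prints [[a, d, g], [b, e], [c, f]]
    }
}
```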
        Specified by:
        parallel in interface TStream<com.ibm.streams.operator.Tuple>
        Parameters:
        width - The degree of parallelism in the parallel region.
        Returns:
        A reference to a stream for which subsequent transformations will be executed in parallel using width channels.
      • parallel

        SPLStream parallel(Supplier<java.lang.Integer> width,
                           TStream.Routing routing)
        Parallelizes the stream into width parallel channels. Tuples are routed to the parallel channels based on the TStream.Routing parameter.

        If TStream.Routing.ROUND_ROBIN is specified the tuples are routed to parallel channels such that an even distribution is maintained.
        If TStream.Routing.HASH_PARTITIONED is specified then the hashCode() of the tuple is used to route the tuple to a corresponding channel, so that all tuples with the same hash code are sent to the same channel.
        If TStream.Routing.KEY_PARTITIONED is specified each tuple is taken to be its own key and is routed so that all tuples with the same key are sent to the same channel. This is equivalent to calling TStream.parallel(Supplier, Function) with an identity function.
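        The property that matters for HASH_PARTITIONED routing is that equal hash codes always map to the same channel. The sketch below shows one way to obtain that property in plain Java; the exact mapping used by the Streams runtime is not stated here, so the floorMod formula is an assumption, and the class and method names are hypothetical.

```java
// Hypothetical model of hash-based routing; the runtime's actual mapping may differ.
class HashRoutingSketch {

    /** Maps a tuple to a channel in [0, width) based only on its hashCode(). */
    static int channelFor(Object tuple, int width) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(tuple.hashCode(), width);
    }

    public static void main(String[] args) {
        int width = 3;
        for (String tuple : new String[] {"alpha", "beta", "alpha", "gamma"}) {
            System.out.println(tuple + " -> channel " + channelFor(tuple, width));
        }
    }
}
```

        Both occurrences of "alpha" are printed with the same channel number, which is the guarantee described above.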

        Source operations may be parallelized as well, refer to TStream.setParallel(Supplier) for more information.

        Given the following code:
         
         TStream<String> myStream = topology.source(...);
         TStream<String> parallelStart = myStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
         TStream<String> inParallel = parallelStart.map(...);
         TStream<String> joinedParallelStreams = inParallel.endParallel();
         joinedParallelStreams.print();
         
         
        The following graph is created:


        Calling parallel(3) creates three parallel channels. Each of the 3 channels contains separate instantiations of the operations (in this case, just map) declared in the region. Such stream operations are run in parallel as follows:
        Execution Context | Parallel Behavior
        Standalone | Each parallel channel is separately executed by one or more threads. The number of threads executing a channel is exactly 1 thread per input to the channel.
        Distributed | A parallel channel is never run in the same process as another parallel channel. Furthermore, a single parallel channel may be executed across multiple processes, as determined by the Streams runtime.
        Streaming Analytics service | Same as Distributed.
        Embedded | All parallel information is ignored, and the application is executed without any added parallelism.

        A parallel region can have multiple inputs and multiple outputs. An input to a parallel region is a stream on which TStream.parallel(int) has been called, and an output is a stream on which TStream.endParallel() has been called. A parallel region with multiple inputs is created if a stream in one parallel region connects with a stream in another parallel region.

        Two streams "connect" if:
        • One stream invokes TStream.union(TStream) using the other as a parameter.
        • Both streams are used as inputs to an SPL operator created through invokeOperator which has multiple input ports.

        For example, the following code connects two separate parallel regions into a single parallel region with multiple inputs:
         
         TStream<String> firstStream = topology.source(...);
         TStream<String> secondStream = topology.source(...);
         TStream<String> firstParallelStart = firstStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
         TStream<String> secondParallelStart = secondStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
         TStream<String> firstMapOutput = firstParallelStart.map(...);
         TStream<String> unionedStreams = firstMapOutput.union(secondParallelStart);
         TStream<String> secondMapOutput = unionedStreams.map(...);
         TStream<String> nonParallelStream = secondMapOutput.endParallel();
         nonParallelStream.print();
         
         
        This code results in the following graph:


        When creating a parallel region with multiple inputs, the different inputs must all have the same value for the degree of parallelism. For example, it cannot be the case that one parallel region input specifies a width of 4, while another input to the same region specifies a width of 5. Additionally, if a submission time parameter is used to specify the width of a parallel region, then different inputs to that region must all use the same submission time parameter object.

        A parallel region may contain a sink; it is not required that a parallel region have an output stream. The following defines a sink in a parallel region:
         
         TStream<String> myStream = topology.source(...);
         TStream<String> myParallelStream = myStream.parallel(6);
         myParallelStream.print();
         
         
        In the above code, the parallel region is implicitly ended by the sink, without calling TStream.endParallel(). This results in the following graph:


        A parallel region with multiple output streams can be created by invoking TStream.endParallel() on multiple streams within the same parallel region. For example, the following code defines a parallel region with multiple output streams:
         
         TStream<String> myStream = topology.source(...);
         TStream<String> parallelStart = myStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
         TStream<String> firstInParallel = parallelStart.map(...);
         TStream<String> secondInParallel = parallelStart.map(...);
         TStream<String> firstParallelOutput = firstInParallel.endParallel();
         TStream<String> secondParallelOutput = secondInParallel.endParallel();
         
         
        The above code would yield the following graph:


        When a stream outside of a parallel region connects to a stream inside of a parallel region, the outside stream and all of its prior operations implicitly become part of the parallel region. For example, the following code connects a stream outside a parallel region to a stream inside of a parallel region.
         
         TStream<String> firstStream = topology.source(...);
         TStream<String> secondStream = topology.source(...);
         TStream<String> firstParallelStart = firstStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
         TStream<String> secondParallelStart = ...
         TStream<String> firstInParallel = firstParallelStart.map(...);
         TStream<String> secondInParallel = secondParallelStart.map(...);
         TStream<String> unionStream = firstInParallel.union(secondInParallel);
         TStream<String> nonParallelStream = unionStream.endParallel();
         nonParallelStream.print();
         
         
        Once connected, the stream outside of the parallel region (and all of its prior operations) becomes part of the parallel region:

        The Streams runtime supports the nesting of parallel regions inside of another parallel region. A parallel region will become nested inside of another parallel region in one of two cases:
        • If TStream.parallel(int) is invoked on a stream which is already inside of a parallel region.
        • A stream inside of a parallel region becomes connected to a stream that has a parallel region in its previous operations.
        For example, calling TStream.parallel(int) twice on the same stream creates a nested parallel region as follows:
         
         TStream<String> stream = topology.source(...);
         stream.parallel(3).map(...).parallel(3).map(...).endParallel().endParallel();
         
         
        This results in a graph of the following structure:


        Whereas the first map operation is instantiated exactly 3 times due to parallel(3), the second map operation is instantiated a total of 9 times since each of the 3 enclosing parallel channels holds 3 nested parallel channels. The TStream.Routing configurations of the enclosing and nested parallel regions do not need to match.
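        The instance counts above follow from multiplying the widths of all enclosing parallel regions. A short plain-Java check of that arithmetic is shown below; the class and method names are hypothetical.

```java
// Hypothetical helper that multiplies the widths of the enclosing parallel regions.
class NestedParallelCountSketch {

    /** Number of instances of an operation nested inside regions of the given widths. */
    static int instances(int... enclosingWidths) {
        int count = 1;
        for (int width : enclosingWidths) {
            count = Math.multiplyExact(count, width); // fails loudly on int overflow
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(instances(3));    // first map: prints 3
        System.out.println(instances(3, 3)); // second map: prints 9
    }
}
```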

        As previously mentioned, nesting also occurs when a stream inside of a parallel region becomes connected to a stream that has a parallel region in its previous operations. The following code creates such a situation:
         
         TStream<String> streamToJoin = topology.source(...);
         streamToJoin.setParallel();
         streamToJoin = streamToJoin.map(...).endParallel();
         
         TStream<String> parallelStream = topology.source(...);
         parallelStream = parallelStream.parallel(4);
         
         parallelStream = parallelStream.union(streamToJoin);
         parallelStream.map(...).endParallel().print();
         
         
        This results in the following graph structure:


        Limitations of parallel() are as follows:
        Every call to endParallel() must have a call to parallel(...) preceding it. The following is invalid:
         
         TStream myStream = topology.strings("a","b","c");
         myStream.endParallel();
         
        A parallelized stream cannot be joined with a window, and a parallelized window cannot be joined with a stream. The following is invalid:
         
         TWindow numWindow = topology.strings("1","2","3").last(3);
         TStream stringStream = topology.strings("a","b","c");
         TStream paraStringStream = stringStream.parallel(5);
         TStream filtered = paraStringStream.filter(...);
         filtered.join(numWindow, ...);
         
        Specified by:
        parallel in interface TStream<com.ibm.streams.operator.Tuple>
        Parameters:
        width - The degree of parallelism. See TStream.parallel(int width) for more details.
        routing - Defines how tuples will be routed to channels.
        Returns:
        A reference to a TStream<> at the beginning of the parallel region.
        See Also:
        Topology.createSubmissionParameter(String, Class), TStream.split(int, ToIntFunction)
      • autonomous

        SPLStream autonomous()
        Return a stream matching this stream whose subsequent processing will execute in an autonomous region. By default IBM Streams processing is executed in an autonomous region where any checkpointing of operator state is autonomous (independent) of other operators.
        This method may be used to end a consistent region by starting an autonomous region. This may be called even if this stream is in an autonomous region.
        Autonomous regions are not applicable when a topology is submitted to an embedded or standalone context; the call is ignored in those contexts.
        Specified by:
        autonomous in interface TStream<com.ibm.streams.operator.Tuple>
      • setConsistent

        SPLStream setConsistent(ConsistentRegionConfig config)
        Set the operator that is the source of this stream to be the start of a consistent region to support at least once and exactly once processing. IBM Streams calculates the boundaries of the consistent region based on the reachability graph of this stream. A region can be bounded through use of TStream.autonomous().

        Consistent regions are only supported in distributed contexts.

        This must be called on a stream directly produced by an SPL operator that supports consistent regions. Source streams produced by methods on Topology do not support consistent regions.

        Specified by:
        setConsistent in interface TStream<com.ibm.streams.operator.Tuple>
        Returns:
        this
        See Also:
        ConsistentRegionConfig
      • colocate

        SPLStream colocate(Placeable<?>... elements)
        Colocate this element with other topology elements so that at runtime they all execute within the same processing element (PE or operating system process). elements may contain any Placeable within the same topology; there is no requirement that an element is connected (by a stream) directly or indirectly to this element.

        When a set of colocated elements is completely within a single parallel region then the colocation constraint is limited to that channel. So if elements A and B are colocated then it is guaranteed that A[0],B[0] from channel 0 are in a single PE and A[1],B[1] from channel 1 are in a single PE, but typically a different PE to A[0],B[0].
        When a set of colocated elements is not completely within a single parallel region then the colocation constraint applies to the whole topology. So if A,B are in a parallel region but C is outside then all the replicas of A,B are colocated with C and each other.

        Colocation is also referred to as fusing. When colocated topology elements are connected by streams, the communication between them is in-process using direct method calls (instead of a network transport such as TCP).

        Specified by:
        colocate in interface Placeable<TStream<com.ibm.streams.operator.Tuple>>
        Parameters:
        elements - Elements to colocate with this container.
        Returns:
        this
      • invocationName

        SPLStream invocationName(java.lang.String name)
        Set the invocation name of this placeable. Allows an invocation name to be assigned to this placeable. The name is visible in the Streams console for a running job and is associated with the logic for this placeable.

        Names must be unique within a topology. If this name is already in use, then 'name_N' will be used, where N is a number that makes the name unique within the topology.

        For example the invocation for sending alerts as SMS (text) messages could be named as SMSAlerts:

         
         TStream<Alert> alerts = ...
         
         alerts.forEach(Alert::sendAsTextMsg).invocationName("SMSAlerts"); 
         
         

        Note that name will eventually resolve into an identifier for an operator name and/or a stream name. Identifiers are limited to ASCII characters, thus name will be modified at code generation time if it cannot be represented as an identifier.
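        The 'name_N' uniqueness rule can be modelled with a small plain-Java registry. How the topology actually chooses N is not stated here, so counting up from 1 is an assumption, and the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of unique invocation naming; not part of streamsx.topology.
class UniqueInvocationNameSketch {

    private final Set<String> used = new HashSet<>();

    /** Returns name if unused, otherwise name_N for the first N (assumed to start at 1) that is free. */
    String assign(String name) {
        String candidate = name;
        int n = 0;
        // Set.add returns false when the candidate is already taken.
        while (!used.add(candidate)) {
            n++;
            candidate = name + "_" + n;
        }
        return candidate;
    }

    public static void main(String[] args) {
        UniqueInvocationNameSketch names = new UniqueInvocationNameSketch();
        System.out.println(names.assign("SMSAlerts")); // prints SMSAlerts
        System.out.println(names.assign("SMSAlerts")); // prints SMSAlerts_1
        System.out.println(names.assign("SMSAlerts")); // prints SMSAlerts_2
    }
}
```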

        Specified by:
        invocationName in interface Placeable<TStream<com.ibm.streams.operator.Tuple>>
        Parameters:
        name - Name to assign.
        Returns:
        This.
        See Also:
        Placeable.getInvocationName()
streamsx.topology 2.1 @ IBMStreams GitHub