Interface SPLStream
-
- All Superinterfaces:
- Placeable<TStream<com.ibm.streams.operator.Tuple>>, SPLInput, TopologyElement, TStream<com.ibm.streams.operator.Tuple>
public interface SPLStream extends TStream<com.ibm.streams.operator.Tuple>, SPLInput
An SPLStream is a declaration of a continuous sequence of tuples with a defined SPL schema. An SPLStream is a TStream<Tuple> and thus may be handled using any functional logic, where each tuple will be an instance of com.ibm.streams.operator.Tuple.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface com.ibm.streamsx.topology.TStream
TStream.Routing
-
-
Method Summary
Modifier and Type, Method and Description
SPLStream autonomous()
    Return a stream matching this stream whose subsequent processing will execute in an autonomous region.
SPLStream colocate(Placeable<?>... elements)
    Colocate this element with other topology elements so that at runtime they all execute within the same processing element (PE or operating system process).
<T> TStream<T> convert(Function<com.ibm.streams.operator.Tuple,T> convertor)
    Convert SPL tuples into Java objects.
SPLStream endLowLatency()
    Return a TStream that is no longer guaranteed to run in the same process as the calling stream.
SPLStream endParallel()
    Ends a parallel region by merging the channels into a single stream.
SPLStream filter(Predicate<com.ibm.streams.operator.Tuple> filter)
    Declare a new stream that filters tuples from this stream.
com.ibm.streams.operator.StreamSchema getSchema()
    SPL schema of this stream.
SPLStream invocationName(java.lang.String name)
    Set the invocation name of this placeable.
SPLStream isolate()
    Return a stream whose immediate subsequent processing will execute in a separate operating system process from this stream's processing.
SPLStream lowLatency()
    Return a stream that is guaranteed to run in the same process as the calling TStream.
SPLStream modify(UnaryOperator<com.ibm.streams.operator.Tuple> modifier)
    Declare a new stream that modifies each tuple from this stream into one (or zero) tuple of the same type T.
SPLStream parallel(int width)
    Parallelizes the stream into a fixed number of parallel channels using round-robin distribution.
SPLStream parallel(Supplier<java.lang.Integer> width, Function<com.ibm.streams.operator.Tuple,?> keyFunction)
    Parallelizes the stream into a number of parallel channels using key based distribution.
SPLStream parallel(Supplier<java.lang.Integer> width, TStream.Routing routing)
    Parallelizes the stream into width parallel channels.
SPLStream sample(double fraction)
    Return a stream that is a random sample of this stream.
SPLStream setConsistent(ConsistentRegionConfig config)
    Set the operator that is the source of this stream to be the start of a consistent region to support at least once and exactly once processing.
SPLStream setParallel(Supplier<java.lang.Integer> width)
    Sets the current stream as the start of a parallel region.
SPLStream throttle(long delay, java.util.concurrent.TimeUnit unit)
    Throttle a stream by ensuring any tuple is submitted with at least delay from the previous tuple.
TStream<com.ibm.json.java.JSONObject> toJSON()
    Transform SPL tuples into JSON.
TStream<java.lang.String> toStringStream()
    Create a TStream<String> from this stream.
TStream<java.lang.String> toTupleString()
    Create a stream that converts each input tuple on this stream to its SPL character representation.
-
Methods inherited from interface com.ibm.streamsx.topology.TStream
asType, connectTo, flatMap, forEach, getTupleClass, getTupleType, join, join, joinLast, joinLast, last, last, last, last, lastSeconds, map, multiTransform, output, parallel, print, publish, publish, publish, publish, sink, split, transform, union, union, window
-
Methods inherited from interface com.ibm.streamsx.topology.context.Placeable
addResourceTags, getInvocationName, getResourceTags, isPlaceable, operator
-
Methods inherited from interface com.ibm.streamsx.topology.TopologyElement
builder, topology
-
-
-
-
Method Detail
-
getSchema
com.ibm.streams.operator.StreamSchema getSchema()
SPL schema of this stream.
- Returns:
- SPL schema of this stream.
-
toJSON
TStream<com.ibm.json.java.JSONObject> toJSON()
Transform SPL tuples into JSON. Each tuple from this stream is converted into a JSON representation.
- If getSchema() returns JSONSchemas.JSON then each tuple is taken as serialized JSON and deserialized. If the serialized JSON is an array, then a JSON object is created with a single attribute payload containing the deserialized value.
- Otherwise the tuple is converted to JSON using the encoding provided by the SPL Java Operator API com.ibm.streams.operator.encoding.JSONEncoding.
- Returns:
- A stream with each tuple as a JSONObject.
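The array-wrapping rule for JSONSchemas.JSON tuples can be pictured with a small plain-Java sketch. This is only an illustration under stated assumptions, not the library's implementation: a deserialized JSON value is modelled here as a java.util.Map (object) or java.util.List (array), and the names JsonPayloadSketch and toJsonObject are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the "payload" wrapping rule; not the library's code.
final class JsonPayloadSketch {
    // A deserialized JSON array is wrapped in an object with the single
    // attribute "payload"; a deserialized JSON object is returned unchanged.
    @SuppressWarnings("unchecked")
    static Map<String, Object> toJsonObject(Object deserialized) {
        if (deserialized instanceof List) {
            Map<String, Object> wrapper = new LinkedHashMap<>();
            wrapper.put("payload", deserialized);
            return wrapper;
        }
        if (deserialized instanceof Map) {
            return (Map<String, Object>) deserialized;
        }
        throw new IllegalArgumentException("Expected a JSON object or array");
    }
}
```

With this sketch, an input list [1, 2, 3] becomes an object whose only attribute is payload, while an input object passes through as is.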
-
convert
<T> TStream<T> convert(Function<com.ibm.streams.operator.Tuple,T> convertor)
Convert SPL tuples into Java objects. This call is equivalent to transform(convertor).
- Parameters:
convertor - Function to convert each SPL tuple.
- Returns:
- Stream containing tuples of type T transformed from this stream's SPL tuples.
- See Also:
TStream.transform(Function)
-
toTupleString
TStream<java.lang.String> toTupleString()
Create a stream that converts each input tuple on this stream to its SPL character representation.
- Returns:
- Stream containing SPL character representations of this stream's SPL tuples.
-
toStringStream
TStream<java.lang.String> toStringStream()
Create a TStream<String> from this stream. This SPLStream must have a schema of SPLSchemas.STRING.
- Returns:
- This stream declared as a TStream<String>.
- Throws:
java.lang.IllegalStateException - Stream does not have a value schema for TStream<String>.
- See Also:
SPLStreams.stringToSPLStream(TStream)
-
endLowLatency
SPLStream endLowLatency()
Return a TStream that is no longer guaranteed to run in the same process as the calling stream. For example, in the following topology:
    ---source---.lowLatency()---filter---transform---.endLowLatency()---filter2
It is guaranteed that the filter and transform operations will run in the same process, but it is not guaranteed that the transform and filter2 operations will run in the same process.
Only applies for distributed contexts.
- Specified by:
endLowLatency in interface TStream<com.ibm.streams.operator.Tuple>
- Returns:
- A stream that is not guaranteed to run in the same process as the calling stream.
-
filter
SPLStream filter(Predicate<com.ibm.streams.operator.Tuple> filter)
Declare a new stream that filters tuples from this stream. Each tuple t on this stream will appear in the returned stream if filter.test(t) returns true. If filter.test(t) returns false then t will not appear in the returned stream.
Examples of filtering out all empty strings from stream s of type String:
    // Java 8 - Using lambda expression
    TStream<String> s = ...
    TStream<String> filtered = s.filter(t -> !t.isEmpty());

    // Java 7 - Using anonymous class
    TStream<String> s = ...
    TStream<String> filtered = s.filter(new Predicate<String>() {
        @Override
        public boolean test(String t) {
            return !t.isEmpty();
        }
    });
- Specified by:
filter in interface TStream<com.ibm.streams.operator.Tuple>
- Parameters:
filter - Filtering logic to be executed against each tuple.
- Returns:
- Filtered stream
- See Also:
TStream.split(int, ToIntFunction)
-
lowLatency
SPLStream lowLatency()
Return a stream that is guaranteed to run in the same process as the calling TStream. All streams that are created from the returned stream are also guaranteed to run in the same process until TStream.endLowLatency() is called.
For example, for the following topology:
    ---source---.lowLatency()---filter---transform---.endLowLatency()---
It is guaranteed that the filter and transform operations will run in the same process.
Only applies for distributed contexts.
- Specified by:
lowLatency in interface TStream<com.ibm.streams.operator.Tuple>
- Returns:
- A stream that is guaranteed to run in the same process as the calling stream.
-
isolate
SPLStream isolate()
Return a stream whose immediate subsequent processing will execute in a separate operating system process from this stream's processing.
For the following topology:
    -->transform1-->.isolate()-->transform2-->transform3-->.isolate()-->sink
It is guaranteed that:
- transform1 and transform2 will execute in separate processes.
- transform3 and sink will execute in separate processes.
If multiple transformations (t1, t2, t3) are applied to a stream returned from isolate() then it is guaranteed that each of them will execute in a separate operating system process to this stream, but no guarantees are made about where t1, t2, t3 are placed in relationship to each other.
Only applies for distributed contexts.
-
modify
SPLStream modify(UnaryOperator<com.ibm.streams.operator.Tuple> modifier)
Declare a new stream that modifies each tuple from this stream into one (or zero) tuple of the same type T. For each tuple t on this stream, the returned stream will contain a tuple that is the result of modifier.apply(t) when the return is not null. The function may return the same reference as its input t or a different object of the same type. If modifier.apply(t) returns null then no tuple is submitted to the returned stream for t.
Example of modifying a stream of String values by adding the suffix 'extra':
    TStream<String> strings = ...
    TStream<String> modifiedStrings = strings.modify(new UnaryOperator<String>() {
        @Override
        public String apply(String tuple) {
            return tuple.concat("extra");
        }
    });
This method is equivalent to transform(Function<T,T> modifier).
-
sample
SPLStream sample(double fraction)
Return a stream that is a random sample of this stream.
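The description does not say how the sample is drawn. As a rough mental model only, the sketch below assumes each tuple is kept independently with probability fraction; that assumption, along with the names SampleSketch and sample, is hypothetical and not taken from the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical model of random sampling; the real operator may behave differently.
final class SampleSketch {
    // Keeps each tuple independently with probability `fraction` (assumed semantics).
    static <T> List<T> sample(List<T> tuples, double fraction, Random random) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0.0, 1.0]");
        }
        List<T> sampled = new ArrayList<>();
        for (T tuple : tuples) {
            // nextDouble() is in [0.0, 1.0), so 0.0 keeps nothing and 1.0 keeps everything.
            if (random.nextDouble() < fraction) {
                sampled.add(tuple);
            }
        }
        return sampled;
    }
}
```

Under this model a fraction of 0.0 yields an empty result and 1.0 yields every tuple; values in between keep roughly that share of the input.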
-
throttle
SPLStream throttle(long delay, java.util.concurrent.TimeUnit unit)
Throttle a stream by ensuring any tuple is submitted with at least delay from the previous tuple.
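The spacing guarantee can be illustrated by computing submission times from arrival times. The sketch below is a plain-Java model, not the library's implementation; it assumes arrival times are non-decreasing, and the names ThrottleSketch and submitTimesMillis are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of throttling: each tuple is submitted no sooner than
// `delay` after the previous submission.
final class ThrottleSketch {
    static long[] submitTimesMillis(long[] arrivalMillis, long delay, TimeUnit unit) {
        long delayMillis = unit.toMillis(delay);
        long[] submit = new long[arrivalMillis.length];
        for (int i = 0; i < arrivalMillis.length; i++) {
            // The first tuple is never held back; later ones wait if they arrive too early.
            long earliest = (i == 0) ? arrivalMillis[i] : submit[i - 1] + delayMillis;
            submit[i] = Math.max(arrivalMillis[i], earliest);
        }
        return submit;
    }
}
```

For arrivals at 0, 10, 20 and 500 ms with a 100 ms delay, the model submits at 0, 100, 200 and 500 ms: tuples arriving in a burst are spread out, while a tuple arriving after a long gap is not delayed.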
-
parallel
SPLStream parallel(int width)
Parallelizes the stream into a fixed number of parallel channels using round-robin distribution.
Tuples are routed to the parallel channels in a round-robin fashion.
Subsequent transformations on the returned stream will be executed in width channels until TStream.endParallel() is called or the stream terminates.
See TStream.parallel(Supplier, Routing) for more information.
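Round-robin distribution can be pictured as cycling through the channels in order. The sketch below is a plain-Java model under that assumption; the names RoundRobinSketch and channelFor are hypothetical, and the actual channel order used by the Streams runtime is not specified here.

```java
// Hypothetical model of round-robin routing over `width` channels.
final class RoundRobinSketch {
    // Channel for the n-th tuple (0-based) when cycling over the channels in order.
    static int channelFor(long tupleIndex, int width) {
        if (width <= 0) {
            throw new IllegalArgumentException("width must be positive");
        }
        return (int) Math.floorMod(tupleIndex, (long) width);
    }
}
```

With a width of 3, tuples 0, 1, 2, 3 map to channels 0, 1, 2, 0, so each channel receives an equal share of a long stream.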
-
parallel
SPLStream parallel(Supplier<java.lang.Integer> width, TStream.Routing routing)
Parallelizes the stream into width parallel channels. Tuples are routed to the parallel channels based on the TStream.Routing parameter.
If TStream.Routing.ROUND_ROBIN is specified, the tuples are routed to parallel channels such that an even distribution is maintained.
If TStream.Routing.HASH_PARTITIONED is specified, the hashCode() of the tuple is used to route the tuple to a corresponding channel, so that all tuples with the same hash code are sent to the same channel.
If TStream.Routing.KEY_PARTITIONED is specified, each tuple is taken to be its own key and is routed so that all tuples with the same key are sent to the same channel. This is equivalent to calling TStream.parallel(Supplier, Function) with an identity function.
Source operations may be parallelized as well; refer to TStream.setParallel(Supplier) for more information.
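The HASH_PARTITIONED guarantee (equal hash codes land on the same channel) can be modelled with a modulo over the hash code. The text above states only the guarantee, so the modulo mapping is an assumption, and the names HashRoutingSketch and channelFor are hypothetical.

```java
// Hypothetical model of hash-partitioned routing; the runtime's actual
// hash-to-channel mapping is not documented here.
final class HashRoutingSketch {
    // floorMod keeps the result in [0, width) even for negative hash codes.
    static int channelFor(Object tuple, int width) {
        if (width <= 0) {
            throw new IllegalArgumentException("width must be positive");
        }
        return Math.floorMod(tuple.hashCode(), width);
    }
}
```

Any deterministic function of the hash code would satisfy the stated guarantee; modulo is used here only because it is the simplest choice.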
Given the following code:
    TStream<String> myStream = topology.source(...);
    TStream<String> parallelStart = myStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
    TStream<String> inParallel = parallelStart.map(...);
    TStream<String> joinedParallelStreams = inParallel.endParallel();
    joinedParallelStreams.print();
Calling parallel(...) with a width of 3 creates three parallel channels. Each of the 3 channels contains separate instantiations of the operations (in this case, just map) declared in the region. Such stream operations are run in parallel as follows:
Execution Context: Parallel Behavior
- Standalone: Each parallel channel is separately executed by one or more threads. The number of threads executing a channel is exactly 1 thread per input to the channel.
- Distributed: A parallel channel is never run in the same process as another parallel channel. Furthermore, a single parallel channel may be executed across multiple processes, as determined by the Streams runtime.
- Streaming Analytics service: Same as Distributed.
- Embedded: All parallel information is ignored, and the application is executed without any added parallelism.
A parallel region can have multiple inputs and multiple outputs. An input to a parallel region is a stream on which TStream.parallel(int) has been called, and an output is a stream on which TStream.endParallel() has been called. A parallel region with multiple inputs is created if a stream in one parallel region connects with a stream in another parallel region.
Two streams "connect" if:
- One stream invokes TStream.union(TStream) using the other as a parameter.
- Both streams are used as inputs to an SPL operator created through invokeOperator which has multiple input ports.
For example, the following code connects two separate parallel regions into a single parallel region with multiple inputs:
    TStream<String> firstStream = topology.source(...);
    TStream<String> secondStream = topology.source(...);
    TStream<String> firstParallelStart = firstStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
    TStream<String> secondParallelStart = secondStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
    TStream<String> firstMapOutput = firstParallelStart.map(...);
    TStream<String> unionedStreams = firstMapOutput.union(secondParallelStart);
    TStream<String> secondMapOutput = unionedStreams.map(...);
    TStream<String> nonParallelStream = secondMapOutput.endParallel();
    nonParallelStream.print();
When creating a parallel region with multiple inputs, the different inputs must all have the same value for the degree of parallelism. For example, it can not be the case that one parallel region input specifies a width of 4, while another input to the same region specifies a width of 5. Additionally, if a submission time parameter is used to specify the width of a parallel region, then different inputs to that region must all use the same submission time parameter object.
A parallel region may contain a sink; it is not required that a parallel region have an output stream. The following defines a sink in a parallel region:
    TStream<String> myStream = topology.source(...);
    TStream<String> myParallelStream = myStream.parallel(6);
    myParallelStream.print();
TStream.endParallel()
This results in the following graph: [figure not reproduced]
A parallel region with multiple output streams can be created by invoking TStream.endParallel() on multiple streams within the same parallel region. For example, the following code defines a parallel region with multiple output streams:
    TStream<String> myStream = topology.source(...);
    TStream<String> parallelStart = myStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
    TStream<String> firstInParallel = parallelStart.map(...);
    TStream<String> secondInParallel = parallelStart.map(...);
    TStream<String> firstParallelOutput = firstInParallel.endParallel();
    TStream<String> secondParallelOutput = secondInParallel.endParallel();
When a stream outside of a parallel region connects to a stream inside of a parallel region, the outside stream and all of its prior operations implicitly become part of the parallel region. For example, the following code connects a stream outside a parallel region to a stream inside of a parallel region:
    TStream<String> firstStream = topology.source(...);
    TStream<String> secondStream = topology.source(...);
    TStream<String> firstParallelStart = firstStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
    TStream<String> firstInParallel = firstParallelStart.map(...);
    // secondStream is never parallelized explicitly; it joins the region through the union below
    TStream<String> secondInParallel = secondStream.map(...);
    TStream<String> unionStream = firstInParallel.union(secondInParallel);
    TStream<String> nonParallelStream = unionStream.endParallel();
    nonParallelStream.print();
The Streams runtime supports the nesting of parallel regions inside of another parallel region. A parallel region will become nested inside of another parallel region in one of two cases:
- If TStream.parallel(int) is invoked on a stream which is already inside of a parallel region.
- A stream inside of a parallel region becomes connected to a stream that has a parallel region in its previous operations.
Invoking TStream.parallel(int) twice on the same stream creates a nested parallel region as follows:
    TStream<String> stream = topology.source(...);
    stream.parallel(3).map(...).parallel(3).map(...).endParallel().endParallel();
Whereas the first map operation is instantiated exactly 3 times due to parallel(3), the second map operation is instantiated a total of 9 times since each of the 3 enclosing parallel channels holds 3 nested parallel channels. The TStream.Routing configurations of the enclosing and nested parallel regions do not need to match.
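The instance counts for nested regions follow from multiplying the widths of all enclosing regions. The short sketch below only restates that arithmetic; the names NestedParallelSketch and instances are hypothetical and not part of the API.

```java
// Hypothetical helper that restates the nesting arithmetic from the text.
final class NestedParallelSketch {
    // Number of instantiations of an operation enclosed by parallel regions
    // with the given widths, listed from outermost to innermost.
    static int instances(int... enclosingWidths) {
        int total = 1;
        for (int width : enclosingWidths) {
            total = Math.multiplyExact(total, width);
        }
        return total;
    }
}
```

For the example above, instances(3) gives 3 for the first map and instances(3, 3) gives 9 for the second.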
As previously mentioned, nesting also occurs when a stream inside of a parallel region becomes connected to a stream that has a parallel region in its previous operations. The following code creates such a situation:
    TStream<String> streamToJoin = topology.source(...);
    streamToJoin.setParallel(...);
    streamToJoin = streamToJoin.map(...).endParallel();
    TStream<String> parallelStream = topology.source(...);
    parallelStream = parallelStream.parallel(4);
    parallelStream = parallelStream.union(streamToJoin);
    parallelStream.map(...).endParallel().print();
Limitations of parallel() are as follows:
Every call to endParallel() must have a call to parallel(...) preceding it. The following is invalid:
    TStream<String> myStream = topology.strings("a","b","c");
    myStream.endParallel();
Joining a parallelized stream with a window is also invalid:
    TWindow<String,?> numWindow = topology.strings("1","2","3").last(3);
    TStream<String> stringStream = topology.strings("a","b","c");
    TStream<String> paraStringStream = stringStream.parallel(5);
    TStream<String> filtered = paraStringStream.filter(...);
    filtered.join(numWindow, ...);
- Specified by:
parallel in interface TStream<com.ibm.streams.operator.Tuple>
- Parameters:
width - The degree of parallelism. See TStream.parallel(int width) for more details.
routing - Defines how tuples will be routed to channels.
- Returns:
- A reference to a TStream<> at the beginning of the parallel region.
- See Also:
Topology.createSubmissionParameter(String, Class), TStream.split(int, ToIntFunction)
-
parallel
SPLStream parallel(Supplier<java.lang.Integer> width, Function<com.ibm.streams.operator.Tuple,?> keyFunction)
Parallelizes the stream into a number of parallel channels using key based distribution.
For each tuple t, keyFunction.apply(t) is called and then the tuples are routed so that all tuples with the same key are sent to the same channel.
- Specified by:
parallel in interface TStream<com.ibm.streams.operator.Tuple>
- Parameters:
width - The degree of parallelism.
keyFunction - Function to obtain the key from each tuple.
- Returns:
- A reference to a stream with width channels at the beginning of the parallel region.
- See Also:
TStream.Routing.KEY_PARTITIONED, TStream.parallel(Supplier, Routing)
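Key-based distribution can be modelled by applying the key function to each tuple and choosing a channel from the key. The sketch below is a plain-Java illustration, not the library's implementation: java.util.function.Function stands in for the topology API's Function type, choosing the channel from the key's hashCode() is an assumption, and the names KeyRoutingSketch and route are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of key-partitioned routing.
final class KeyRoutingSketch {
    // Returns one list of tuples per channel; tuples with equal keys share a channel.
    static <T> List<List<T>> route(List<T> tuples, int width, Function<T, ?> keyFunction) {
        List<List<T>> channels = new ArrayList<>();
        for (int i = 0; i < width; i++) {
            channels.add(new ArrayList<>());
        }
        for (T tuple : tuples) {
            Object key = keyFunction.apply(tuple);
            // Assumed mapping: equal keys have equal hash codes, hence equal channels.
            int channel = Math.floorMod(key.hashCode(), width);
            channels.get(channel).add(tuple);
        }
        return channels;
    }
}
```

Routing words by their first letter, for instance, keeps all words starting with the same letter on one channel, regardless of how the letters themselves are spread over the channels.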
-
setParallel
SPLStream setParallel(Supplier<java.lang.Integer> width)
Sets the current stream as the start of a parallel region.
- Specified by:
setParallel in interface TStream<com.ibm.streams.operator.Tuple>
- Parameters:
width - The degree of parallelism.
- See Also:
TStream.parallel(int), TStream.parallel(Supplier, Routing), TStream.parallel(Supplier, Function)
-
endParallel
SPLStream endParallel()
Ends a parallel region by merging the channels into a single stream.
- Specified by:
endParallel in interface TStream<com.ibm.streams.operator.Tuple>
- Returns:
- A stream for which subsequent transformations are no longer parallelized.
- See Also:
TStream.parallel(int), TStream.parallel(Supplier, Routing), TStream.parallel(Supplier, Function)
-
autonomous
SPLStream autonomous()
Return a stream matching this stream whose subsequent processing will execute in an autonomous region. By default IBM Streams processing is executed in an autonomous region where any checkpointing of operator state is autonomous (independent) of other operators.
This method may be used to end a consistent region by starting an autonomous region. This may be called even if this stream is in an autonomous region.
Autonomous is not applicable when a topology is submitted to embedded and standalone contexts and will be ignored.
- Specified by:
autonomous in interface TStream<com.ibm.streams.operator.Tuple>
-
setConsistent
SPLStream setConsistent(ConsistentRegionConfig config)
Set the operator that is the source of this stream to be the start of a consistent region to support at least once and exactly once processing. IBM Streams calculates the boundaries of the consistent region based on the reachability graph of this stream. A region can be bounded through use of TStream.autonomous().
Consistent regions are only supported in distributed contexts.
This must be called on a stream directly produced by an SPL operator that supports consistent regions. Source streams produced by methods on Topology do not support consistent regions.
- Specified by:
setConsistent in interface TStream<com.ibm.streams.operator.Tuple>
- Returns:
- this
- See Also:
ConsistentRegionConfig
-
colocate
SPLStream colocate(Placeable<?>... elements)
Colocate this element with other topology elements so that at runtime they all execute within the same processing element (PE or operating system process). elements may contain any Placeable within the same topology; there is no requirement that the element is connected (by a stream) directly or indirectly to this element.
When a set of colocated elements are completely within a single parallel region then the colocation constraint is limited to that channel. So if elements A and B are colocated then it is guaranteed that A[0],B[0] from channel 0 are in a single PE and A[1],B[1] from channel 1 are in a single PE, but typically a different PE to A[0],B[0].
When a set of colocated elements are not completely within a single parallel region then the colocation constraint applies to the whole topology. So if A,B are in a parallel region but C is outside then all the replicas of A,B are colocated with C and each other.
Colocation is also referred to as fusing. When colocated topology elements are connected by streams, the communication between them is in-process using direct method calls (instead of a network transport such as TCP).
-
invocationName
SPLStream invocationName(java.lang.String name)
Set the invocation name of this placeable. Allows an invocation name to be assigned to this placeable. The name is visible in the Streams console for a running job and is associated with the logic for this placeable.
Names must be unique within a topology. If this name is already in use then 'name_N' will be used, where N is a number that makes the name unique within the topology.
For example, the invocation for sending alerts as SMS (text) messages could be named SMSAlerts:
    TStream<Alert> alerts = ...
    alerts.forEach(Alert::sendAsTextMsg).invocationName("SMSAlerts");
Note that name will eventually resolve into an identifier for an operator name and/or a stream name. Identifiers are limited to ASCII characters, thus name will be modified at code generation time if it cannot be represented as an identifier.
- Specified by:
invocationName in interface Placeable<TStream<com.ibm.streams.operator.Tuple>>
- Parameters:
name - Name to assign.
- Returns:
- This.
- See Also:
Placeable.getInvocationName()
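The 'name_N' uniqueness rule can be sketched as a small name registry. This is a hypothetical model, not the library's code: the starting value of N (0 here), the choice of the smallest free N, and the names InvocationNameSketch and assign are all assumptions.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of unique invocation names within one topology.
final class InvocationNameSketch {
    private final Set<String> used = new HashSet<>();

    // Returns `name` if it is free, otherwise the first free `name_N`.
    String assign(String name) {
        if (used.add(name)) {
            return name;
        }
        for (int n = 0; ; n++) {
            String candidate = name + "_" + n;
            if (used.add(candidate)) {
                return candidate;
            }
        }
    }
}
```

Under these assumptions, assigning SMSAlerts three times in one registry yields SMSAlerts, SMSAlerts_0 and SMSAlerts_1.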
-
-