public interface SPLStream extends TStream<com.ibm.streams.operator.Tuple>, SPLInput
SPLStream is a declaration of a continuous sequence of tuples with a defined SPL schema. An SPLStream is a TStream<Tuple>, so it may be handled using any functional logic, where each tuple is an instance of com.ibm.streams.operator.Tuple.
Nested classes inherited from interface TStream: TStream.Routing
Modifier and Type | Method and Description |
---|---|
SPLStream | autonomous() Return a stream matching this stream whose subsequent processing will execute in an autonomous region. |
SPLStream | colocate(Placeable<?>... elements) Colocate this element with other topology elements so that at runtime they all execute within the same processing element (PE or operating system process). |
<T> TStream<T> | convert(Function<com.ibm.streams.operator.Tuple,T> convertor) Convert SPL tuples into Java objects. |
SPLStream | endLowLatency() Return a TStream that is no longer guaranteed to run in the same process as the calling stream. |
SPLStream | endParallel() Ends a parallel region by merging the channels into a single stream. |
SPLStream | filter(Predicate<com.ibm.streams.operator.Tuple> filter) Declare a new stream that filters tuples from this stream. |
com.ibm.streams.operator.StreamSchema | getSchema() SPL schema of this stream. |
SPLStream | invocationName(java.lang.String name) Set the invocation name of this placeable. |
SPLStream | isolate() Return a stream whose immediate subsequent processing will execute in a separate operating system process from this stream's processing. |
SPLStream | lowLatency() Return a stream that is guaranteed to run in the same process as the calling TStream. |
SPLStream | modify(UnaryOperator<com.ibm.streams.operator.Tuple> modifier) Declare a new stream that modifies each tuple from this stream into one (or zero) tuple of the same type. |
SPLStream | parallel(int width) Parallelizes the stream into a fixed number of parallel channels using round-robin distribution. |
SPLStream | parallel(Supplier<java.lang.Integer> width, Function<com.ibm.streams.operator.Tuple,?> keyFunction) Parallelizes the stream into a number of parallel channels using key based distribution. |
SPLStream | parallel(Supplier<java.lang.Integer> width, TStream.Routing routing) Parallelizes the stream into width parallel channels. |
SPLStream | sample(double fraction) Return a stream that is a random sample of this stream. |
SPLStream | setConsistent(ConsistentRegionConfig config) Set the operator that is the source of this stream to be the start of a consistent region to support at-least-once and exactly-once processing. |
SPLStream | setParallel(Supplier<java.lang.Integer> width) Sets the current stream as the start of a parallel region. |
SPLStream | throttle(long delay, java.util.concurrent.TimeUnit unit) Throttle a stream by ensuring any tuple is submitted with at least delay from the previous tuple. |
TStream<com.ibm.json.java.JSONObject> | toJSON() Transform SPL tuples into JSON. |
TStream<java.lang.String> | toStringStream() Create a TStream<String> from this stream. |
TStream<java.lang.String> | toTupleString() Create a stream that converts each input tuple on this stream to its SPL character representation. |
Methods inherited from interface TStream: asType, connectTo, flatMap, forEach, getTupleClass, getTupleType, join, join, joinLast, joinLast, last, last, last, last, lastSeconds, map, multiTransform, output, parallel, print, publish, publish, publish, publish, sink, split, transform, union, union, window
Methods inherited from interface Placeable: addResourceTags, getInvocationName, getResourceTags, isPlaceable, operator
Methods inherited from interface TopologyElement: builder, topology
com.ibm.streams.operator.StreamSchema getSchema()
SPL schema of this stream.
TStream<com.ibm.json.java.JSONObject> toJSON()
Transform SPL tuples into JSON. If getSchema() returns JSONSchemas.JSON, then each tuple is taken as serialized JSON and deserialized. If the serialized JSON is an array, then a JSON object is created with a single attribute payload containing the deserialized value. For other schemas, each tuple is converted into a JSONObject using com.ibm.streams.operator.encoding.JSONEncoding.
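The array-wrapping rule above can be illustrated with a small model. This is a minimal sketch, not the toolkit's implementation: it represents deserialized JSON with plain Java types (List for arrays, Map for objects) instead of the com.ibm.json.java classes, and the class JsonWrapSketch and its method toJsonObject are hypothetical names. How scalar JSON values are handled is not described above, so the sketch simply rejects them.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the toJSON() wrapping rule for a JSONSchemas.JSON stream.
// JSON values are modelled with plain Java types (List = array, Map = object);
// this does NOT use the com.ibm.json.java API.
final class JsonWrapSketch {
    static Map<String, Object> toJsonObject(Object deserialized) {
        if (deserialized instanceof List) {
            // An array is not a JSON object, so wrap it as {"payload": [...]}.
            Map<String, Object> wrapper = new LinkedHashMap<>();
            wrapper.put("payload", deserialized);
            return wrapper;
        }
        if (deserialized instanceof Map) {
            // Assumption: a deserialized JSON object is used as-is
            // (its attributes are copied into a new map here).
            Map<String, Object> object = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) deserialized).entrySet()) {
                object.put(String.valueOf(e.getKey()), e.getValue());
            }
            return object;
        }
        // Scalars are not covered by the documented rule, so this sketch rejects them.
        throw new IllegalArgumentException("not a JSON object or array: " + deserialized);
    }
}
```

With this model, a deserialized array [1, 2, 3] becomes an object whose only attribute, payload, holds the original array.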
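<T> TStream<T> convert(Function<com.ibm.streams.operator.Tuple,T> convertor)
Convert SPL tuples into Java objects. This call is equivalent to transform(convertor).
Parameters: convertor - Function to convert each SPL tuple into an object of type T.
Returns: Stream containing tuples of type T transformed from this stream's SPL tuples.
See Also: TStream.transform(Function)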
TStream<java.lang.String> toTupleString()
Create a stream that converts each input tuple on this stream to its SPL character representation.
TStream<java.lang.String> toStringStream()
Create a TStream<String> from this stream. This SPLStream must have a schema of SPLSchemas.STRING.
Throws: java.lang.IllegalStateException - Stream does not have a value schema for TStream<String>.
See Also: SPLStreams.stringToSPLStream(TStream)
SPLStream endLowLatency()
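Return a TStream that is no longer guaranteed to run in the same process as the calling stream. For example, in the following topology:
---source---.lowLatency()---filter---transform---.endLowLatency()---filter2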
It is guaranteed that the filter and transform operations will run in
the same process, but it is not guaranteed that the transform and
filter2 operations will run in the same process.
Specified by: endLowLatency in interface TStream<com.ibm.streams.operator.Tuple>
SPLStream filter(Predicate<com.ibm.streams.operator.Tuple> filter)
Declare a new stream that filters tuples from this stream. Each tuple t on this stream will appear in the returned stream if filter.test(t) returns true. If filter.test(t) returns false, then t will not appear in the returned stream.
Examples of filtering out all empty strings from stream s of type String:
// Java 8 - Using lambda expression
TStream<String> s = ...
TStream<String> filtered = s.filter(t -> !t.isEmpty());
// Java 7 - Using anonymous class
TStream<String> s = ...
TStream<String> filtered = s.filter(new Predicate<String>() {
@Override
public boolean test(String t) {
return !t.isEmpty();
}} );
Specified by: filter in interface TStream<com.ibm.streams.operator.Tuple>
Parameters: filter - Filtering logic to be executed against each tuple.
See Also: TStream.split(int, ToIntFunction)
SPLStream lowLatency()
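Return a stream that is guaranteed to run in the same process as the calling TStream. All streams created from the returned stream are also guaranteed to run in the same process until TStream.endLowLatency() is called. For example:
---source---.lowLatency()---filter---transform---.endLowLatency()---
It is guaranteed that the filter and transform operations will run in the same process.
Specified by: lowLatency in interface TStream<com.ibm.streams.operator.Tuple>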
SPLStream isolate()
Return a stream whose immediate subsequent processing will execute in a separate operating system process from this stream's processing. For the following topology:
-->transform1-->.isolate()-->transform2-->transform3-->.isolate()-->sink
It is guaranteed that transform1 and transform2 will execute in separate processes, and that transform3 and sink will execute in separate processes.
If multiple transformations (t1, t2, t3) are applied to a stream returned from isolate(), then it is guaranteed that each of them will execute in a separate operating system process from this stream, but no guarantees are made about where t1, t2, t3 are placed in relationship to each other.
SPLStream modify(UnaryOperator<com.ibm.streams.operator.Tuple> modifier)
Declare a new stream that modifies each tuple from this stream into one (or zero) tuple of the same type T. For each tuple t on this stream, the returned stream will contain a tuple that is the result of modifier.apply(t) when the return is not null. The function may return the same reference as its input t or a different object of the same type. If modifier.apply(t) returns null, then no tuple is submitted to the returned stream for t.
Example of modifying a stream of String values by adding the suffix 'extra':
TStream<String> strings = ...
// Typed UnaryOperator<String> so that apply(String) overrides the interface method
TStream<String> modifiedStrings = strings.modify(new UnaryOperator<String>() {
@Override
public String apply(String tuple) {
return tuple.concat("extra");
}});
This method is equivalent to transform(Function<T,T> modifier).
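The null-means-drop rule of modify can be modelled over an in-memory list. This is a minimal sketch for illustration only: ModifySketch is a hypothetical class that processes a java.util.List rather than a TStream, but it applies the same per-tuple contract (submit modifier.apply(t) unless it is null).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, list-based model of modify(): each input tuple t yields
// modifier.apply(t), unless the result is null, in which case nothing is emitted.
final class ModifySketch {
    static <T> List<T> modify(List<T> input, UnaryOperator<T> modifier) {
        List<T> output = new ArrayList<>();
        for (T t : input) {
            T result = modifier.apply(t);
            if (result != null) {   // null means "submit no tuple for t"
                output.add(result);
            }
        }
        return output;
    }
}
```

For instance, applying s -> s.isEmpty() ? null : s.concat("extra") to "a", "", "b" yields "aextra" and "bextra"; the empty string is dropped because the modifier returned null.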
SPLStream sample(double fraction)
Return a stream that is a random sample of this stream.
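One plausible reading of a random sample with a given fraction is independent per-tuple (Bernoulli) selection. The sketch below assumes that strategy; the actual sampling method used by sample(double) is not specified here and may differ. SampleSketch is a hypothetical class, and the range check on fraction is an added assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical model of sample(fraction): each tuple is kept independently
// with probability `fraction`. Bernoulli selection is an assumption.
final class SampleSketch {
    static <T> List<T> sample(List<T> input, double fraction, Random random) {
        // Assumed validation: a fraction outside [0, 1] makes no sense as a probability.
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1]: " + fraction);
        }
        List<T> kept = new ArrayList<>();
        for (T t : input) {
            // nextDouble() is in [0, 1), so fraction 1.0 keeps everything and 0.0 keeps nothing.
            if (random.nextDouble() < fraction) {
                kept.add(t);
            }
        }
        return kept;
    }
}
```

Passing a seeded Random makes the sketch deterministic, which is convenient when reasoning about expected sample sizes (about fraction × n tuples on average).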
SPLStream throttle(long delay, java.util.concurrent.TimeUnit unit)
Throttle a stream by ensuring any tuple is submitted with at least delay from the previous tuple.
SPLStream parallel(int width)
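Parallelizes the stream into a fixed number of parallel channels using round-robin distribution. Tuples are routed to the parallel channels in a round-robin fashion.
Subsequent transformations on the returned stream will be executed in width channels until TStream.endParallel() is called or the stream terminates.
See TStream.parallel(Supplier, Routing) for more information.
SPLStream parallel(Supplier<java.lang.Integer> width, TStream.Routing routing)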
Parallelizes the stream into width parallel channels. Tuples are routed to the parallel channels based on the TStream.Routing parameter.
If TStream.Routing.ROUND_ROBIN is specified, the tuples are routed to parallel channels such that an even distribution is maintained.
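Round-robin routing can be sketched as a counter taken modulo the width. This is a minimal model under the assumption that channels are numbered 0 to width-1 and visited in order; RoundRobinRouter is a hypothetical class, not part of the Topology API.

```java
// Hypothetical model of ROUND_ROBIN routing: the i-th tuple (counting from 0)
// goes to channel i % width, so per-channel counts differ by at most one.
final class RoundRobinRouter {
    private final int width;
    private long next = 0;

    RoundRobinRouter(int width) {
        if (width < 1) {
            throw new IllegalArgumentException("width must be >= 1");
        }
        this.width = width;
    }

    /** Returns the channel index (0..width-1) for the next tuple. */
    int route() {
        int channel = (int) (next % width);
        next++;
        return channel;
    }
}
```

With width 3, seven tuples are routed to channels 0, 1, 2, 0, 1, 2, 0, giving channel counts of 3, 2 and 2.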
If TStream.Routing.HASH_PARTITIONED is specified, then the hashCode() of the tuple is used to route the tuple to a corresponding channel, so that all tuples with the same hash code are sent to the same channel.
If TStream.Routing.KEY_PARTITIONED is specified, each tuple is taken to be its own key and is routed so that all tuples with the same key are sent to the same channel. This is equivalent to calling TStream.parallel(Supplier, Function) with an identity function.
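Key-based routing, and its identity-key special case, can be sketched as follows. The documentation only guarantees that equal keys (or equal hash codes) reach the same channel; mapping a hash to a channel with Math.floorMod(hash, width) is an assumption made for this sketch, and KeyRouter is a hypothetical class. In this model HASH_PARTITIONED and KEY_PARTITIONED behave identically, since both route on the tuple's own hashCode().

```java
import java.util.function.Function;

// Hypothetical model of key-based routing. The channel is derived from the
// key's hashCode(); using floorMod (which stays in 0..width-1 even for
// negative hashes) is an assumption, not the Streams runtime's documented mapping.
final class KeyRouter<T> {
    private final int width;
    private final Function<? super T, ?> keyFunction;

    KeyRouter(int width, Function<? super T, ?> keyFunction) {
        if (width < 1) {
            throw new IllegalArgumentException("width must be >= 1");
        }
        this.width = width;
        this.keyFunction = keyFunction;
    }

    /** KEY_PARTITIONED modelled as a KeyRouter whose key function is the identity. */
    static <U> KeyRouter<U> keyPartitioned(int width) {
        return new KeyRouter<>(width, Function.<U>identity());
    }

    /** Equal keys always produce the same channel index. */
    int route(T tuple) {
        Object key = keyFunction.apply(tuple);
        return Math.floorMod(key.hashCode(), width);
    }
}
```

A key function such as s -> s.charAt(0) models the keyFunction overload: "apple" and "avocado" share the key 'a' and therefore land in the same channel.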
Source operations may also be parallelized; see TStream.setParallel(Supplier) for more information.
TStream<String> myStream = topology.source(...);
TStream<String> parallelStart = myStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
TStream<String> inParallel = parallelStart.map(...);
TStream<String> joinedParallelStreams = inParallel.endParallel();
joinedParallelStreams.print();
The following graph is created:
Calling parallel(3) creates three parallel channels. Each of the 3 channels contains separate instantiations of the operations (in this case, just map) declared in the region. Such stream operations are run in parallel as follows:
Execution Context | Parallel Behavior |
---|---|
Standalone | Each parallel channel is separately executed by one or more threads. The number of threads executing a channel is exactly 1 thread per input to the channel. |
Distributed | A parallel channel is never run in the same process as another parallel channel. Furthermore, a single parallel channel may be executed across multiple processes, as determined by the Streams runtime. |
Streaming Analytics service | Same as Distributed. |
Embedded | All parallel information is ignored, and the application is executed without any added parallelism. |
An input to a parallel region is a stream on which TStream.parallel(int) has been called, and an output is a stream on which TStream.endParallel() has been called.
A parallel region with multiple inputs is created if a stream in one parallel region connects with a stream in another
parallel region.
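Two streams connect if one stream invokes TStream.union(TStream) using the other as a parameter, or if both streams are used as inputs to an SPL operator, created through invokeOperator, which has multiple input ports.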
TStream<String> firstStream = topology.source(...);
TStream<String> secondStream = topology.source(...);
TStream<String> firstParallelStart = firstStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
TStream<String> secondParallelStart = secondStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
TStream<String> firstMapOutput = firstParallelStart.map(...);
TStream<String> unionedStreams = firstMapOutput.union(secondParallelStart);
TStream<String> secondMapOutput = unionedStreams.map(...);
TStream<String> nonParallelStream = secondMapOutput.endParallel();
nonParallelStream.print();
This code results in the following graph:
TStream<String> myStream = topology.source(...);
TStream<String> myParallelStream = myStream.parallel(6);
myParallelStream.print();
In the above code, the parallel region is implicitly ended by the sink, without calling TStream.endParallel().
This results in the following graph:
A parallel region with multiple outputs can be created by calling TStream.endParallel() on multiple streams within the same parallel region. For example, the following code defines a parallel region with multiple output streams:
TStream<String> myStream = topology.source(...);
TStream<String> parallelStart = myStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
TStream<String> firstInParallel = parallelStart.map(...);
TStream<String> secondInParallel = parallelStart.map(...);
TStream<String> firstParallelOutput = firstInParallel.endParallel();
TStream<String> secondParallelOutput = secondInParallel.endParallel();
The above code would yield the following graph:
TStream<String> firstStream = topology.source(...);
TStream<String> secondStream = topology.source(...);
TStream<String> firstParallelStart = firstStream.parallel(of(3), TStream.Routing.ROUND_ROBIN);
TStream<String> secondParallelStart =
TStream<String> firstInParallel = firstParallelStart.map(...);
TStream<String> secondInParallel = secondParallelStart.map(...);
TStream<String> unionStream = firstInParallel.union(secondInParallel);
TStream<String> nonParallelStream = unionStream.endParallel();
nonParallelStream.print();
Once connected, the stream outside of the parallel region (and all of its prior operations)
becomes part of the parallel region:
A nested parallel region is created when TStream.parallel(int) is invoked on a stream which is already inside of a parallel region. Invoking TStream.parallel(int) twice in the same pipeline creates a nested parallel region as follows:
TStream<String> stream = topology.source(...);
stream.parallel(3).map(...).parallel(3).map(...).endParallel().endParallel();
Results in a graph of the following structure:
The first map operation is instantiated 3 times, once in each channel created by the first parallel(3), whereas the second map operation is instantiated a total of 9 times, since each of the 3 enclosing parallel channels holds 3 nested parallel channels. The TStream.Routing configurations of the enclosing and nested parallel regions do not need to match.
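The 3 × 3 = 9 count above follows from giving each instance of the inner operation one (outer channel, inner channel) pair. The sketch below enumerates those pairs; NestedChannels is a hypothetical helper used only to illustrate the arithmetic, not an API for inspecting a running topology.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative enumeration of operator instances in a nested parallel region:
// an operation inside an outer region of width outerWidth and an inner region
// of width innerWidth is instantiated once per (outer, inner) channel pair.
final class NestedChannels {
    static List<int[]> channelPaths(int outerWidth, int innerWidth) {
        List<int[]> paths = new ArrayList<>();
        for (int outer = 0; outer < outerWidth; outer++) {
            for (int inner = 0; inner < innerWidth; inner++) {
                paths.add(new int[] {outer, inner});
            }
        }
        return paths;
    }
}
```

For parallel(3) nested inside parallel(3), channelPaths(3, 3) returns 9 pairs, from (0, 0) to (2, 2), one per instance of the second map operation.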
TStream<String> streamToJoin = topology.source(...);
streamToJoin.setParallel();
streamToJoin = streamToJoin.map(...).endParallel();
TStream<String> parallelStream = topology.source(...);
parallelStream = parallelStream.parallel(4);
parallelStream = parallelStream.union(streamToJoin);
parallelStream.map(...).endParallel().print();
This results in the following graph structure:
Every call to endParallel() must have a call to parallel(...) preceding it. The following is invalid:
TStream myStream = topology.strings("a","b","c");
myStream.endParallel();
A parallelized stream cannot be joined with another window, and a
parallelized window cannot be joined with a stream. The following is
invalid:
TWindow numWindow = topology.strings("1","2","3").last(3);
TStream stringStream = topology.strings("a","b","c");
TStream paraStringStream = stringStream.parallel(5);
TStream filtered = paraStringStream.filter(...);
filtered.join(numWindow, ...);
Specified by: parallel in interface TStream<com.ibm.streams.operator.Tuple>
Parameters: width - The degree of parallelism. See TStream.parallel(int width) for more details. routing - Defines how tuples will be routed to channels.
See Also: Topology.createSubmissionParameter(String, Class), TStream.split(int, ToIntFunction)
SPLStream parallel(Supplier<java.lang.Integer> width, Function<com.ibm.streams.operator.Tuple,?> keyFunction)
Parallelizes the stream into a number of parallel channels using key based distribution. For each tuple t, keyFunction.apply(t) is called and then the tuples are routed so that all tuples with the same key are sent to the same channel.
Specified by: parallel in interface TStream<com.ibm.streams.operator.Tuple>
Parameters: width - The degree of parallelism. keyFunction - Function to obtain the key from each tuple.
Returns: A stream with width channels at the beginning of the parallel region.
See Also: TStream.Routing.KEY_PARTITIONED, TStream.parallel(Supplier, Routing)
SPLStream setParallel(Supplier<java.lang.Integer> width)
Sets the current stream as the start of a parallel region.
Specified by: setParallel in interface TStream<com.ibm.streams.operator.Tuple>
Parameters: width - The degree of parallelism.
See Also: TStream.parallel(int), TStream.parallel(Supplier, Routing), TStream.parallel(Supplier, Function)
SPLStream endParallel()
Ends a parallel region by merging the channels into a single stream.
Specified by: endParallel in interface TStream<com.ibm.streams.operator.Tuple>
See Also: TStream.parallel(int), TStream.parallel(Supplier, Routing), TStream.parallel(Supplier, Function)
SPLStream autonomous()
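Return a stream matching this stream whose subsequent processing will execute in an autonomous region. Autonomous regions are not supported in embedded and standalone contexts, where this call is ignored.
Specified by: autonomous in interface TStream<com.ibm.streams.operator.Tuple>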
SPLStream setConsistent(ConsistentRegionConfig config)
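Set the operator that is the source of this stream to be the start of a consistent region to support at-least-once and exactly-once processing. The extent of the region can be bounded through the use of TStream.autonomous().
Consistent regions are only supported in distributed contexts. This must be called on a stream directly produced by an SPL operator that supports consistent regions. Source streams produced by methods on Topology do not support consistent regions.
Specified by: setConsistent in interface TStream<com.ibm.streams.operator.Tuple>
See Also: ConsistentRegionConfig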
SPLStream colocate(Placeable<?>... elements)
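Colocate this element with other topology elements so that at runtime they all execute within the same processing element (PE or operating system process). The elements argument may contain any Placeable within the same topology; there is no requirement that the element is connected (by a stream) directly or indirectly to this element.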
When a set of colocated elements is completely within a single parallel region, the colocation constraint is limited to each channel. So if elements A and B are colocated, then it is guaranteed that A[0],B[0] from channel 0 are in a single PE and A[1],B[1] from channel 1 are in a single PE, but typically a different PE from A[0],B[0].
When a set of colocated elements is not completely within a single parallel region, the colocation constraint applies to the whole topology. So if A,B are in a parallel region but C is outside, then all the replicas of A,B are colocated with C and with each other.
Colocation is also referred to as fusing. When colocated topology elements are connected by streams, the communication between them is in-process using direct method calls (instead of a network transport such as TCP).
SPLStream invocationName(java.lang.String name)
Set the invocation name of this placeable. Names must be unique within a topology; if this name is already in use, then 'name_N' will be used, where N is a number that makes the name unique within the topology.
For example, the invocation for sending alerts as SMS (text) messages could be named SMSAlerts:
TStream<Alert> alerts = ...
alerts.forEach(Alert::sendAsTextMsg).invocationName("SMSAlerts");
Note that name will eventually resolve into an identifier for an operator name and/or a stream name. Identifiers are limited to ASCII characters, so name will be modified at code generation time if it cannot be represented as an identifier.
Specified by: invocationName in interface Placeable<TStream<com.ibm.streams.operator.Tuple>>
Parameters: name - Name to be assigned.
See Also: Placeable.getInvocationName()
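The 'name_N' uniqueness rule for invocation names can be modelled with a set of names already in use. This is a minimal sketch under an explicit assumption: N counts up from 1 until a free name is found, whereas the runtime may choose N differently. InvocationNames and assign are hypothetical names, and the ASCII identifier rewriting mentioned above is not modelled.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of invocation-name uniqueness: if `name` is taken, try
// name_1, name_2, ... until a free name is found. Starting N at 1 is an
// assumption; the Streams runtime may pick N differently.
final class InvocationNames {
    private final Set<String> used = new HashSet<>();

    String assign(String name) {
        String candidate = name;
        int n = 1;
        // Set.add returns false when the candidate is already in use.
        while (!used.add(candidate)) {
            candidate = name + "_" + n++;
        }
        return candidate;
    }
}
```

Under these assumptions, naming three invocations SMSAlerts produces SMSAlerts, SMSAlerts_1 and SMSAlerts_2.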