public class Topology extends java.lang.Object implements TopologyElement
This class provides generic methods to create source streams, such as source(Supplier), subscribe(String, Class), and strings(String...). Utility methods in the com.ibm.streamsx.topology.streams package provide specific source streams, or transformations on streams with specific types.
A Topology has a namespace and a name. When a Streams application is created, the application name will be namespace::name. Thus the application bundle is named namespace.name.sab, and a submitted job is named namespace::name_jobid.
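The naming patterns above can be sketched in plain Java. This is only an illustration: `TopologyNames` and its methods are hypothetical helpers, not part of the Topology API, and reading `namespace.name.sab` as the bundle file name and `namespace::name_jobid` as a job name is an assumption drawn from the patterns listed above.

```java
/** Hypothetical helper (not part of the Topology API) that builds the name patterns listed above. */
final class TopologyNames {
    private TopologyNames() {}

    /** Application name in the form namespace::name. */
    static String applicationName(String namespace, String name) {
        return namespace + "::" + name;
    }

    /** Bundle file name in the form namespace.name.sab (assumed reading of the pattern). */
    static String bundleFileName(String namespace, String name) {
        return namespace + "." + name + ".sab";
    }

    /** Job name in the form namespace::name_jobid (assumed reading of the pattern). */
    static String jobName(String namespace, String name, String jobId) {
        return applicationName(namespace, name) + "_" + jobId;
    }

    public static void main(String[] args) {
        System.out.println(applicationName("com.example", "Sensors")); // com.example::Sensors
        System.out.println(bundleFileName("com.example", "Sensors"));  // com.example.Sensors.sab
        System.out.println(jobName("com.example", "Sensors", "7"));    // com.example::Sensors_7
    }
}
```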
Modifier and Type | Field and Description |
---|---|
static java.util.logging.Logger | TOPOLOGY_LOGGER: Logger used for the Topology API, name com.ibm.streamsx.topology. |
Constructor and Description |
---|
Topology(): Create a new topology with a default name. |
Topology(java.lang.String namespace, java.lang.String name): Create a new topology with a given name and namespace. |
Topology(java.lang.String name): Create a new topology with a given name. |
Modifier and Type | Method and Description |
---|---|
void | addClassDependency(java.lang.Class<?> clazz): Ensures that a class file is loaded into the application's runtime. |
void | addFileDependency(java.lang.String location, java.lang.String dstDirName): Add file or directory tree location to directory dstDirName in the application bundle, making it available to the topology's SPL operators at runtime, e.g., an operator configuration file in "etc" in the application directory. |
void | addJarDependency(java.lang.String location): Includes a jar file, specified by the location String, into the application runtime. |
void | addJobControlPlane(): Add job control plane to this topology. |
com.ibm.streamsx.topology.builder.GraphBuilder | builder(): Get the underlying OperatorGraph. |
void | checkpointPeriod(long period, java.util.concurrent.TimeUnit unit): Checkpoint the state of the graph periodically. |
<T> TStream<T> | constants(java.util.List<T> data): Create a stream containing all the tuples in data. |
<T> Supplier<T> | createSubmissionParameter(java.lang.String name, java.lang.Class<T> valueClass): Create a submission parameter without a default value. |
<T> Supplier<T> | createSubmissionParameter(java.lang.String name, T defaultValue): Create a submission parameter with a default value. |
<T> TStream<T> | endlessSource(Supplier<T> data): Declare an endless source stream. |
<T> TStream<T> | endlessSourceN(Function<java.lang.Long,T> data): Declare an endless source stream. |
void | finalizeGraph(StreamsContext<?> context): Resolves the jar dependencies, sets the respective parameters. |
java.util.Map<java.lang.String,java.lang.Object> | getConfig() |
com.ibm.streamsx.topology.internal.core.DependencyResolver | getDependencyResolver(): Internal use only. |
java.lang.String | getName(): Name of this topology. |
java.lang.String | getNamespace(): Namespace of this topology. |
Tester | getTester(): Get the tester for this topology. |
boolean | hasTester(): Has the tester been created for this topology. |
<T> TStream<T> | limitedSource(Supplier<T> data, long count): Declare a limited source stream, where the number of tuples is limited to count. |
<T> TStream<T> | limitedSourceN(Function<java.lang.Long,T> data, long count): Declare a limited source stream, where the number of tuples is limited to count. |
TStream<java.lang.Number> | numbers(java.lang.Number... tuples): Create a stream of Number tuples. |
<T> TStream<T> | periodicMultiSource(Supplier<java.lang.Iterable<T>> data, long period, java.util.concurrent.TimeUnit unit): Declare a new source stream that calls data.get() periodically. |
<T> TStream<T> | periodicSource(Supplier<T> data, long period, java.util.concurrent.TimeUnit unit): Declare a new source stream that calls data.get() periodically. |
<T> TStream<T> | source(Supplier<java.lang.Iterable<T>> data): Declare a new source stream that iterates over the return of Iterable<T> get() from data. |
TStream<java.lang.String> | strings(java.lang.String... tuples): Create a stream of String tuples. |
<T> TStream<T> | subscribe(java.lang.String topic, java.lang.Class<T> tupleTypeClass): Declare a stream that is a subscription to topic. |
<T> TStream<T> | subscribe(Supplier<java.lang.String> topic, java.lang.Class<T> tupleTypeClass): Declare a stream that is a subscription to topic. |
Topology | topology(): Return this topology. |
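public static java.util.logging.Logger TOPOLOGY_LOGGER
Logger used for the Topology API, name com.ibm.streamsx.topology.
public Topology()
Create a new topology with a default name. The default name is derived from the caller: if the calling method is main then the calling class is used if it is available. If no name can be determined then Topology is used.
public Topology(java.lang.String name)
Create a new topology with a given name.
name - Name of the topology.
public Topology(java.lang.String namespace, java.lang.String name)
Create a new topology with a given name and namespace.
namespace - Namespace of the topology.
name - Name of the topology.
public java.lang.String getName()
Name of this topology.
public java.lang.String getNamespace()
Namespace of this topology.
public java.util.Map<java.lang.String,java.lang.Object> getConfig()
public Topology topology()
Return this topology.
Returns: this.
Specified by: topology in interface TopologyElement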
public TStream<java.lang.String> strings(java.lang.String... tuples)
Create a stream of String tuples.
tuples - String values to place on the stream.
Returns: Stream containing tuples.
public TStream<java.lang.Number> numbers(java.lang.Number... tuples)
Create a stream of Number tuples.
tuples - Number values to place on the stream.
Returns: Stream containing tuples.
public <T> TStream<T> constants(java.util.List<T> data)
Create a stream containing all the tuples in data.
data - List of tuples.
Returns: Stream containing the tuples in data.
public <T> TStream<T> source(Supplier<java.lang.Iterable<T>> data)
Declare a new source stream that iterates over the return of Iterable<T> get() from data. Once all the tuples from data.get() have been submitted on the stream, no more tuples are submitted. In some cases the iteration may never complete, leading to an endless stream.
data - Function that produces the data for the stream.
Returns: Stream containing the tuples from data.get().
public <T> TStream<T> periodicMultiSource(Supplier<java.lang.Iterable<T>> data, long period, java.util.concurrent.TimeUnit unit)
Declare a new source stream that calls data.get() periodically. Each non-null value present in the returned Iterable will appear on the returned stream. If there is no data to be sent then an empty Iterable must be returned. Thus each call to data.get() will result in zero, one or N tuples on the stream.
data - Function that produces the data for the stream.
period - Approximate period at which data.get() will be called.
unit - Time unit of period.
Returns: Stream containing the tuples from calls to data.get().
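The "zero, one or N tuples per call" rule can be modelled without the Streams runtime. The sketch below is a JDK-only simulation, not the library's implementation: `PeriodicMultiSourceModel` and `poll` are hypothetical names, `java.util.function.Supplier` stands in for the library's `Supplier`, and the periodic scheduling is replaced by a plain loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical model of the tuple rule described above; timing is deliberately left out. */
final class PeriodicMultiSourceModel {
    private PeriodicMultiSourceModel() {}

    /** Simulates `calls` invocations of data.get(), keeping every non-null value returned. */
    static <T> List<T> poll(Supplier<Iterable<T>> data, int calls) {
        List<T> tuples = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            for (T value : data.get()) {      // an empty Iterable contributes no tuples
                if (value != null) {          // null values never reach the stream
                    tuples.add(value);
                }
            }
        }
        return tuples;
    }

    public static void main(String[] args) {
        // Three polls: an empty batch, a single value, and a batch containing a null.
        Iterator<List<String>> batches = Arrays.asList(
                Collections.<String>emptyList(),
                Arrays.asList("a"),
                Arrays.asList("b", null, "c")).iterator();
        Supplier<Iterable<String>> data = batches::next;
        System.out.println(poll(data, 3)); // prints [a, b, c]
    }
}
```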
public <T> TStream<T> periodicSource(Supplier<T> data, long period, java.util.concurrent.TimeUnit unit)
Declare a new source stream that calls data.get() periodically. Each non-null value returned will appear on the returned stream. Thus each call to data.get() will result in zero tuples or one tuple on the stream.
data - Function that produces the data for the stream.
period - Approximate period at which data.get() will be called.
unit - Time unit of period.
Returns: Stream containing the tuples from calls to data.get().
public <T> TStream<T> endlessSource(Supplier<T> data)
Declare an endless source stream. data.get() will be called repeatedly. Each non-null returned value will be present on the stream.
data - Supplier of the tuples.
Returns: Stream containing the tuples from calls to data.get().
public <T> TStream<T> endlessSourceN(Function<java.lang.Long,T> data)
Declare an endless source stream. data.apply(n) will be called repeatedly, where n is the iteration number, starting at zero. Each non-null returned value will be present on the stream.
data - Function supplying the tuples.
Returns: Stream containing the tuples from calls to data.apply(n).
public <T> TStream<T> limitedSource(Supplier<T> data, long count)
Declare a limited source stream, where the number of tuples is limited to count. data.get() will be called count times. Each non-null returned value will be present on the stream.
data - Supplier of the tuples.
count - Maximum number of tuples that will be seen on the stream.
Returns: Stream containing the tuples from calls to data.get().
public <T> TStream<T> limitedSourceN(Function<java.lang.Long,T> data, long count)
Declare a limited source stream, where the number of tuples is limited to count. data.apply(n) will be called count times, where n is the iteration number, starting at zero. Each non-null returned value will be present on the stream.
data - Function supplying the tuples.
count - Maximum number of tuples that will be seen on the stream.
Returns: Stream containing the tuples from calls to data.apply(n).
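Because null results are dropped, count is an upper bound on the number of tuples rather than an exact size. The following JDK-only sketch models that rule. It is not the library's implementation: `LimitedSourceModel` is a hypothetical name, and `java.util.function.Function` stands in for the library's `Function`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of the limitedSourceN rule described above. */
final class LimitedSourceModel {
    private LimitedSourceModel() {}

    /** Calls data.apply(n) for n = 0 .. count-1 and keeps each non-null result. */
    static <T> List<T> limitedSourceN(Function<Long, T> data, long count) {
        List<T> tuples = new ArrayList<>();
        for (long n = 0; n < count; n++) {
            T value = data.apply(n);   // n is the iteration number, starting at zero
            if (value != null) {       // a null result produces no tuple
                tuples.add(value);
            }
        }
        return tuples;
    }

    public static void main(String[] args) {
        // Five calls, but odd iterations return null, so only three tuples appear.
        List<String> tuples = limitedSourceN(n -> n % 2 == 0 ? "t" + n : null, 5);
        System.out.println(tuples); // prints [t0, t2, t4]
    }
}
```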
public <T> TStream<T> subscribe(java.lang.String topic, java.lang.Class<T> tupleTypeClass)
Declare a stream that is a subscription to topic. A topic is published using TStream.publish(String). Subscribers are matched to published streams when the topic is an exact match and the type of the stream (T, tupleTypeClass) is an exact match.
Publish-subscribe only works when the topology is submitted to a StreamsContext.Type.DISTRIBUTED or StreamsContext.Type.STREAMING_ANALYTICS_SERVICE context. This allows different applications (or parts of the same application) to communicate using published streams.
If tupleTypeClass is JSONObject.class then the subscription is the generic IBM Streams schema for JSON (JSONSchemas.JSON). Streams of type JSONObject are always published and subscribed using the generic schema to allow interchange between applications implemented in different languages.
topic - Topic to subscribe to.
tupleTypeClass - Type to subscribe to.
See Also: TStream.publish(String), SPLStreams.subscribe(TopologyElement, String, com.ibm.streams.operator.StreamSchema)
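The matching rule stated above (exact topic and exact type) means a subscriber for a supertype does not receive a published subtype stream. The sketch below models only that rule in plain Java. `TopicMatch` is a hypothetical name, and the real matching is performed by the Streams runtime, not by code like this.

```java
/** Hypothetical model of the publish-subscribe matching rule described above. */
final class TopicMatch {
    private TopicMatch() {}

    /** True only when both the topic string and the tuple class are exactly equal. */
    static boolean matches(String publishedTopic, Class<?> publishedType,
                           String subscribedTopic, Class<?> subscribedType) {
        return publishedTopic.equals(subscribedTopic)
                && publishedType.equals(subscribedType);
    }

    public static void main(String[] args) {
        System.out.println(matches("sensors/temp", String.class, "sensors/temp", String.class)); // true
        System.out.println(matches("sensors/temp", String.class, "sensors/temp", Object.class)); // false
        System.out.println(matches("sensors/temp", String.class, "sensors", String.class));      // false
    }
}
```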
public <T> TStream<T> subscribe(Supplier<java.lang.String> topic, java.lang.Class<T> tupleTypeClass)
Declare a stream that is a subscription to topic. Differs from subscribe(String, Class) in that it supports topic as a submission time parameter, for example using the topic defined by the submission parameter eventTopic:
Supplier<String> topicParam = topology.createSubmissionParameter("eventTopic", String.class);
TStream<String> events = topology.subscribe(topicParam, String.class);
topic - Topic to subscribe to.
tupleTypeClass - Type to subscribe to.
See Also: subscribe(String, Class)
public void finalizeGraph(StreamsContext<?> context) throws java.lang.Exception
Resolves the jar dependencies, sets the respective parameters.
Throws: java.lang.Exception
public void addJarDependency(java.lang.String location)
Includes a jar file, specified by the location String, into the application runtime. For example, the following code includes the myResource.jar jar file so that it can be used when running in a non-EMBEDDED context.
Topology top = new Topology("myTopology");
top.addJarDependency("./libs/myResource.jar");
For running embedded, simply adding the jar to the classpath when compiling/running is sufficient.
location - The location of a jar to be included in the application's runtime when submitting with a DISTRIBUTED or STANDALONE context.
public void addClassDependency(java.lang.Class<?> clazz)
Ensures that a class file is loaded into the application's runtime. For example, if the class is located in the following directory structure:
upperDir
|_com
  |_foo
    |_bar
    | |_bar.class
    | |_baz.class
    |_fiz
      |_buzz.class
and addClassDependency is invoked as follows:
Topology top = new Topology("myTopology");
top.addClassDependency(bar.class);
Then the entire contents of upperDir are turned into a jar file and included in the application's runtime. This includes baz and buzz, not just bar!
As with addJarDependency(String location), this is only required when using additional jars when running in a non-EMBEDDED context.
clazz - The class of a resource to be included in the application's runtime when submitting with a DISTRIBUTED or STANDALONE context.
public void addFileDependency(java.lang.String location, java.lang.String dstDirName)
Add file or directory tree location to directory dstDirName in the application bundle, making it available to the topology's SPL operators at runtime, e.g., an operator configuration file in "etc" in the application directory.
Use addClassDependency(Class) or addJarDependency(String) to add class or jar dependencies.
Functional logic implementations that need to access resources should package the resources in a jar or classes directory, add that to the topology as a dependency using addJarDependency(String) or addClassDependency(Class), and access them as resources from the class loader as described here: https://docs.oracle.com/javase/tutorial/deployment/webstart/retrievingResources.html
Legal values for dstDirName are etc or opt.
e.g.,
// add "myConfigFile" to the bundle's "etc" directory
addFileDependency("/tmp/myConfigFile", "etc");
// add "myApp" directory tree to the bundle's "etc" directory
addFileDependency("/tmp/myApp", "etc");
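The argument constraints for addFileDependency (dstDirName must be etc or opt, and location must be a file or directory) can be checked before building a topology. The sketch below is a JDK-only pre-check written from those documented rules. `FileDependencyCheck` is a hypothetical helper, and it is an assumption that the library performs equivalent checks in this order.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical pre-check mirroring the documented addFileDependency constraints. */
final class FileDependencyCheck {
    private FileDependencyCheck() {}

    /** Throws IllegalArgumentException when either documented constraint is violated. */
    static void check(String location, String dstDirName) {
        if (!"etc".equals(dstDirName) && !"opt".equals(dstDirName)) {
            throw new IllegalArgumentException("dstDirName must be etc or opt: " + dstDirName);
        }
        Path path = Paths.get(location);
        if (!Files.isRegularFile(path) && !Files.isDirectory(path)) {
            throw new IllegalArgumentException("location is not a file or directory: " + location);
        }
    }

    public static void main(String[] args) {
        // The JVM temp directory is used here only because it is expected to exist.
        String existingDir = System.getProperty("java.io.tmpdir");
        check(existingDir, "etc"); // passes: location first, destination directory second
        try {
            check(existingDir, "lib");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note the argument order: the location comes first and the bundle directory second, matching the method signature.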
location - path to a file or directory
dstDirName - name of directory in the bundle
Throws: java.lang.IllegalArgumentException - if dstDirName is not etc or opt, or location is not a file or directory.
public Tester getTester()
Get the tester for this topology. Use of the Tester only impacts the topology if submitted to a StreamsContext of type StreamsContext.Type.EMBEDDED_TESTER, StreamsContext.Type.STANDALONE_TESTER, or StreamsContext.Type.DISTRIBUTED_TESTER.
public final boolean hasTester()
Has the tester been created for this topology.
Returns: true if getTester() has been called.
public void checkpointPeriod(long period, java.util.concurrent.TimeUnit unit)
Checkpoint the state of the graph periodically, at the interval given by period and unit. Every element persists its state autonomously, asynchronously with processing its streams.
For stream processing elements defined by Java functions (such as source(Supplier) and TStream.transform(Function)) the state is the serialized form of the object representing the function. Synchronization is applied to ensure that checkpointed state of the object does not include inconsistencies due to ongoing stream processing.
If the function object is immutable then no checkpointing occurs for that element.
A function object is taken as mutable if any of these conditions are true:
Checkpointing is only supported in distributed contexts.
period - Approximate period for checkpointing.
unit - Time unit of period.
public com.ibm.streamsx.topology.internal.core.DependencyResolver getDependencyResolver()
Internal use only.
public com.ibm.streamsx.topology.builder.GraphBuilder builder()
Get the underlying OperatorGraph. Internal use only.
Specified by: builder in interface TopologyElement
public <T> Supplier<T> createSubmissionParameter(java.lang.String name, java.lang.Class<T> valueClass)
Create a submission parameter without a default value. A submission parameter is a handle for a T whose actual value is not defined until topology submission time. Submission parameters enable the creation of more reusable topology bundles.
A submission parameter has a name. The name must be unique within the topology.
The parameter is a Supplier. Prior to submitting the topology, while constructing the topology, parameter.get() returns null. When the topology is submitted, parameter.get() in the executing topology returns the actual submission time value (or the default value, see createSubmissionParameter(String, Object)).
Submission parameters may be used within functional logic. e.g.,
Supplier<Integer> threshold = topology.createSubmissionParameter("threshold", 100);
TStream<Integer> s = ...;
// with a Java8 lambda expression
TStream<Integer> filtered1 = s.filter(v -> v > threshold.get());
// without
TStream<Integer> filtered2 = s.filter(new Predicate<Integer>() {
    public boolean test(Integer v) {
        return v > threshold.get();
    }
});
Submission parameters may also be used for values in various cases such as the TStream.parallel(Supplier) width value and MQTT connector configuration and topic values. e.g.,
Supplier<Integer> width = topology.createSubmissionParameter("width", 1);
TStream<String> s = ...;
TStream<String> parallel_start = s.parallel(width);
TStream<String> in_parallel = parallel_start.filter(...).transform(...);
TStream<String> joined_parallel_streams = in_parallel.endParallel();
Finally, submission parameters may be used in Java Primitive Operator and SPL Operator parameter values.
The submission parameter's name is used to supply an actual value at topology submission time via StreamsContext.submit(com.ibm.streamsx.topology.Topology, java.util.Map) and ContextProperties.SUBMISSION_PARAMS, or when submitting a topology bundle for execution via other execution runtime native mechanisms such as IBM Streams streamtool.
Topology submission behavior when a submission parameter lacking a default value is created and a value is not provided at submission time is defined by the underlying topology execution runtime. Submission fails for contexts DISTRIBUTED, STANDALONE, ANALYTIC_SERVICE, or EMBEDDED.
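The lifecycle described above (null while the topology is being constructed, the actual or default value once submitted, unique names, and failure when a parameter without a default receives no value) can be modelled in plain Java. This is a sketch of the described behavior only, not the library's implementation: `SubmissionParams`, `createRequired`, `createWithDefault`, and `submit` are hypothetical names, and `java.util.function.Supplier` stands in for the library's `Supplier`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical model of submission parameter behavior as described above. */
final class SubmissionParams {
    private final Map<String, Object> defaults = new HashMap<>(); // name -> default, null if none
    private Map<String, Object> actual;                           // stays null until submit()

    /** Parameter without a default value. */
    <T> Supplier<T> createRequired(String name, Class<T> valueClass) {
        return register(name, valueClass, null);
    }

    /** Parameter with a default value; a null default is rejected. */
    <T> Supplier<T> createWithDefault(String name, T defaultValue) {
        if (defaultValue == null) {
            throw new IllegalArgumentException("defaultValue is null");
        }
        @SuppressWarnings("unchecked")
        Class<T> valueClass = (Class<T>) defaultValue.getClass();
        return register(name, valueClass, defaultValue);
    }

    private <T> Supplier<T> register(String name, Class<T> valueClass, T defaultValue) {
        if (name == null || name.isEmpty() || defaults.containsKey(name)) {
            throw new IllegalArgumentException("name is null, empty, or already defined: " + name);
        }
        defaults.put(name, defaultValue);
        // Before submit() the supplier yields null; afterwards it yields the resolved value.
        return () -> actual == null ? null : valueClass.cast(actual.get(name));
    }

    /** Resolves values at "submission time"; fails if a parameter has neither value nor default. */
    void submit(Map<String, Object> values) {
        Map<String, Object> resolved = new HashMap<>(defaults);
        resolved.putAll(values);
        for (Map.Entry<String, Object> entry : resolved.entrySet()) {
            if (entry.getValue() == null) {
                throw new IllegalStateException("no value for parameter " + entry.getKey());
            }
        }
        actual = resolved;
    }

    public static void main(String[] args) {
        SubmissionParams params = new SubmissionParams();
        Supplier<Integer> threshold = params.createWithDefault("threshold", 100);
        Supplier<String> topic = params.createRequired("eventTopic", String.class);
        System.out.println(threshold.get()); // null: the topology is still being constructed
        params.submit(Collections.<String, Object>singletonMap("eventTopic", "sensors"));
        System.out.println(threshold.get()); // 100: the default value is used
        System.out.println(topic.get());     // sensors: the value supplied at submission
    }
}
```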
name - submission parameter name
valueClass - class object for T
Returns: Supplier<T> for the submission parameter
Throws: java.lang.IllegalArgumentException - if name is null, empty, or has already been defined.
public <T> Supplier<T> createSubmissionParameter(java.lang.String name, T defaultValue)
Create a submission parameter with a default value. See createSubmissionParameter(String, Class) for a description of submission parameters.
name - submission parameter name
defaultValue - default value if parameter isn't specified.
Returns: Supplier<T> for the submission parameter
Throws: java.lang.IllegalArgumentException - if name is null, empty, or has already been defined.
Throws: java.lang.IllegalArgumentException - if defaultValue is null
public void addJobControlPlane()
Add job control plane to this topology. Multiple calls to this method may be made; a single job control plane will be created.
A job control plane is required when a consistent region is used and is added automatically by TODO-add link.
Job control plane is only supported in distributed contexts.