com.ibm.streamsx.topology

Class Topology

  • java.lang.Object
    • com.ibm.streamsx.topology.Topology
  • All Implemented Interfaces:
    TopologyElement


    public class Topology
    extends java.lang.Object
    implements TopologyElement
    A declaration of a topology of streaming data. This class provides some fundamental generic methods to create source streams, such as source(Supplier), subscribe(String, Class), strings(String...).
    Utility methods in the com.ibm.streamsx.topology.streams package provide specific source streams, or transformations on streams with specific types.

    A Topology has a namespace and a name. When a Streams application is created, the application name will be namespace::name. Thus:

    • the Streams application bundle will be named namespace.name.sab,
    • when submitted, the job name (if not supplied) will be namespace::name_jobid.
    Note that if a namespace or name has non-ASCII characters, then the actual values used will be modified to only contain ASCII characters.
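The naming rules above amount to simple string composition. The sketch below illustrates them for ASCII-only names; `AppNames` and its methods are hypothetical helpers, not part of the Topology API, and the job id is assumed to be assigned by the Streams instance at submission time.

```java
// Hypothetical helper illustrating the documented naming rules; not part of the Topology API.
// Assumes namespace and name are already ASCII-only.
final class AppNames {
    private AppNames() {}

    // Streams application name: namespace::name
    static String applicationName(String namespace, String name) {
        return namespace + "::" + name;
    }

    // Application bundle file name: namespace.name.sab
    static String bundleFileName(String namespace, String name) {
        return namespace + "." + name + ".sab";
    }

    // Default job name when none is supplied: namespace::name_jobid
    static String defaultJobName(String namespace, String name, long jobId) {
        return applicationName(namespace, name) + "_" + jobId;
    }
}
```

Under these rules, a topology created with namespace my.ns and name Sensors would produce the bundle my.ns.Sensors.sab.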

    • Field Summary

      Fields 
      Modifier and Type Field and Description
      static java.util.logging.Logger TOPOLOGY_LOGGER
      Logger used for the Topology API, name com.ibm.streamsx.topology.
    • Constructor Summary

      Constructors 
      Constructor and Description
      Topology()
      Create a new topology with a default name.
      Topology(java.lang.String namespace, java.lang.String name)
      Create a new topology with a given name and namespace.
      Topology(java.lang.String name)
      Create a new topology with a given name.
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method and Description
      void addClassDependency(java.lang.Class<?> clazz)
      Ensures that a class file is loaded into the application's runtime.
      void addFileDependency(java.lang.String location, java.lang.String dstDirName)
      Add file or directory tree location to directory dstDirName in the application bundle, making it available to the topology's SPL operators at runtime; for example, an operator configuration file in "etc" in the application directory.
      void addJarDependency(java.lang.String location)
      Includes a jar file, specified by the location String, into the application runtime.
      void addJobControlPlane()
      Add a job control plane to this topology.
      com.ibm.streamsx.topology.builder.GraphBuilder builder()
      Get the underlying OperatorGraph.
      void checkpointPeriod(long period, java.util.concurrent.TimeUnit unit)
      Checkpoint the state of the graph periodically.
      <T> TStream<T> constants(java.util.List<T> data)
      Create a stream containing all the tuples in data.
      <T> Supplier<T> createSubmissionParameter(java.lang.String name, java.lang.Class<T> valueClass)
      Create a submission parameter without a default value.
      <T> Supplier<T> createSubmissionParameter(java.lang.String name, T defaultValue)
      Create a submission parameter with a default value.
      <T> TStream<T> endlessSource(Supplier<T> data)
      Declare an endless source stream.
      <T> TStream<T> endlessSourceN(Function<java.lang.Long,T> data)
      Declare an endless source stream.
      void finalizeGraph(StreamsContext<?> context)
      Resolves the jar dependencies, sets the respective parameters.
      java.util.Map<java.lang.String,java.lang.Object> getConfig() 
      com.ibm.streamsx.topology.internal.core.DependencyResolver getDependencyResolver()
      Internal use only.
      java.lang.String getName()
      Name of this topology.
      java.lang.String getNamespace()
      Namespace of this topology.
      Tester getTester()
      Get the tester for this topology.
      boolean hasTester()
      Returns whether the tester has been created for this topology.
      <T> TStream<T> limitedSource(Supplier<T> data, long count)
      Declare a limited source stream, where the number of tuples is limited to count.
      <T> TStream<T> limitedSourceN(Function<java.lang.Long,T> data, long count)
      Declare a limited source stream, where the number of tuples is limited to count.
      TStream<java.lang.Number> numbers(java.lang.Number... tuples)
      Create a stream of Number tuples.
      <T> TStream<T> periodicMultiSource(Supplier<java.lang.Iterable<T>> data, long period, java.util.concurrent.TimeUnit unit)
      Declare a new source stream that calls data.get() periodically.
      <T> TStream<T> periodicSource(Supplier<T> data, long period, java.util.concurrent.TimeUnit unit)
      Declare a new source stream that calls data.get() periodically.
      <T> TStream<T> source(Supplier<java.lang.Iterable<T>> data)
      Declare a new source stream that iterates over the Iterable<T> returned by data.get().
      TStream<java.lang.String> strings(java.lang.String... tuples)
      Create a stream of String tuples.
      <T> TStream<T> subscribe(java.lang.String topic, java.lang.Class<T> tupleTypeClass)
      Declare a stream that is a subscription to topic.
      <T> TStream<T> subscribe(Supplier<java.lang.String> topic, java.lang.Class<T> tupleTypeClass)
      Declare a stream that is a subscription to topic.
      Topology topology()
      Return this topology.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Field Detail

      • TOPOLOGY_LOGGER

        public static java.util.logging.Logger TOPOLOGY_LOGGER
        Logger used for the Topology API, name com.ibm.streamsx.topology.
    • Constructor Detail

      • Topology

        public Topology()
        Create a new topology with a default name. The name is taken to be the calling method's name, if available. If the calling method is main, then the calling class is used, if available.
        If a name cannot be determined then Topology is used.
      • Topology

        public Topology(java.lang.String name)
        Create a new topology with a given name.
        Parameters:
        name - Name of the topology.
      • Topology

        public Topology(java.lang.String namespace,
                        java.lang.String name)
        Create a new topology with a given name and namespace.
        Parameters:
        namespace - Namespace of the topology.
        name - Name of the topology.
        Since:
        1.7
    • Method Detail

      • getName

        public java.lang.String getName()
        Name of this topology.
        Returns:
        Name of this topology.
      • getNamespace

        public java.lang.String getNamespace()
        Namespace of this topology.
        Returns:
        Namespace of this topology.
        Since:
        1.7
      • getConfig

        public java.util.Map<java.lang.String,java.lang.Object> getConfig()
      • topology

        public Topology topology()
        Return this topology. Returns this.
        Specified by:
        topology in interface TopologyElement
        Returns:
        The topology for this element.
      • strings

        public TStream<java.lang.String> strings(java.lang.String... tuples)
        Create a stream of String tuples.
        Parameters:
        tuples - Tuples to include in the stream.
        Returns:
        Stream containing tuples.
      • numbers

        public TStream<java.lang.Number> numbers(java.lang.Number... tuples)
        Create a stream of Number tuples.
        Parameters:
        tuples - Tuples to include in the stream.
        Returns:
        Stream containing tuples.
      • constants

        public <T> TStream<T> constants(java.util.List<T> data)
        Create a stream containing all the tuples in data.
        Parameters:
        data - List of tuples.
        Returns:
        Declared stream containing tuples from data.
      • source

        public <T> TStream<T> source(Supplier<java.lang.Iterable<T>> data)
        Declare a new source stream that iterates over the Iterable<T> returned by data.get(). Once all the tuples from data.get() have been submitted on the stream, no more tuples are submitted. In some cases the iteration may never complete, leading to an endless stream.
        Parameters:
        data - Function that produces the data for the stream.
        Returns:
        New stream containing the tuples from the iterator returned by data.get().
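To make the iteration semantics of source concrete, the following sketch models them with plain Java collections: data.get() is called once and every element of the returned Iterable is submitted in order. `SourceModel` is a hypothetical name, and the `maxTuples` cap exists only so that an endless Iterable can be inspected in a demo; the description above implies no such cap.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of source(Supplier<Iterable<T>>) semantics; not the Topology implementation.
final class SourceModel {
    private SourceModel() {}

    // Calls data.get() once and collects its elements in iteration order.
    // maxTuples only bounds this demo so that an endless Iterable can be inspected.
    static <T> List<T> submitted(Supplier<Iterable<T>> data, int maxTuples) {
        List<T> out = new ArrayList<>();
        Iterator<T> it = data.get().iterator();
        while (out.size() < maxTuples && it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }
}
```

A finite list ends the stream once its elements are exhausted, while an Iterable such as `() -> Stream.iterate(0L, n -> n + 1).iterator()` never completes.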
      • periodicMultiSource

        public <T> TStream<T> periodicMultiSource(Supplier<java.lang.Iterable<T>> data,
                                                  long period,
                                                  java.util.concurrent.TimeUnit unit)
        Declare a new source stream that calls data.get() periodically. Each non-null value present in the returned Iterable will appear on the returned stream. If there is no data to be sent, then an empty Iterable must be returned. Thus each call to data.get() will result in zero, one or N tuples on the stream.
        Parameters:
        data - Function that produces the data for the stream.
        period - Approximate period at which data.get() will be called.
        unit - Time unit of period.
        Returns:
        New stream containing the tuples from the iterator returned by data.get().
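A single poll of periodicMultiSource can be modeled as below, taking the text at its word that null elements of the returned Iterable are skipped. `MultiPollModel` is a hypothetical helper; scheduling is deliberately left out so that the "zero, one or N tuples per call" outcome is easy to see.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of one periodicMultiSource poll; scheduling is omitted.
final class MultiPollModel {
    private MultiPollModel() {}

    // One call to data.get(): every non-null element of the returned Iterable becomes a tuple.
    // An empty Iterable means "no data this period" and yields zero tuples.
    static <T> List<T> poll(Supplier<Iterable<T>> data) {
        List<T> tuples = new ArrayList<>();
        for (T value : data.get()) {
            if (value != null) {
                tuples.add(value);
            }
        }
        return tuples;
    }
}
```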
      • periodicSource

        public <T> TStream<T> periodicSource(Supplier<T> data,
                                             long period,
                                             java.util.concurrent.TimeUnit unit)
        Declare a new source stream that calls data.get() periodically. Each non-null value returned will appear on the returned stream. Thus each call to data.get() will result in zero tuples or one tuple on the stream.
        Parameters:
        data - Function that produces the data for the stream.
        period - Approximate period at which data.get() will be called.
        unit - Time unit of period.
        Returns:
        New stream containing the tuples returned by data.get().
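The periodic calling pattern can be approximated with the JDK's ScheduledExecutorService. This is a sketch, not the Topology implementation: `PeriodicSourceModel.collect` is hypothetical, and its `wanted` and `timeoutMillis` parameters exist only so the demo terminates. Because null results produce no tuple, a supplier that returns null on alternate calls yields a tuple only every other period.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical model of periodicSource: data.get() is called at a fixed rate and
// each non-null result becomes one tuple. Not the Topology implementation.
final class PeriodicSourceModel {
    private PeriodicSourceModel() {}

    // Collects the first 'wanted' tuples, or whatever arrived before timeoutMillis elapsed.
    static <T> List<T> collect(Supplier<T> data, long period, TimeUnit unit,
                               int wanted, long timeoutMillis) {
        List<T> tuples = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(wanted);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            T value = data.get();                 // one call per period
            if (value != null && done.getCount() > 0) {
                tuples.add(value);                // non-null value -> one tuple
                done.countDown();
            }
        }, 0, period, unit);
        try {
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();              // stop calling data.get()
        }
        return List.copyOf(tuples);
    }
}
```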
      • endlessSource

        public <T> TStream<T> endlessSource(Supplier<T> data)
        Declare an endless source stream. data.get() will be called repeatedly. Each non-null returned value will be present on the stream.
        Parameters:
        data - Supplier of the tuples.
        Returns:
        New stream containing the tuples from calls to data.get().
      • endlessSourceN

        public <T> TStream<T> endlessSourceN(Function<java.lang.Long,T> data)
        Declare an endless source stream. data.apply(n) will be called repeatedly, where n is the iteration number, starting at zero. Each non-null returned value will be present on the stream.
        Parameters:
        data - Function that supplies the tuples.
        Returns:
        New stream containing the tuples from calls to data.apply(n).
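java.util.stream offers a close analogy for endlessSourceN: an infinite Stream.iterate over the iteration number, mapped through the function and filtered for non-null values. `EndlessSourceNModel` is a hypothetical name; a java.util.stream.Stream stands in for TStream here only to illustrate the numbering and null handling.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model of endlessSourceN: data.apply(n) for n = 0, 1, 2, ... forever,
// dropping null results. A java.util.stream.Stream stands in for the TStream.
final class EndlessSourceNModel {
    private EndlessSourceNModel() {}

    static <T> Stream<T> endless(Function<Long, T> data) {
        return Stream.iterate(0L, n -> n + 1)    // iteration number, starting at zero
                     .map(data)                  // data.apply(n)
                     .filter(Objects::nonNull);  // only non-null values appear as tuples
    }

    // The stream never ends on its own, so this demo bounds it for inspection.
    static <T> List<T> first(Function<Long, T> data, int k) {
        return endless(data).limit(k).collect(Collectors.toList());
    }
}
```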
      • limitedSource

        public <T> TStream<T> limitedSource(Supplier<T> data,
                                            long count)
        Declare a limited source stream, where the number of tuples is limited to count. data.get() will be called count times. Each non-null returned value will be present on the stream.
        Parameters:
        data - Supplier of the tuples.
        count - Maximum number of tuples that will be seen on the stream.
        Returns:
        New stream containing the tuples from calls to data.get().
      • limitedSourceN

        public <T> TStream<T> limitedSourceN(Function<java.lang.Long,T> data,
                                             long count)
        Declare a limited source stream, where the number of tuples is limited to count. data.apply(n) will be called count times, where n is the iteration number, starting at zero. Each non-null returned value will be present on the stream.
        Parameters:
        data - Function that supplies the tuples.
        count - Maximum number of tuples that will be seen on the stream.
        Returns:
        New stream containing the tuples from calls to data.apply(n).
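The counting behavior of limitedSource and limitedSourceN can be sketched the same way. Note the order of operations in this model: the number of calls is limited first and nulls are dropped afterwards, which is why count is a maximum rather than an exact tuple count. `LimitedSourceModel` is a hypothetical name and the sequential java.util.stream pipeline is an illustration, not the Topology implementation.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;

// Hypothetical models of limitedSource and limitedSourceN; not the Topology implementation.
final class LimitedSourceModel {
    private LimitedSourceModel() {}

    // In this sequential model data.get() is called count times; nulls are then dropped,
    // so at most count tuples result.
    static <T> List<T> limited(Supplier<T> data, long count) {
        return Stream.generate(data)
                     .limit(count)              // limit the calls first ...
                     .filter(Objects::nonNull)  // ... then drop null results
                     .collect(Collectors.toList());
    }

    // data.apply(n) is called for n = 0 .. count-1; nulls are dropped.
    static <T> List<T> limitedN(Function<Long, T> data, long count) {
        return LongStream.range(0, count)
                         .mapToObj(n -> data.apply(n))
                         .filter(Objects::nonNull)
                         .collect(Collectors.toList());
    }
}
```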
      • subscribe

        public <T> TStream<T> subscribe(java.lang.String topic,
                                        java.lang.Class<T> tupleTypeClass)
        Declare a stream that is a subscription to topic. A topic is published using TStream.publish(String). Subscribers are matched to published streams when the topic is an exact match and the type of the stream (T, tupleTypeClass) is an exact match.
        Publish-subscribe is a many-to-many relationship: multiple streams from multiple applications may be published on the same topic and type, and multiple subscribers may subscribe to a topic and type.
        A subscription will match all publishers using the same topic and tuple type. Tuples on the published streams will appear on the returned stream, as a single stream.
        The subscription is dynamic: the returned stream subscribes to matching streams published by newly submitted applications (jobs), and stops a subscription when a running job is cancelled.

        Publish-subscribe only works when the topology is submitted to a StreamsContext.Type.DISTRIBUTED or StreamsContext.Type.STREAMING_ANALYTICS_SERVICE context. This allows different applications (or even streams within the same application) to communicate using published streams.

        If tupleTypeClass is JSONObject.class then the subscription is the generic IBM Streams schema for JSON (JSONSchemas.JSON). Streams of type JSONObject are always published and subscribed using the generic schema to allow interchange between applications implemented in different languages.

        Parameters:
        topic - Topic to subscribe to.
        tupleTypeClass - Type to subscribe to.
        Returns:
        Stream that will contain tuples from matching publishers.
        See Also:
        TStream.publish(String), SPLStreams.subscribe(TopologyElement, String, com.ibm.streams.operator.StreamSchema)
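The matching rule (exact topic and exact tuple type, with all matching publishers merged into one stream) can be illustrated with an in-memory map. This hypothetical `PubSubModel` ignores the dynamic aspects (jobs being submitted and cancelled) and the distributed transport; it simply concatenates each publisher's tuples, whereas a real subscription interleaves them as they arrive.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of publish/subscribe matching; not the Topology implementation.
final class PubSubModel {
    // Each key is an exact (topic, tuple type) pair; each value holds one list per published stream.
    private final Map<Map.Entry<String, Class<?>>, List<List<?>>> published = new HashMap<>();

    private static Map.Entry<String, Class<?>> key(String topic, Class<?> type) {
        return Map.entry(topic, type);
    }

    // Record a published stream under its topic and tuple type.
    <T> void publish(String topic, Class<T> type, List<T> tuples) {
        published.computeIfAbsent(key(topic, type), k -> new ArrayList<>()).add(tuples);
    }

    // Tuples from every publisher whose topic and type both match exactly, as a single list.
    <T> List<T> subscribe(String topic, Class<T> type) {
        List<T> merged = new ArrayList<>();
        for (List<?> stream : published.getOrDefault(key(topic, type), List.of())) {
            for (Object tuple : stream) {
                merged.add(type.cast(tuple));
            }
        }
        return merged;
    }
}
```

In the model, a subscriber for Number.class sees nothing published as Integer.class, and a topic of "Sensors" does not match "sensors": both the topic and the type must match exactly.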
      • subscribe

        public <T> TStream<T> subscribe(Supplier<java.lang.String> topic,
                                        java.lang.Class<T> tupleTypeClass)
        Declare a stream that is a subscription to topic. Differs from subscribe(String, Class) in that it supports topic as a submission time parameter, for example using the topic defined by the submission parameter eventTopic:
         
         Supplier<String> topicParam = topology.createSubmissionParameter("eventTopic", String.class);
         TStream<String> events = topology.subscribe(topicParam, String.class);
         
         
        Parameters:
        topic - Topic to subscribe to.
        tupleTypeClass - Type to subscribe to.
        Returns:
        Stream that will contain tuples from matching publishers.
        Since:
        1.8
        See Also:
        subscribe(String, Class)
      • finalizeGraph

        public void finalizeGraph(StreamsContext<?> context)
                           throws java.lang.Exception
        Resolves the jar dependencies, sets the respective parameters. Internal use only.
        Not intended to be called by applications, may be removed at any time.
        Throws:
        java.lang.Exception
      • addJarDependency

        public void addJarDependency(java.lang.String location)
        Includes a jar file, specified by the location String, into the application runtime. For example, the following code includes the myResource.jar jar file such that it could be used when running in a non-EMBEDDED context.
        
         Topology top = new Topology("myTopology");
         top.addJarDependency("./libs/myResource.jar");
         
        For running embedded, simply adding the jar to the classpath when compiling/running is sufficient.
        Parameters:
        location - The location of a jar to be included in the application's runtime when submitting with a DISTRIBUTED or STANDALONE context.
      • addClassDependency

        public void addClassDependency(java.lang.Class<?> clazz)
        Ensures that a class file is loaded into the application's runtime. If the .class file is contained in a jar file, the jar file is also included in the application's runtime. If the .class file is in the directory structure of a package, the top-level directory is converted to a jar file and included in the application's runtime.

        For example, if the class exists in the following package directory structure:
        
         upperDir
             |_com
                 |_foo
                     |_bar
                     |   |_bar.class
                     |   |_baz.class
                     |_fiz
                         |_buzz.class
         
        and addClassDependency is invoked as follows:
        
         Topology top = new Topology("myTopology");
         top.addClassDependency(bar.class);
         
        Then the entire contents of upperDir is turned into a jar file and included in the application's runtime. This includes baz and buzz, not just bar.

        As with addJarDependency(String location), this is only required for additional dependencies when running in a non-EMBEDDED context.
        Parameters:
        clazz - The class of a resource to be included in the application's runtime when submitting with a DISTRIBUTED or STANDALONE context.
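addClassDependency has to distinguish a class loaded from a jar from one loaded from a package directory tree. Standard JDK reflection exposes this through the class's CodeSource; the sketch below shows one way to inspect it. It is an illustration under that assumption, not the Topology implementation, and `ClassLocation` is a hypothetical name. Its classify method simply treats any location not ending in .jar as a directory.

```java
import java.security.CodeSource;

// Sketch using standard JDK reflection to find where a class was loaded from.
final class ClassLocation {
    private ClassLocation() {}

    enum Kind { JAR, DIRECTORY, UNKNOWN }

    // Path of the jar file or class directory that clazz was loaded from, or null when the
    // JDK does not report one (for example, bootstrap classes such as String).
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return null;
        }
        return source.getLocation().getPath();
    }

    // Simplified rule: a location ending in ".jar" is a jar; any other location is
    // assumed to be the top-level directory of a package tree (such as upperDir).
    static Kind classify(String location) {
        if (location == null) {
            return Kind.UNKNOWN;
        }
        return location.endsWith(".jar") ? Kind.JAR : Kind.DIRECTORY;
    }
}
```

For the example above, `ClassLocation.classify(ClassLocation.locationOf(bar.class))` would report DIRECTORY when bar.class sits under upperDir, which is the case where the whole tree gets jarred.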
      • addFileDependency

        public void addFileDependency(java.lang.String location,
                                      java.lang.String dstDirName)
        Add file or directory tree location to directory dstDirName in the application bundle, making it available to the topology's SPL operators at runtime; for example, an operator configuration file in "etc" in the application directory.

        Use addClassDependency(Class) or addJarDependency(String) to add class or jar dependencies.

        Functional logic implementations that need to access resources should package the resources in a jar or classes directory, add that to the topology as a dependency using addJarDependency(String) or addClassDependency(Class) and access them as resources from the class loader as described here: https://docs.oracle.com/javase/tutorial/deployment/webstart/retrievingResources.html

        Legal values for dstDirName are etc or opt.

        e.g.,

         // add "myConfigFile" to the bundle's "etc" directory
         addFileDependency("/tmp/myConfigFile", "etc");
         
         // add "myApp" directory tree to the bundle's "etc" directory
         addFileDependency("/tmp/myApp", "etc");
         
        Parameters:
        location - path to a file or directory
        dstDirName - name of directory in the bundle
        Throws:
        java.lang.IllegalArgumentException - if dstDirName is not etc or opt, or location is not a file or directory.
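The documented argument rules of addFileDependency can be expressed as a small validator. `FileDependencyCheck.bundlePath` is hypothetical; the returned bundle-relative path (such as etc/myConfigFile) is an assumption based on the examples above, and rejecting a filesystem root is this sketch's own choice rather than documented behavior.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;

// Hypothetical validator for the documented addFileDependency(location, dstDirName) rules.
final class FileDependencyCheck {
    private FileDependencyCheck() {}

    private static final Set<String> LEGAL_DIRS = Set.of("etc", "opt");

    // Returns the assumed bundle-relative destination, e.g. "etc/myConfigFile",
    // or throws IllegalArgumentException for arguments the documentation says are rejected.
    static String bundlePath(String location, String dstDirName) {
        if (dstDirName == null || !LEGAL_DIRS.contains(dstDirName)) {
            throw new IllegalArgumentException("dstDirName must be etc or opt: " + dstDirName);
        }
        if (location == null) {
            throw new IllegalArgumentException("location is null");
        }
        Path path = Paths.get(location);
        if (!Files.isRegularFile(path) && !Files.isDirectory(path)) {
            throw new IllegalArgumentException("not a file or directory: " + location);
        }
        Path fileName = path.getFileName();
        if (fileName == null) { // sketch's own rule: a filesystem root has no name to copy under
            throw new IllegalArgumentException("location has no file name: " + location);
        }
        return dstDirName + "/" + fileName;
    }
}
```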
      • hasTester

        public final boolean hasTester()
        Returns whether the tester has been created for this topology, that is, true if getTester() has been called.
        Returns:
        True if the tester has been created, false otherwise.
      • checkpointPeriod

        public void checkpointPeriod(long period,
                                     java.util.concurrent.TimeUnit unit)
        Checkpoint the state of the graph periodically. Each stateful element in the topology checkpoints its state periodically according to period and unit. Every element persists its state autonomously, asynchronously with respect to processing its streams.
        Upon a failure of an element's container, the element will restart in a new container using its last checkpointed state. If no state is available, due to a failure before the first checkpoint, then the element reverts to its initial state.

        For stream processing elements defined by Java functions (such as source(Supplier) and TStream.transform(Function)) the state is the serialized form of the object representing the function. Synchronization is applied to ensure that checkpointed state of the object does not include inconsistencies due to ongoing stream processing.
        If the function object is immutable then no checkpointing occurs for that element. A function object is taken as mutable if any of these conditions are true:

        • It contains at least one non-transient, non-final instance field.
        • A final instance field is a reference to a mutable object. Note that identification of what is an immutable object may be limited, so in some cases function objects may be checkpointed even though they are immutable.
        Otherwise the function object is taken as immutable.
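The two mutability rules can be approximated with java.lang.reflect, as in the sketch below. It is a hypothetical, conservative heuristic (`MutabilityCheck` and its small list of known immutable types are assumptions): any final field whose declared type is neither primitive nor one of a few well-known immutable classes is treated as mutable, which mirrors the caveat above that some immutable function objects may still be checkpointed.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, conservative approximation of the documented mutability rules.
final class MutabilityCheck {
    private MutabilityCheck() {}

    // Reference types assumed immutable in this sketch; any other reference type counts as mutable.
    private static final Set<Class<?>> KNOWN_IMMUTABLE =
        Set.of(String.class, Integer.class, Long.class, Double.class, Boolean.class);

    static boolean isMutable(Object function) {
        for (Class<?> c = function.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                    continue; // static and transient fields are not part of the serialized state
                }
                if (!Modifier.isFinal(mods)) {
                    return true; // rule 1: a non-transient, non-final instance field
                }
                Class<?> type = f.getType();
                if (!type.isPrimitive() && !KNOWN_IMMUTABLE.contains(type)) {
                    return true; // rule 2: a final field that may reference a mutable object
                }
            }
        }
        return false;
    }

    // Mutable: the count changes on every call.
    static final class Counter implements Function<String, String> {
        private long count;
        @Override public String apply(String s) { return (++count) + ":" + s; }
    }

    // Immutable under the rules: a final String plus a transient field, which is ignored.
    static final class Prefixer implements Function<String, String> {
        private final String prefix;
        private transient long calls;
        Prefixer(String prefix) { this.prefix = prefix; }
        @Override public String apply(String s) { calls++; return prefix + s; }
    }
}
```

A lambda that captures a List is reported as mutable by this heuristic, because its captured value is held in a final field of a non-immutable type.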

        Checkpointing is only supported in distributed contexts.

        Parameters:
        period - Approximate period for checkpointing.
        unit - Time unit of period.
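The synchronization described above can be sketched with standard Java serialization: processing and checkpointing lock the same object, so a snapshot never observes a half-applied update, and restoring a snapshot discards the effect of tuples processed after it. `CheckpointSketch` and `RunningTotal` are hypothetical; the real runtime chooses when to checkpoint and where the state is stored.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch: processing and checkpointing synchronize on the same function object.
final class CheckpointSketch {
    private CheckpointSketch() {}

    // A stateful function: running total of the values it has processed.
    static final class RunningTotal implements Serializable {
        private static final long serialVersionUID = 1L;
        private long total;
        private long seen;

        synchronized long apply(long value) { // both fields change under one lock
            total += value;
            seen++;
            return total;
        }

        synchronized long total() { return total; }
        synchronized long seen() { return seen; }
    }

    // Serialize the object's state while holding its lock.
    static byte[] checkpoint(RunningTotal fn) {
        synchronized (fn) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }
    }

    // Rebuild the function object from the last checkpoint, as a restarted element would.
    static RunningTotal restore(byte[] state) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(state))) {
            return (RunningTotal) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```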
      • getDependencyResolver

        public com.ibm.streamsx.topology.internal.core.DependencyResolver getDependencyResolver()
        Internal use only.
        Not intended to be called by applications, may be removed at any time.
        Returns:
        the dependencyResolver
      • builder

        public com.ibm.streamsx.topology.builder.GraphBuilder builder()
        Description copied from interface: TopologyElement
        Get the underlying OperatorGraph. Internal use only.
        Not intended to be called by applications, may be removed at any time.
        Specified by:
        builder in interface TopologyElement
      • createSubmissionParameter

        public <T> Supplier<T> createSubmissionParameter(java.lang.String name,
                                                         java.lang.Class<T> valueClass)
        Create a submission parameter without a default value.

        A submission parameter is a handle for a T whose actual value is not defined until topology submission time. Submission parameters enable the creation of more reusable topology bundles.

        A submission parameter has a name. The name must be unique within the topology.

        The parameter is a Supplier. Prior to submitting the topology, while constructing the topology, parameter.get() returns null.

        When the topology is submitted, parameter.get() in the executing topology returns the actual submission time value (or the default value; see createSubmissionParameter(String, Object)).

        Submission parameters may be used within functional logic. e.g.,

        
         Supplier<Integer> threshold = topology.createSubmissionParameter("threshold", 100);
         TStream<Integer> s = ...;
         // with a Java8 lambda expression
         TStream<Integer> filtered1 = s.filter(v -> v > threshold.get());
         // without a lambda expression, using an anonymous class
         TStream<Integer> filtered2 = s.filter(new Predicate<Integer>() {
              public boolean test(Integer v) {
                  return v > threshold.get();
              }} );
         

        Submission parameters may also be used for values in various cases such as TStream.parallel(Supplier) width value and MQTT connector configuration and topic values. e.g.,

        
         Supplier<Integer> width = topology.createSubmissionParameter("width", 1);
         TStream<String> s = ...;
         TStream<String> parallel_start = s.parallel(width);
         TStream<String> in_parallel = parallel_start.filter(...).transform(...);
         TStream<String> joined_parallel_streams = in_parallel.endParallel();
         

        Finally, submission parameters may be used in Java Primitive Operator and SPL Operator parameter values.

        The submission parameter's name is used to supply an actual value at topology submission time via StreamsContext.submit(com.ibm.streamsx.topology.Topology, java.util.Map) and ContextProperties.SUBMISSION_PARAMS, or when submitting a topology bundle for execution via other execution runtime native mechanisms such as IBM Streams streamtool.

        If a submission parameter is created without a default value and no value is provided at submission time, the behavior is defined by the underlying topology execution runtime. Submission fails for contexts DISTRIBUTED, STANDALONE, ANALYTIC_SERVICE, or EMBEDDED.

        Parameters:
        name - submission parameter name
        valueClass - class object for T
        Returns:
        the Supplier<T> for the submission parameter
        Throws:
        java.lang.IllegalArgumentException - if name is null, empty, or has already been defined.
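The lifecycle of a submission parameter (null while the topology is being built, then the actual or default value once submitted) can be modeled as below. `SubmissionParams` and `Param` are hypothetical stand-ins for the Supplier returned by createSubmissionParameter; the failure on a missing value without a default mirrors the documented behavior for the DISTRIBUTED, STANDALONE, ANALYTIC_SERVICE and EMBEDDED contexts.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of submission parameters; not the Topology implementation.
final class SubmissionParams {
    private final Map<String, Param<?>> declared = new HashMap<>();

    static final class Param<T> implements Supplier<T> {
        private final Class<T> valueClass;
        private final T defaultValue; // null when created without a default
        private volatile T value;     // null until submission time

        Param(Class<T> valueClass, T defaultValue) {
            this.valueClass = valueClass;
            this.defaultValue = defaultValue;
        }

        @Override public T get() { return value; }
    }

    // Without a default value.
    <T> Param<T> create(String name, Class<T> valueClass) {
        return register(name, new Param<>(valueClass, null));
    }

    // With a default value, which must not be null.
    @SuppressWarnings("unchecked")
    <T> Param<T> create(String name, T defaultValue) {
        if (defaultValue == null) {
            throw new IllegalArgumentException("defaultValue is null");
        }
        return register(name, new Param<>((Class<T>) defaultValue.getClass(), defaultValue));
    }

    private <T> Param<T> register(String name, Param<T> p) {
        if (name == null || name.isEmpty() || declared.containsKey(name)) {
            throw new IllegalArgumentException("null, empty or duplicate name: " + name);
        }
        declared.put(name, p);
        return p;
    }

    // "Submission": bind actual values, falling back to defaults; a missing value with no default fails.
    void submit(Map<String, ?> actual) {
        for (Map.Entry<String, Param<?>> e : declared.entrySet()) {
            bind(e.getKey(), e.getValue(), actual.get(e.getKey()));
        }
    }

    private static <T> void bind(String name, Param<T> p, Object supplied) {
        if (supplied != null) {
            p.value = p.valueClass.cast(supplied);
        } else if (p.defaultValue != null) {
            p.value = p.defaultValue;
        } else {
            throw new IllegalStateException("no value for submission parameter " + name);
        }
    }
}
```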
      • createSubmissionParameter

        public <T> Supplier<T> createSubmissionParameter(java.lang.String name,
                                                         T defaultValue)
        Create a submission parameter with a default value.

        See createSubmissionParameter(String, Class) for a description of submission parameters.

        Parameters:
        name - submission parameter name
        defaultValue - default value if parameter isn't specified.
        Returns:
        the Supplier<T> for the submission parameter
        Throws:
        java.lang.IllegalArgumentException - if name is null, empty, or has already been defined.
        java.lang.IllegalArgumentException - if defaultValue is null
      • addJobControlPlane

        public void addJobControlPlane()
        Add a job control plane to this topology. Creates a job control plane, which other operators can use to communicate control information. The job control plane is a JMX MBean Server that supports registration of MBeans from operators and logic within the job. Operators then use their MBeans, or the ones that are automatically registered in the job control plane, to provide intra-job control, co-ordination, and configuration.

        Multiple calls to this method may be made; only a single job control plane will be created.

        A job control plane is required when a consistent region is used, and is added automatically.

        The job control plane is only supported in distributed contexts.

        Since:
        com.ibm.streamsx.topology 1.5
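Because the job control plane is a JMX MBean server, the registration pattern it supports resembles the JDK's own platform MBean server. The sketch below uses that platform server purely as a stand-in: `ControlPlaneSketch`, `ThresholdControlMBean` and the ObjectName domain are hypothetical, and this code neither creates nor accesses a Streams job control plane.

```java
import java.lang.management.ManagementFactory;
import javax.management.Attribute;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

// Stand-in illustration using the JDK platform MBean server, not a Streams job control plane.
final class ControlPlaneSketch {
    private ControlPlaneSketch() {}

    // Management interface exposed through JMX (JMX requires it to be public).
    public interface ThresholdControlMBean {
        int getThreshold();
        void setThreshold(int threshold);
    }

    // Control object that stream logic could read while the MBean server updates it.
    public static final class ThresholdControl implements ThresholdControlMBean {
        private volatile int threshold;

        public ThresholdControl(int initial) { this.threshold = initial; }

        @Override public int getThreshold() { return threshold; }
        @Override public void setThreshold(int threshold) { this.threshold = threshold; }
    }

    // Registers a control MBean, updates its attribute through the server, reads the object,
    // then unregisters the MBean so the example can run repeatedly.
    static int registerAndUpdate(int initial, int updated) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName name = new ObjectName("example.sketch:type=ThresholdControl");
            ThresholdControl control = new ThresholdControl(initial);
            server.registerMBean(new StandardMBean(control, ThresholdControlMBean.class), name);
            try {
                server.setAttribute(name, new Attribute("Threshold", updated));
                return control.getThreshold(); // the registered object itself sees the update
            } finally {
                server.unregisterMBean(name);
            }
        } catch (JMException e) {
            throw new IllegalStateException("JMX operation failed", e);
        }
    }
}
```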
streamsx.topology 2.1 @ IBMStreams GitHub