com.ibm.streamsx.topology.function

Interface FunctionContext



  • public interface FunctionContext
    Context for a function executing in an IBM Streams application.
    • Method Summary

      Modifier and Type Method and Description
      void addClassLibraries(java.lang.String[] libraries)
      Add class libraries to the functional class loader.
      void createCustomMetric(java.lang.String name, java.lang.String description, java.lang.String kind, java.util.function.LongSupplier value)
      Create a custom metric.
      int getChannel()
      Get the index of the parallel channel the function is on.
      FunctionContainer getContainer()
      Get the container hosting a function.
      java.util.Set<java.lang.String> getCustomMetricNames()
      Get the set of custom metric names created in this context.
      int getMaxChannels()
      Get the total number of parallel channels for the parallel region that the function is in.
      java.util.concurrent.ScheduledExecutorService getScheduledExecutorService()
      Return a scheduler to execute background tasks.
      java.util.concurrent.ThreadFactory getThreadFactory()
      Return a ThreadFactory that can be used by the function with the thread context class loader set correctly.
    • Method Detail

      • getContainer

        FunctionContainer getContainer()
        Get the container hosting a function.
        Returns:
        Container hosting a function.
      • getScheduledExecutorService

        java.util.concurrent.ScheduledExecutorService getScheduledExecutorService()
        Return a scheduler to execute background tasks. Functions should utilize this service or getThreadFactory() rather than creating their own threads to ensure that the SPL runtime will wait for any background work before completing an application.

        The scheduler will be shut down when the processing element is to be shut down. Once the scheduler is shut down, no new tasks will be accepted. Existing scheduled tasks will remain in the scheduler's queue, but periodic tasks will be canceled.
        Functions that implement AutoCloseable and wish to complete any outstanding tasks at close time can call ExecutorService.awaitTermination() to wait for outstanding tasks to complete, or wait on the specific Future reference for a task.
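As an illustration of waiting on a specific Future at close time, here is a minimal sketch. The class FlushOnClose and its method names are hypothetical and not part of streamsx.topology; the scheduler argument is assumed to be the service returned by getScheduledExecutorService(). A timeout is used because tasks still queued after shutdown may never run.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of streamsx.topology.
final class FlushOnClose implements AutoCloseable {
    private final AtomicBoolean flushed = new AtomicBoolean();
    private final Future<?> pending;

    // Assumption: scheduler comes from FunctionContext.getScheduledExecutorService().
    FlushOnClose(ScheduledExecutorService scheduler) {
        // One-shot background task standing in for real work.
        this.pending = scheduler.schedule(() -> flushed.set(true), 10, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        try {
            // Wait for this task only, with a bound so close() cannot hang forever.
            pending.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("background task did not complete", e);
        }
    }

    boolean isFlushed() {
        return flushed.get();
    }
}
```

Waiting on the individual Future keeps close() independent of other tasks that share the scheduler.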

        The returned scheduler service is guaranteed to be an instance of java.util.concurrent.ScheduledThreadPoolExecutor and initially has this configuration:

        • corePoolSize set to Runtime.availableProcessors(), with a minimum of 2 and a maximum of 8.
        • allowsCoreThreadTimeOut() set to true.
        • keepAliveTime set to 5 seconds.
        Threads are created on demand to execute tasks, so even if the corePoolSize is eight, eight threads will only be created if there are eight concurrent tasks scheduled. Threads are removed if they have not been needed for the keepAliveTime value and allowsCoreThreadTimeOut() returns true.
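The configuration listed above can be reproduced with plain JDK calls, which may help when testing a function outside the Streams runtime. This is a sketch under that assumption; SchedulerConfigSketch and its methods are hypothetical names, and the real scheduler is created by the runtime, not by this code.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in that mirrors the documented initial configuration.
final class SchedulerConfigSketch {
    // Clamp the processor count into the documented range [2, 8].
    static int corePoolSizeFor(int availableProcessors) {
        return Math.max(2, Math.min(8, availableProcessors));
    }

    static ScheduledThreadPoolExecutor newScheduler() {
        int core = corePoolSizeFor(Runtime.getRuntime().availableProcessors());
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(core);
        // The keep-alive time must be positive before core thread time-out is enabled.
        executor.setKeepAliveTime(5, TimeUnit.SECONDS);
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
```

No threads exist until a task is submitted, so getPoolSize() on a freshly built executor returns 0.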

        Returns:
        Scheduler service that can be used by the function.
        See Also:
        "java.util.concurrent.ExecutorService", "java.util.concurrent.Future", "java.util.concurrent.ScheduledThreadPoolExecutor"
      • getThreadFactory

        java.util.concurrent.ThreadFactory getThreadFactory()
        Return a ThreadFactory that can be used by the function with the thread context class loader set correctly. Functions should utilize the returned factory to create Threads.

        Threads returned by the ThreadFactory have not been started and are set as daemon threads. Functions may set the threads as non-daemon before starting them. The SPL runtime will wait for non-daemon threads before terminating a processing element in standalone mode.

        Any uncaught exception thrown by the Runnable passed to ThreadFactory.newThread(Runnable) will cause the processing element containing the function to terminate.

        The ThreadFactory will be shut down when the processing element is to be shut down. Once the ThreadFactory is shut down, a call to newThread() will return null.
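A caller therefore has to handle a null return, and may choose to catch exceptions inside the Runnable so that a failure does not terminate the processing element. A minimal sketch, assuming the factory argument is the one returned by getThreadFactory(); WorkerStarter is a hypothetical helper, and swallowing the exception is a design choice that may not suit every function.

```java
import java.util.concurrent.ThreadFactory;

// Hypothetical helper, not part of streamsx.topology.
final class WorkerStarter {
    // Returns true if a thread was created and started, false if the factory
    // returned null (documented behavior once the factory is shut down).
    static boolean start(ThreadFactory factory, Runnable work, boolean nonDaemon) {
        Thread thread = factory.newThread(() -> {
            try {
                work.run();
            } catch (RuntimeException e) {
                // Report and contain the failure instead of letting it escape.
                System.err.println("background work failed: " + e);
            }
        });
        if (thread == null) {
            return false;
        }
        // Threads from the factory are daemon threads; switch before start() if needed.
        thread.setDaemon(!nonDaemon);
        thread.start();
        return true;
    }
}
```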

        Returns:
        A ThreadFactory that can be used by the function.
      • getChannel

        int getChannel()
        Get the index of the parallel channel the function is on.

        If the function is in a parallel region, this method returns a value from 0 to N-1, where N is the number of channels in the parallel region; otherwise it returns -1.

        Returns:
        the index of the parallel channel if the function is executing in a parallel region, or -1 if the function is not executing in a parallel region.
      • getMaxChannels

        int getMaxChannels()
        Get the total number of parallel channels for the parallel region that the function is in. If the function is not in a parallel region, this method returns 0.
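One possible use of the channel index and channel count is to decide which keys a function instance is responsible for. The sketch below takes the two values as plain ints, assumed to come from getChannel() and getMaxChannels(); ChannelPartitionSketch and ownsKey are hypothetical names, and hash partitioning is only one of several reasonable schemes.

```java
// Hypothetical helper, not part of streamsx.topology.
final class ChannelPartitionSketch {
    // channel: value of getChannel() (-1 outside a parallel region).
    // maxChannels: value of getMaxChannels() (0 outside a parallel region).
    static boolean ownsKey(int channel, int maxChannels, Object key) {
        if (channel < 0 || maxChannels <= 0) {
            return true; // not in a parallel region: the single instance owns every key
        }
        // floorMod keeps the result in [0, maxChannels) even for negative hash codes.
        return Math.floorMod(key.hashCode(), maxChannels) == channel;
    }
}
```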
        Returns:
        the number of parallel channels for the parallel region that this function is in, or 0 if the function is not in a parallel region.
      • addClassLibraries

        void addClassLibraries(java.lang.String[] libraries)
                        throws java.net.MalformedURLException
        Add class libraries to the functional class loader. The functional class loader is set as the thread context class loader for the thread factory, the executor and any method invocation on the function instance.

        Functions use this method to add class libraries specific to the invocation in a consistent manner. An example is defining the jar files that contain the JDBC driver to be used by the application.

        Each element of libraries is trimmed and then converted into a java.net.URL. If the element cannot be converted to a URL then it is assumed to represent a file system path and is converted into a URL representing that path. If the file path is relative, the location used is currently undefined, so use of relative paths is not recommended.
        If a file path ends with /* then it is assumed to be a directory, and all files in the directory with the extension .jar or .JAR are added to the functional class loader.
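The conversion rule described above can be approximated with JDK classes as follows. This is only an illustration of the rule, not the library's implementation: LibraryUrlSketch and toUrlString are hypothetical names, the sketch uses java.net.URI to detect URLs, and it does not handle the /* directory form.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical illustration of "trim, try URL, else treat as a file path".
final class LibraryUrlSketch {
    static String toUrlString(String element) {
        String trimmed = element.trim();
        try {
            URI uri = new URI(trimmed);
            if (uri.isAbsolute()) {
                // Has a scheme such as http: or file:, so try it as a URL.
                return uri.toURL().toString();
            }
        } catch (URISyntaxException | MalformedURLException e) {
            // Not usable as a URL: fall through and treat it as a file path.
        }
        return new File(trimmed).toURI().toString();
    }
}
```

In this sketch a relative path resolves against the JVM's working directory, which is one reason the source advises against relative paths.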

        Parameters:
        libraries - String representations of URLs and file paths to be added into the functional class loader. If null then no libraries are added to the class loader.
        Throws:
        java.net.MalformedURLException
      • createCustomMetric

        void createCustomMetric(java.lang.String name,
                                java.lang.String description,
                                java.lang.String kind,
                                java.util.function.LongSupplier value)
        Create a custom metric.
        A custom metric allows monitoring of an application through IBM Streams monitoring APIs, including the Streams console and the REST APIs. A metric has a single long value and has one of the following kinds:
        • counter - A counter metric observes a value that represents a count of an occurrence.
        • gauge - A gauge metric observes a value that is continuously variable with time.
        • time - A time metric represents a point in time. It is recommended that the value represents the number of milliseconds since the 1970/01/01 epoch, i.e. a value consistent with System.currentTimeMillis().

        The initial value of the metric is set from value.getAsLong() during this method call. Subsequently, value.getAsLong() is called periodically to get the current value of the metric so that it can be reported through the monitoring APIs.

        A lambda expression can be used as the supplier. For example, to monitor the length of a queue (or any collection) held in the field items, a metric can be created from Initializable.initialize(FunctionContext) as:

         
             this.items = new PriorityQueue<>();
             functionContext.createCustomMetric("queuedItems", "Number of queued items.",
                       "gauge", () -> this.items.size());
         
         
        The metric will now automatically track the length of the queue (subject to the periodic collection cycle).

        A java.util.concurrent.atomic.AtomicLong can be used as a metric's value, for example a counter where no other natural value exists, e.g.:

         
             this.nFailedRequests = new AtomicLong();
             functionContext.createCustomMetric("nFailedRequests", "Number of failed requests.",
                       "counter", this.nFailedRequests::get);
         
         
        Subsequently the counter is incremented using:
         
             this.nFailedRequests.incrementAndGet();
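The time kind can follow the same pattern as the counter example. A minimal sketch, assuming the value is kept as milliseconds since the epoch; LastSuccessTracker and the metric name lastSuccess are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of streamsx.topology.
final class LastSuccessTracker {
    // 0 until the first success is recorded.
    private final AtomicLong lastSuccessMillis = new AtomicLong();

    // Supplier suitable as the value argument of createCustomMetric, e.g.
    //   functionContext.createCustomMetric("lastSuccess",
    //       "Time of the last successful request.", "time", tracker.supplier());
    LongSupplier supplier() {
        return lastSuccessMillis::get;
    }

    // Record the current time, consistent with System.currentTimeMillis().
    void markSuccess() {
        lastSuccessMillis.set(System.currentTimeMillis());
    }
}
```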
         
         

        Parameters:
        name - Name of the metric.
        description - Description of the metric.
        kind - Kind of the metric.
        value - Function that returns the value of the metric.
        Throws:
        java.lang.IllegalStateException - A metric named name already exists, or kind is not valid.
        Since:
        1.7
      • getCustomMetricNames

        java.util.Set<java.lang.String> getCustomMetricNames()
        Get the set of custom metric names created in this context. The set may include additional metrics not created by this function, including metrics created by the topology framework.
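Because createCustomMetric throws IllegalStateException for a duplicate name, the returned set can be consulted first. The sketch below picks an unused name from a base name; MetricNameSketch, unusedName and the _N suffix scheme are hypothetical, and the set argument is assumed to be the result of getCustomMetricNames().

```java
import java.util.Set;

// Hypothetical helper, not part of streamsx.topology.
final class MetricNameSketch {
    // Returns base if it is free, otherwise base_1, base_2, ... (first unused).
    static String unusedName(Set<String> existingNames, String base) {
        if (!existingNames.contains(base)) {
            return base;
        }
        int suffix = 1;
        while (existingNames.contains(base + "_" + suffix)) {
            suffix++;
        }
        return base + "_" + suffix;
    }
}
```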
        Returns:
        The set of custom metric names created in this context.
        Since:
        1.7
streamsx.topology 2.1 @ IBMStreams GitHub