SPL File Flow.spl

com.ibm.streamsx.plumbing > redundant 1.0.1 > com.ibm.streamsx.plumbing.sample.redundant.flow > Flow.spl




composite Flow

Simple flow to be executed multiple times to provide redundant copies. The actual functionality of the flow is basically nothing, it is just to demonstrate the execution of redundant copies.

The premise is an application that reads from some data source (say a message broker), analyzes the streams and then sends SMS alerts based upon the analysis.

SPL Source Code

 public composite Flow
     stream<T> Alerts = SourceAnalyticsFlow() {}
     () as SMS = SendSMS(Alerts) {}


composite SourceAnalyticsFlow(output Alerts)

Output Ports

SPL Source Code

 public composite SourceAnalyticsFlow(output Alerts) {
     stream<T> Messages = Source() {}
     stream<T> Alerts = Analytics(Messages) {}


composite Source(output Messages)

Output Ports

SPL Source Code

 public composite Source(output Messages)
         stream<T> Raw = Beacon()
             period: 0.05;
           output Raw:
                id = IterationCount();
         // Random value for v that will be consistent
         // across multiple replicas
         stream<T> Messages = Functor(Raw) {
              state: {
                 mutable uint64 seed = 9221ul;
                 mutable uint64 next = 0l;
              onTuple Raw: {
                seed = seed * 1103515245ul + 12345ul;
                next = (seed / 65536ul) % 1000ul;
           output Messages: v = ((float64) next) / 1000.0l;


composite Analytics(output Alerts; input Messages)

The "analytics" portion of the application including the source.

Input Ports

Output Ports

SPL Source Code

 public composite Analytics(input Messages; output Alerts)
         stream<T> FA1 = Functor(Messages) { }
         stream<T> FA2 = Filter(FA1) {
           param filter: v <= 0.10;
         stream<T> FB1 = Functor(Messages) { }
         stream<T> FB2 = Functor(FB1) {
           param filter: v >= 0.80;
         stream<T> Alerts = Functor(FA2,FB2) { }


composite SendSMS(input Alerts)

The sink of the application. This does nothing but consume the tuples. It it representing a hypothetical operator that sends a SMS (text message) alert for each tuple received.

Input Ports

SPL Source Code

 public composite SendSMS(input Alerts)
      () as SMS = Custom(Alerts) { }




Type used in Flow.

T = uint64 id, float64 v;