SPL File Flow.spl

com.ibm.streamsx.plumbing > redundant 1.0.1 > com.ibm.streamsx.plumbing.sample.redundant.flow > Flow.spl

Composites

composite Flow

Simple flow that is executed multiple times to provide redundant copies. The flow itself does no useful work; it exists only to demonstrate the execution of redundant copies.

The premise is an application that reads from some data source (say a message broker), analyzes the streams and then sends SMS alerts based upon the analysis.

SPL Source Code


 public composite Flow
 {
     graph
     stream<T> Alerts = SourceAnalyticsFlow() {}
     () as SMS = SendSMS(Alerts) {}
 }
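
As a rough illustration of "executed multiple times", the sketch below invokes Flow twice from a hypothetical main composite. The names TwoCopies, Copy0 and Copy1 are assumptions made for this example and do not appear in Flow.spl; how the sample toolkit actually launches its redundant copies is not shown in this file.

```spl
use com.ibm.streamsx.plumbing.sample.redundant.flow::Flow;

// Hypothetical main composite, not part of Flow.spl
composite TwoCopies
{
    graph
        // Each invocation expands into its own Source, Analytics and SendSMS operators
        () as Copy0 = Flow() {}
        () as Copy1 = Flow() {}
}
```

Because Source derives v from a fixed seed, both copies should see the same sequence of v values, which is what makes them usable as replicas of one another.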

   

composite SourceAnalyticsFlow(output Alerts)

Output Ports

SPL Source Code


 public composite SourceAnalyticsFlow(output Alerts) {
   graph
     stream<T> Messages = Source() {}
     stream<T> Alerts = Analytics(Messages) {}
 }

   

composite Source(output Messages)

Output Ports

SPL Source Code


 public composite Source(output Messages)
 {
     graph
         stream<T> Raw = Beacon()
         {
           param
             period: 0.05;
           output Raw:
                id = IterationCount();
         }
         // Pseudo-random value for v from a fixed-seed linear congruential
         // generator, so every replica produces the same sequence
         stream<T> Messages = Functor(Raw) {
           logic
              state: {
                 mutable uint64 seed = 9221ul;
                 mutable uint64 next = 0ul;
              }
              onTuple Raw: {
                seed = seed * 1103515245ul + 12345ul;
                next = (seed / 65536ul) % 1000ul;
              }
           output Messages: v = ((float64) next) / 1000.0l;
         }
 }
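
The generator above is deterministic, so its first value can be checked by hand: seed = 9221 * 1103515245 + 12345 = 10175514086490, then 10175514086490 / 65536 = 155266023 (integer division) and 155266023 % 1000 = 23, giving v = 0.023 for the first tuple. A minimal sketch for printing the first few values follows; the composite SeedDemo, the stream Ticks and the operator name Print are hypothetical and not part of Flow.spl.

```spl
// Hypothetical stand-alone composite that replays the generator used in Source
composite SeedDemo
{
    graph
        // Emit three trigger tuples, then stop
        stream<uint64 id> Ticks = Beacon()
        {
          param
            iterations: 3u;
          output Ticks:
            id = IterationCount();
        }

        () as Print = Custom(Ticks)
        {
          logic
            state: {
              mutable uint64 seed = 9221ul;
              mutable uint64 next = 0ul;
            }
            onTuple Ticks: {
              // Same update rule as the Functor in Source
              seed = seed * 1103515245ul + 12345ul;
              next = (seed / 65536ul) % 1000ul;
              println(((float64) next) / 1000.0);
            }
        }
}
```

Running the same composite in two jobs should print identical sequences, since nothing in the update rule depends on time or on the job.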

   

composite Analytics(output Alerts; input Messages)

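The "analytics" portion of the application. It reads Messages produced by Source, keeps tuples with v <= 0.10 in one branch and tuples with v >= 0.80 in the other, and merges both branches into Alerts.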

Input Ports

Output Ports

SPL Source Code


 public composite Analytics(input Messages; output Alerts)
 {
     graph
         stream<T> FA1 = Functor(Messages) { }
         stream<T> FA2 = Filter(FA1) {
           param filter: v <= 0.10;
         }
 
         stream<T> FB1 = Functor(Messages) { }
         stream<T> FB2 = Functor(FB1) {
           param filter: v >= 0.80;
         }
 
         stream<T> Alerts = Functor(FA2,FB2) { }
 }
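
The two branches select disjoint ranges of v, so the set of tuples reaching Alerts could also be produced by one Filter with a combined predicate. The sketch below shows that alternative under the assumption that it is compiled in the same namespace as Flow.spl, so that T resolves; AnalyticsSingleFilter is a hypothetical name. It does not reproduce the branching graph shape of the original, which may be the point of the sample.

```spl
// Hypothetical single-operator variant of Analytics
public composite AnalyticsSingleFilter(output Alerts; input Messages)
{
    graph
        // Pass a tuple if it falls in either alert range
        stream<T> Alerts = Filter(Messages) {
          param filter: v <= 0.10 || v >= 0.80;
        }
}
```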

   

composite SendSMS(input Alerts)

The sink of the application. It only consumes the tuples, standing in for a hypothetical operator that sends an SMS (text message) alert for each tuple received.

Input Ports

SPL Source Code


 public composite SendSMS(input Alerts)
 {
     graph
      () as SMS = Custom(Alerts) { }
 }
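
To make the alerts visible while experimenting, the empty Custom could be given tuple logic that prints each alert. This is only a sketch: SendSMSLogged is a hypothetical name, and the println call is a placeholder for whatever SMS gateway a real application would use.

```spl
// Hypothetical variant of SendSMS that logs each alert instead of discarding it
public composite SendSMSLogged(input Alerts)
{
    graph
      () as SMS = Custom(Alerts) {
        logic
          onTuple Alerts: {
            // Placeholder for a real SMS send
            println("ALERT id=" + (rstring) Alerts.id + " v=" + (rstring) Alerts.v);
          }
      }
}
```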

   

Types

T

Type used in Flow.

T = uint64 id, float64 v;