com.ibm.streamsx.plumbing > redundant 1.0.1 > com.ibm.streamsx.plumbing.sample.redundant.flow > Flow.spl
Simple flow to be executed multiple times to provide redundant copies. The flow itself does essentially nothing; it exists only to demonstrate the execution of redundant copies.
The premise is an application that reads from some data source (say a message broker), analyzes the streams and then sends SMS alerts based upon the analysis.
public composite Flow {
    graph
        stream<T> Alerts = SourceAnalyticsFlow() {}
        () as SMS = SendSMS(Alerts) {}
}
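As an illustration of what "executed multiple times" could look like, here is a minimal sketch of a main composite that invokes Flow twice. The composite name TwoCopies and the invocation names Copy0 and Copy1 are assumptions made for this example, not part of the sample; the toolkit's own mechanism for launching the copies may differ.

```spl
// Hypothetical main composite; TwoCopies, Copy0 and Copy1 are
// illustrative names, not part of the sample toolkit.
use com.ibm.streamsx.plumbing.sample.redundant.flow::Flow;

composite TwoCopies {
    graph
        // Each invocation instantiates the complete
        // Source -> Analytics -> SendSMS pipeline.
        () as Copy0 = Flow() {}
        () as Copy1 = Flow() {}
}
```

Because Source derives v from a fixed seed, both copies would produce the same sequence of values. Placing the copies in separate processing elements or on separate hosts is deliberately left out of this sketch.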
public composite SourceAnalyticsFlow(output Alerts) {
    graph
        stream<T> Messages = Source() {}
        stream<T> Alerts = Analytics(Messages) {}
}
public composite Source(output Messages) {
    graph
        // Emit one tuple every 0.05 seconds, numbered by iteration.
        stream<T> Raw = Beacon() {
            param
                period: 0.05;
            output
                Raw: id = IterationCount();
        }

        // Random value for v that will be consistent
        // across multiple replicas
        stream<T> Messages = Functor(Raw) {
            logic
                state: {
                    mutable uint64 seed = 9221ul;
                    mutable uint64 next = 0ul;
                }
                onTuple Raw: {
                    // Linear congruential step; a fixed seed gives every
                    // replica the same sequence.
                    seed = seed * 1103515245ul + 12345ul;
                    // Reduce to an integer in the range 0..999.
                    next = (seed / 65536ul) % 1000ul;
                }
            output
                // Scale to a float64 in [0.0, 1.0).
                Messages: v = ((float64) next) / 1000.0l;
        }
}
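To make the generator in Source concrete, the first update can be worked by hand. The subscripted names below are notation introduced for this example only; the arithmetic follows the onTuple logic for the first tuple, where no uint64 overflow occurs.

```latex
\begin{aligned}
\text{seed}_1 &= 9221 \times 1103515245 + 12345 = 10175514086490 \\
\text{next}_1 &= \lfloor \text{seed}_1 / 65536 \rfloor \bmod 1000 = 155266023 \bmod 1000 = 23 \\
v_1 &= 23 / 1000 = 0.023
\end{aligned}
```

Every replica starts from the same seed, so each one computes v = 0.023 for its first tuple. Since 0.023 <= 0.10, that tuple passes the FA2 filter in Analytics and reaches Alerts.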
The "analytics" portion of the application including the source.
public composite Analytics(input Messages; output Alerts) {
    graph
        stream<T> FA1 = Functor(Messages) {}
        stream<T> FA2 = Filter(FA1) {
            param
                filter: v <= 0.10;
        }
        stream<T> FB1 = Functor(Messages) {}
        stream<T> FB2 = Functor(FB1) {
            param
                filter: v >= 0.80;
        }
        stream<T> Alerts = Functor(FA2, FB2) {}
}
The sink of the application. This does nothing but consume the tuples. It represents a hypothetical operator that sends an SMS (text message) alert for each tuple received.
public composite SendSMS(input Alerts) {
    graph
        () as SMS = Custom(Alerts) {}
}
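To observe the alerts while experimenting, the empty sink could be swapped for one that prints each tuple. The following is a minimal sketch under that assumption; the composite name PrintSMS and the message format are hypothetical, and nothing here sends an actual SMS.

```spl
// Hypothetical stand-in for SendSMS: prints each alert instead of
// sending a text message. PrintSMS is an illustrative name only.
public composite PrintSMS(input Alerts) {
    graph
        () as SMS = Custom(Alerts) {
            logic
                onTuple Alerts: {
                    // id and v are the attributes of type T.
                    printStringLn("ALERT id=" + (rstring) Alerts.id
                                  + " v=" + (rstring) Alerts.v);
                }
        }
}
```

With redundant copies running, each copy would print the same alerts, which is one simple way to check that the replicas stay consistent.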
Type used in Flow.
T = uint64 id, float64 v;