com.ibm.streamsx.plumbing > redundant 1.0.1 > com.ibm.streamsx.plumbing.sample.redundant.flow > Flow.spl
A simple flow intended to be executed multiple times to provide redundant copies. The flow itself does almost nothing; it exists only to demonstrate the execution of redundant copies.
The premise is an application that reads from some data source (say a message broker), analyzes the streams and then sends SMS alerts based upon the analysis.
public composite Flow
{
    graph
        stream<T> Alerts = SourceAnalyticsFlow() {}
        () as SMS = SendSMS(Alerts) {}
}
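A minimal sketch of how redundant copies might be instantiated, assuming the simplest approach of invoking Flow more than once from a main composite. The composite name TwoCopies and the invocation aliases Copy0 and Copy1 are hypothetical and are not part of the toolkit; how the toolkit itself launches and coordinates its copies is not shown here.

```spl
// Hypothetical main composite, not part of the toolkit.
use com.ibm.streamsx.plumbing.sample.redundant.flow::Flow;

composite TwoCopies
{
    graph
        // Each invocation expands to an independent copy of the whole flow.
        () as Copy0 = Flow() {}
        () as Copy1 = Flow() {}
}
```

In this sketch each copy ends in its own SendSMS sink, so every alert would be consumed once per copy.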
public composite SourceAnalyticsFlow(output Alerts) {
    graph
        stream<T> Messages = Source() {}
        stream<T> Alerts = Analytics(Messages) {}
}
public composite Source(output Messages)
{
    graph
        // Emit one tuple every 0.05 seconds, numbered by iteration.
        stream<T> Raw = Beacon()
        {
            param
                period: 0.05;
            output Raw:
                id = IterationCount();
        }
        // Pseudo-random value for v that will be consistent
        // across multiple replicas
        stream<T> Messages = Functor(Raw) {
            logic
                state: {
                    mutable uint64 seed = 9221ul;
                    mutable uint64 next = 0ul;
                }
                onTuple Raw: {
                    // Linear congruential step; next is derived from the new seed.
                    seed = seed * 1103515245ul + 12345ul;
                    next = (seed / 65536ul) % 1000ul;
                }
            output Messages: v = ((float64) next) / 1000.0l;
        }
}
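The Functor's state update is a linear congruential generator with a fixed seed, so every replica should compute the same sequence of v values. One way to observe this is to print the first few tuples and compare the output of separate runs. The sketch below assumes it is compiled in the same namespace as Flow.spl so that Source and T resolve; the composite name PrintMessages and the cutoff of five tuples are hypothetical.

```spl
namespace com.ibm.streamsx.plumbing.sample.redundant.flow;

// Hypothetical helper composite, not part of the toolkit.
composite PrintMessages
{
    graph
        stream<T> Messages = Source() {}

        () as Printer = Custom(Messages) {
            logic
                onTuple Messages: {
                    // Print only the early tuples so runs are easy to compare.
                    if (Messages.id < 5ul) {
                        println(Messages);
                    }
                }
        }
}
```

Working the first step by hand, and assuming the onTuple logic runs before the output assignment is evaluated: 9221 * 1103515245 + 12345 = 10175514086490, integer division by 65536 gives 155266023, and the remainder modulo 1000 is 23, so the first tuple would carry v = 0.023. Later steps exceed the uint64 range, so the rest of the sequence depends on unsigned overflow behaving the same way in every replica (assumed here to wrap modulo 2^64).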
The "analytics" portion of the application including the source.
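public composite Analytics(input Messages; output Alerts)
{
    graph
        // Branch A: pass tuples with low values of v.
        stream<T> FA1 = Functor(Messages) { }
        stream<T> FA2 = Filter(FA1) {
            param filter: v <= 0.10;
        }
        // Branch B: pass tuples with high values of v.
        stream<T> FB1 = Functor(Messages) { }
        stream<T> FB2 = Functor(FB1) {
            param filter: v >= 0.80;
        }
        // Merge both branches into the single Alerts stream.
        stream<T> Alerts = Functor(FA2,FB2) { }
}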
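Taken together, the two branches pass a tuple when v is at most 0.10 or at least 0.80. A minimal sketch of the same selection written as a single Filter follows, assuming the pass-through Functors FA1 and FB1 carry no logic of their own. The composite name AnalyticsSingleFilter is hypothetical; the original two-branch layout presumably stands in for two separate analytic paths, which this sketch does not preserve.

```spl
namespace com.ibm.streamsx.plumbing.sample.redundant.flow;

// Hypothetical single-operator variant, not part of the toolkit.
public composite AnalyticsSingleFilter(input Messages; output Alerts)
{
    graph
        stream<T> Alerts = Filter(Messages) {
            // Low branch (FA2) condition or high branch (FB2) condition.
            param filter: v <= 0.10 || v >= 0.80;
        }
}
```

Since v takes values from 0.000 to 0.999 in steps of 0.001, roughly 30% of tuples would become alerts if the generated values were evenly spread.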
The sink of the application. It does nothing but consume the tuples. It represents a hypothetical operator that sends an SMS (text message) alert for each tuple received.
public composite SendSMS(input Alerts)
{
    graph
        () as SMS = Custom(Alerts) { }
}
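Because the Custom operator has no logic clause, alerts are discarded without any visible effect. For experimenting, a sink that prints each alert makes the output observable. This is a sketch under the assumption that console output is an acceptable stand-in for an SMS; the composite name PrintSMS is hypothetical and not part of the toolkit.

```spl
namespace com.ibm.streamsx.plumbing.sample.redundant.flow;

// Hypothetical replacement sink, not part of the toolkit.
public composite PrintSMS(input Alerts)
{
    graph
        () as SMS = Custom(Alerts) {
            logic
                onTuple Alerts: {
                    // Stand-in for sending a text message.
                    printStringLn("SMS alert: id=" + (rstring) Alerts.id
                                  + " v=" + (rstring) Alerts.v);
                }
        }
}
```

Swapping SendSMS for PrintSMS in Flow would leave the rest of the graph unchanged, since both composites take a single Alerts input and produce no output.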
Type used in Flow.
T = uint64 id, float64 v;