com.ibm.streamsx.plumbing > com.ibm.streamsx.plumbing 1.0.0 > com.ibm.streamsx.plumbing.leader > leader.spl
Changeover switch that changes state depending on whether its PE is marked as the leader of the group. Requires that the application contain a JobControlPlane operator invocation.
public composite LeadershipChangeover(input In ; output WhenLeader, WhenNotLeader, stream<LeaderStatus> LeaderControl) {
  param
    expression<rstring> $group ;
    expression<rstring> $path : "" ;
  graph
    stream<LeaderStatus> LeaderControl = LeadershipElection() {
      param
        group : $group ;
        path : $path ;
    }

    (stream<In> WhenLeader ; stream<In> WhenNotLeader) as LCS = Changeover(In ; LeaderControl) {
      param
        status : leader ;
        initialState : false ; // start off open, only close when become the leader
    }
}
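As a rough illustration of how the three output ports might be wired up, the sketch below invokes LeadershipChangeover from a small application. The composite name ChangeoverExample, the Beacon source, the stream names Readings, Active, Standby and Status, and the group name "lg" are all assumptions made for this example and do not come from the toolkit. The routing comments follow the port names WhenLeader and WhenNotLeader.

```spl
use com.ibm.streamsx.plumbing.leader::* ;

// Hypothetical application, for illustration only.
composite ChangeoverExample {
  graph
    // The leadership operators require a JobControlPlane in the application.
    () as JCP = spl.control::JobControlPlane() {}

    // Assumed source: one tuple per second carrying a sequence number.
    stream<int32 seq> Readings = Beacon() {
      param
        period : 1.0 ;
      output
        Readings : seq = (int32) IterationCount() ;
    }

    // Port order follows the composite signature:
    // WhenLeader, WhenNotLeader, LeaderControl.
    (stream<Readings> Active ; stream<Readings> Standby ; stream<LeaderStatus> Status)
      = LeadershipChangeover(Readings) {
      param
        group : "lg" ;
    }

    // Going by the port names, tuples arrive here while this PE is the leader.
    () as ActiveSink = Custom(Active) {
      logic
        onTuple Active : println(Active) ;
    }

    // Going by the port names, tuples arrive here while it is not.
    () as StandbySink = Custom(Standby) {
      logic
        onTuple Standby : println(Standby) ;
    }
}
```

The path parameter is left at its default of "" here. The Status stream is declared but left unconsumed, which keeps the sketch short.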
Switch operator that only generates output when its PE is the leader of the group.
Leader election is delegated to LeadershipChangeover, which in turn uses LeadershipElection.
When this operator's PE is the leader, all tuples from In are submitted to Out.
When this operator's PE is not the leader, the tuples received from In during the last period seconds are held in a replay buffer (period defaults to 10.0).
When this operator's PE is elected the leader after a failure of the previous leader, all the tuples in the replay buffer are submitted to Out in addition to tuples arriving on In.
For correct behavior, all operators in this composite must be fused into a single processing element using partition colocation. The composite does not apply colocation itself, because it is likely to be fused with additional application-specific operators.
stream<B1> L1 = LeadershipSwitchWithReplay(B1) {
  param
    group : "lg" ;
  config
    placement : partitionColocation("lg1") ;
}
Note that instances of LeadershipSwitchWithReplay within the same group must not be partition colocated with one another, since leadership is determined per PE.
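A minimal sketch of two replicas in the same group, each given its own colocation tag so that each composite is fused with its source but not with the other replica. The composite name ReplicatedSwitchExample, the Beacon sources, the tag "lg2" and the 30.0 second period are assumptions chosen for illustration. Distinct tags only avoid an explicit colocation between the replicas, so whether they end up in separate PEs can still depend on the fusion settings used at submission time.

```spl
use com.ibm.streamsx.plumbing.leader::* ;

// Hypothetical application, for illustration only.
composite ReplicatedSwitchExample {
  graph
    () as JCP = spl.control::JobControlPlane() {}

    // Two assumed sources, each colocated with its own switch.
    stream<int32 seq> B1 = Beacon() {
      param
        period : 1.0 ;
      config
        placement : partitionColocation("lg1") ;
    }

    stream<int32 seq> B2 = Beacon() {
      param
        period : 1.0 ;
      config
        placement : partitionColocation("lg2") ;
    }

    // Same group "lg", different colocation tags.
    stream<B1> L1 = LeadershipSwitchWithReplay(B1) {
      param
        group : "lg" ;
        period : 30.0 ; // keep 30 seconds of tuples while not the leader
      config
        placement : partitionColocation("lg1") ;
    }

    stream<B2> L2 = LeadershipSwitchWithReplay(B2) {
      param
        group : "lg" ;
        period : 30.0 ;
      config
        placement : partitionColocation("lg2") ;
    }
}
```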
public composite LeadershipSwitchWithReplay(input In ; output Out) {
  param
    expression<rstring> $group ;
    expression<rstring> $path : "" ;
    expression<float64> $period : 10.0 ;
  graph
    (stream<In> WhenLeader ; stream<In> WhenNotLeader ; stream<LeaderStatus> LeaderControl) as Switch = LeadershipChangeover(In) {
      param
        group : $group ;
        path : $path ;
    }

    // Stream that will drive the replay when this PE becomes the leader.
    stream<LeaderControl> IsLeader = Filter(LeaderControl) {
      param
        filter : leader == true ;
    }

    stream<In> Replay = ReplayBuffer(WhenNotLeader ; IsLeader) {
      param
        period : $period ;
    }

    stream<In> Out = Functor(WhenLeader, Replay) { }
}
Type of tuples submitted by LeadershipElection on its output port. If leader is true then the LeadershipElection operator that submitted it is the leader of the group.
LeaderStatus = rstring group, rstring id, boolean leader, timestamp ts;
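One way an application might react to LeaderStatus tuples is sketched below. The composite LogLeaderStatus is a hypothetical helper, not part of the toolkit. It assumes the LeaderStatus type is visible through the namespace import and uses only the group, id and leader attributes listed above.

```spl
use com.ibm.streamsx.plumbing.leader::* ;

// Hypothetical helper, for illustration only: prints a line each time
// a status tuple reports that the submitting operator is the leader.
public composite LogLeaderStatus(input stream<LeaderStatus> Status) {
  graph
    () as Logger = Custom(Status) {
      logic
        onTuple Status : {
          if (Status.leader) {
            printStringLn("Leader of group " + Status.group + " is " + Status.id) ;
          }
        }
    }
}
```

It could be attached to the LeaderControl output of LeadershipChangeover, or to any other stream of type LeaderStatus.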