SPL File leader.spl

Toolkit: com.ibm.streamsx.plumbing 1.0.0. Namespace: com.ibm.streamsx.plumbing.leader.

Composites

composite LeadershipChangeover(output WhenLeader, WhenNotLeader, stream<LeaderStatus> LeaderControl; input In)

Changeover switch that changes state based on whether its PE is marked as the leader of the group. Requires that the application contain a JobControlPlane operator invocation.
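
As an illustration, a minimal sketch of an application that invokes LeadershipChangeover might look as follows. The composite name ChangeoverSketch, the stream names, the Beacon test source, and the group name "lg" are assumptions made for this example; the use directives assume the toolkit namespace com.ibm.streamsx.plumbing.leader and the standard spl.control namespace.

```spl
use com.ibm.streamsx.plumbing.leader::LeadershipChangeover;
use com.ibm.streamsx.plumbing.leader::LeaderStatus;
use spl.control::JobControlPlane;

// Hypothetical main composite, not part of the toolkit.
composite ChangeoverSketch {
  graph
    // LeadershipChangeover requires a JobControlPlane in the application.
    () as JCP = JobControlPlane() {}

    // Assumed test source: one tuple per second with a running counter.
    stream<uint64 seq> Src = Beacon() {
      param period : 1.0;
      output Src : seq = IterationCount();
    }

    // Output ports in declaration order: WhenLeader, WhenNotLeader, LeaderControl.
    (stream<Src> Active ; stream<Src> Standby ; stream<LeaderStatus> Status) =
      LeadershipChangeover(Src) {
        param group : "lg";
      }

    // Print whatever arrives on the leader-side port.
    () as ActiveSink = Custom(Active) {
      logic onTuple Active : println(Active);
    }
}
```

Going by the port names, tuples should appear on Active only while this PE is the leader and on Standby otherwise; Standby and Status are left unconsumed here to keep the sketch short.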

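Parameters

group: expression<rstring>, required
path: expression<rstring>, optional, default ""

Input Ports

In

Output Ports

WhenLeader: stream<In>
WhenNotLeader: stream<In>
LeaderControl: stream<LeaderStatus>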

SPL Source Code


 public composite LeadershipChangeover(input In ; output WhenLeader, WhenNotLeader, stream<LeaderStatus> LeaderControl)
 {
     param
         expression<rstring> $group ;
         expression<rstring> $path : "" ;
     graph
       stream<LeaderStatus> LeaderControl = LeadershipElection()
       {
         param
           group : $group ;
           path : $path ;
       }
 
       (stream<In> WhenLeader ; stream<In> WhenNotLeader) as LCS = Changeover(In ; LeaderControl)
       {
         param
            status : leader ;
            initialState : false ; // start off open; only close when this PE becomes the leader
       }
 }

   

composite LeadershipSwitchWithReplay(output Out; input In)

Switch operator that only generates output when its PE is the leader of the group.

The leader is elected by LeadershipChangeover, which in turn uses LeadershipElection.

When this operator's PE is the leader, all tuples from In are submitted to Out.

When this operator's PE is not the leader, the tuples from In that arrived within the last period seconds (the period parameter, default 10.0) are maintained in a replay buffer.

When this operator's PE is elected the leader after a failure of the previous leader, all the tuples in the replay buffer are submitted to Out in addition to tuples arriving on In.

For correct behavior, all operators in this composite must be fused into a single processing element using partition colocation. The composite does not set the colocation itself, because it will likely be fused with additional, application-specific operators; set it on the invocation instead, for example:

stream<B1> L1 = LeadershipSwitchWithReplay(B1) {
  param
    group : "lg" ;
  config
    placement: partitionColocation("lg1");
}

Note that instances of LeadershipSwitchWithReplay within the same group must not be partition colocated with each other; each instance must run in its own PE.
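
The two placement rules above can be sketched together as follows: each replica is fused with its own upstream operator through a distinct colocation tag, so the two replicas in group "lg" end up in different PEs. The composite name, the Beacon sources, the tags "lg1" and "lg2", and the 30-second period are assumptions chosen for illustration, as is running both replicas in a single application.

```spl
use com.ibm.streamsx.plumbing.leader::LeadershipSwitchWithReplay;
use spl.control::JobControlPlane;

// Hypothetical main composite with two replicas competing in group "lg".
composite ReplicatedSwitchSketch {
  graph
    // LeadershipChangeover, used internally, requires a JobControlPlane.
    () as JCP = JobControlPlane() {}

    // Replica 1: source and switch share colocation tag "lg1".
    stream<uint64 seq> B1 = Beacon() {
      param period : 1.0;
      output B1 : seq = IterationCount();
      config placement : partitionColocation("lg1");
    }

    stream<B1> L1 = LeadershipSwitchWithReplay(B1) {
      param
        group : "lg";
        period : 30.0; // buffer the last 30 seconds while not the leader
      config
        placement : partitionColocation("lg1");
    }

    // Replica 2: same group, different colocation tag, hence a different PE.
    stream<uint64 seq> B2 = Beacon() {
      param period : 1.0;
      output B2 : seq = IterationCount();
      config placement : partitionColocation("lg2");
    }

    stream<B2> L2 = LeadershipSwitchWithReplay(B2) {
      param
        group : "lg";
        period : 30.0;
      config
        placement : partitionColocation("lg2");
    }
}
```

Only the replica whose PE currently holds leadership is expected to emit tuples; if that PE fails, the other replica should first submit its buffered tuples and then continue with live ones.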

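Parameters

group: expression<rstring>, required
path: expression<rstring>, optional, default ""
period: expression<float64>, optional, default 10.0

Input Ports

In

Output Ports

Out: stream<In>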

SPL Source Code


 public composite LeadershipSwitchWithReplay(input In ; output Out)
 {
     param
         expression<rstring> $group ;
         expression<rstring> $path : "" ;
         expression<float64> $period : 10.0 ;
     graph
       (stream<In> WhenLeader ; stream<In> WhenNotLeader ; stream<LeaderStatus> LeaderControl) as Switch =
           LeadershipChangeover(In)
       {
         param
           group : $group ;
           path : $path ;
       }
 
       // Stream that will drive the replay when this PE becomes the leader.
       stream<LeaderControl> IsLeader = Filter(LeaderControl)
       {
         param
           filter : leader == true ;
       }
 
       stream<In> Replay = ReplayBuffer(WhenNotLeader ; IsLeader)
       {
         param
           period : $period ;
       }
 
       stream<In> Out = Functor(WhenLeader, Replay)
       {
       }
 }

   

Types

LeaderStatus

Type of tuples submitted by LeadershipElection on its output port. If leader is true then the LeadershipElection operator that submitted it is the leader of the group.

LeaderStatus = rstring group, rstring id, boolean leader, timestamp ts;
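
To show how the attributes of LeaderStatus might be consumed, here is a small sketch of a helper composite that traces leadership gains. LogLeadership is a hypothetical name and not part of the toolkit; the sketch assumes LeaderStatus can be imported from the com.ibm.streamsx.plumbing.leader namespace, and it makes no assumption about what id identifies beyond it being an rstring.

```spl
use com.ibm.streamsx.plumbing.leader::LeaderStatus;

// Hypothetical helper: feed it the LeaderControl stream of LeadershipChangeover.
public composite LogLeadership(input stream<LeaderStatus> Status) {
  graph
    () as Log = Custom(Status) {
      logic
        onTuple Status : {
          // leader == true means the submitting operator now leads the group.
          if (Status.leader) {
            appTrc(Trace.info,
                   "Leadership gained: group=" + Status.group + ", id=" + Status.id);
          }
        }
    }
}
```

An invocation would then look like `() as LeaderLog = LogLeadership(LeaderControl) {}`, where LeaderControl is the third output stream of a LeadershipChangeover invocation.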