905_gate_load_balancer

/*
This example is the same code that can be found in the SPL Standard Toolkit PDF file.
Please see that PDF file for a description about what this application does.
*/
namespace my.sample ;
// Implementing Load Balancing of Tuples

// A ThreadedSplit operator may be used to split a stream, 
// and process tuples in parallel.
// This works for many applications, but if the processing time of 
// a tuple varies considerably depending on the tuple data, it may 
// cause problems where a tuple with a long processing time may cause 
// subsequent tuples to be backed up in the stream waiting for processing,
// even though there may be another thread available and idle.
// This may be aggravated by tuples that are in a TCP/IP connection to 
// another PE. In order to ensure load balancing, a ThreadedSplit 
// operator with a buffer size of 1 may be tied to two or more Gate 
// operators with a maximum unacknowledged tuple count of 1 or more.
// The ThreadedSplit and the  Gate operators must be on the same PE, to 
// avoid tuples queuing between PEs. A partitionColocation placement 
// config is used to ensure this.
//
// The Gate operator will allow only the specified number of tuples 
// (1 in this case)  to pass at a time.  It will wait until a 
// subsequent operator has acknowledged  receipt of the tuple before 
// passing the next tuple.  In this manner, no tuple will  ever be 
// queued behind another, waiting to be processed.
//
// Main composite: demonstrates load balancing with ThreadedSplit + Gate.
// Topology: Beacon -> ThreadedSplit -> (Gate, Gate) -> (Custom, Custom),
// where each Custom acknowledges tuples back to its Gate via a control stream.
composite Main {
	graph
		// Source: generate an unbounded stream of monotonically increasing
		// uint32 values (0, 1, 2, ...) to feed the load-balanced pipeline.
		stream <uint32 i > I = Beacon(){
			logic
				state : mutable uint32 c = 0 ; // per-invocation tuple counter

			output I : 
				i = c ++ ; // emit current count, then increment
		} // End of I = Beacon()
		
		// Split the stream into 2 streams.  Use a following Gate to ensure load balancing.
		// bufferSize of 1 keeps at most one tuple queued per output port, so a
		// slow tuple on one branch cannot back up work meant for the other branch.
		( stream < I > X ; stream < I > Y) =ThreadedSplit ( I){
			param 
				bufferSize : 1u ;
				
			config
				placement : partitionColocation( "Split") , // ensure same PE as the Gates
				partitionExlocation( "Process") ; // keep apart from the processing Customs
		}  // End of (... stream < I > Y) =ThreadedSplit(I)
		
		// Gate for branch X: passes at most 1 unacknowledged tuple; waits for an
		// acknowledgment on control stream Control0 before passing the next tuple.
		stream <I > O0 = Gate ( X ; Control0 ) {
			param
				maxUnackedTupleCount : 1u ; 
				
			config 
				placement : partitionColocation( "Split") ; // ensure same PE as ThreadedSplit
		} // End of O0 = Gate ( X ; Control0 )
		
		// Gate for branch Y: same contract as O0, acknowledged via Control1.
		stream <I > O1 = Gate ( Y ; Control1 ) { 
			param 
				maxUnackedTupleCount : 1u ; 

			config 
				placement : partitionColocation("Split") ; // ensure same PE as ThreadedSplit
		} // End of O1 = Gate ( Y ; Control1 )

		// Processing operator for branch X: forwards each tuple on R0 and then
		// acknowledges 1 tuple back to the Gate on Control0.
		( stream <I > R0 as out ; stream < uint32 i > Control0 as control) = Custom ( O0 as In ) { 
			logic 
				onTuple In : { 
					// do some processing
					submit ( In , out ) ; // forward tuple
					submit({i = 1u} , control ) ; // acknowledge 1 tuple to the Gate
				} // End of onTuple In
				
			// Place on a different PE from Gate or other processing operator
			config 
				placement : partitionExlocation ( "Process" ) ; 
		} // End of (... Control0 as control) = Custom ( O0 as In )
		
		// Processing operator for branch Y: mirrors the branch-X Custom above,
		// forwarding on R1 and acknowledging via Control1.
		( stream < I > R1 as out ; stream < uint32 i > Control1 as control ) = Custom ( O1 as In ) {
			logic
				onTuple In : { 
					// do some processing
					submit ( In , out ) ; // forward tuple
					submit ( { i = 1u } , control ) ; // acknowledge 1 tuple to the Gate
				} 

			// Place on a different PE from Gate and other processing operator
			config
				placement : partitionExlocation ( "Process" ) ; 
		} // End of (... Control1 as control ) = Custom ( O1 as In )
} // End of composite Main