066_load_balancing_using_gate

/*
This application demonstrates how we can combine the ThreadedSplit and Gate operators to
achieve load balancing of tuples. This idea appears in the Streams InfoCenter documentation for
the Gate operator. I thought it is useful to include that example here to benefit our users.
Full credit goes to the InfoCenter authors for the following problem description and the code.

When using ThreadedSplit, a tuple with a long processing time may cause subsequent tuples to be
backed up in the stream waiting for processing (even though there may be another thread available and
idle. This may be aggravated by tuples that are in a TCP/IP connection to another PE.

In order to ensure load balancing, a ThreadedSplit operator with a buffer size of 1 may be tied to 
two or more Gate operators with a maximum unacknowledged tuple count of 1 or more. The ThreadedSplit
and the Gate operators must be on the same PE, to avoid tuples queuing between PEs. A partitionColocation
placement config is used to ensure this.

The Gate operator will allow only the specified number of tuples (1 in this case) to pass at a time.
It will wait until a subsequent operator has acknowledged receipt of the tuple before passing the next
tuple. In this manner, no tuple will ever be queued behind another, waiting to be processed.

(To see the actual effect of this example, it is better to run this application in Distributed Mode.) 
*/
namespace com.acme.test;

composite LoadBalancingUsingGate {
	graph
		// Generate a stream of data to process                                                    
		stream<uint32 i> I = Beacon(){                                                             
			logic                                                                                    
				state : mutable uint32 c = 0 ;                                                         
			
			output                                                                                     
				I : i = c ++ ;                                                                         
		}
		
		// Split the stream into 2 streams.  Use a following Gate to ensure load balancing
		// Fuse the ThreadedSplit and the Gate operators so that there is no queuing between them.         
		(stream<I> X ; stream<I> Y)= ThreadedSplit(I){                                             
			param                                                                                    
				bufferSize : 1u ;                                                                      
		
			config                                                                                   
				placement : partitionColocation("Split"), // ensure same PE as the Gates
				// Make it away from the downstream processing operator.               
				partitionExlocation("Process");                                            
    	}
    	
		stream<I> O0 = Gate(X ; Control0){                                                         
			param                                                                                    
				maxUnackedTupleCount : 1u ;
				                                                            
			config                                                                                   
				placement : partitionColocation("Split"); // ensure same PE as ThreadedSplit                                                                                                          
		}    	

		stream<I> O1 = Gate(Y ; Control1){                                                         
			param                                                                                    
				maxUnackedTupleCount : 1u ;
				                                                            
			config                                                                                   
				placement : partitionColocation("Split"); // ensure same PE as ThreadedSplit           
    	}

		(stream<I> R0 as out ; stream<uint32 i> Control0 as control)= Custom(O0 as In) {            
			logic                                                                                    
				onTuple In : {                                                                         
					// do some processing                                                                
					submit(In, out); // forward tuple                                                    
					submit({ i = 1u }, control);                                                         
				}                                                                                      

			// Place on a different PE from Gate or other processing operator                                                                                                                     
			config                                                                                   
			placement : partitionExlocation("Process");                                            
		}

		(stream<I> R1 as out ; stream<uint32 i> Control1 as control)= Custom(O1 as In){            
			logic                                                                                    
				onTuple In : {                                                                         
					// do some processing                                                                
					submit(In, out); // forward tuple                                                    
					submit({ i = 1u }, control);                                                         
				}
				                                                                                      
			// Place on a different PE from Gate and other processing operator                     
			config                                                                                   
				placement : partitionExlocation("Process");                                            
		}
		
		() as Sink1 = FileSink(R0) {
			param
				file: "/dev/null";
		}
		
		() as Sink2 = FileSink(R1) {
			param
				file: "/dev/null";
		}
}