SPL File KeyedDelayTest.spl

com.ibm.streamsx.plumbing > KeyedDelayTest 1.0.1 > com.ibm.streamsx.plumbing.sample.delay > KeyedDelayTest.spl

Content

Operators

Composites

composite KeyedDelayTest

Parameters

SPL Source Code


 public composite KeyedDelayTest {
 	param
 		// Delay time, read from the "Delay" submission-time value.
 		// Defaults to "60.00", i.e. 1 minute represented in seconds (1 * 60).
 		expression<float64> $DELAY : (float64)getSubmissionTimeValue("Delay", "60.00");
 		// User preference, read from the "AllowDelayOverride" submission-time value
 		// (default "false"), that configures the first KeyedDelay invocation in this
 		// composite to allow the overriding of the delay time value or not.
 		expression<boolean> $ALLOW_DELAY_OVERRIDE : (boolean)getSubmissionTimeValue("AllowDelayOverride", "false");
 				
 	type
 		// A flight record; flightId is the attribute used as the KeyedDelay key in this composite.
 		FlightType = rstring flightId, rstring flightModel, rstring origin, rstring destination;
 		// Carries a new delay time value for the delay override stream.
 		DelayOverrideType = float64 delay;
 		// Identifies, by flightId, a held tuple that should be deleted.
 		DeleteTupleType = rstring flightId;
 		// A flight record that also carries its own delay attribute (all FlightType
 		// attributes followed by the DelayOverrideType attribute).
                 FlightWithDelayOverrideType = FlightType, DelayOverrideType;
 		
 	graph
 		// Generate periodic timer signals to check for the full delay completion of the data tuples.
 		// Both KeyedDelay invocations in this composite take this stream as their second input.
 		stream<boolean signal> TimerSignal = Beacon() {
 			param
 				// Send a tick every 10 seconds.
 				period : 10.0;
 		}
 
 		// Ingest the flight data from a file. (It has 11 rows)
 /*
 		stream<FlightType> FlightInfo = FileSource() {
 			param
 				file : "flight-info.txt";
 				hotFile : true;
 		}
 */
 		// This Beacon is the active source of flight data and synthesizes 100K entries.
 		// To test with the file-based data instead, comment out this Beacon code block and
 		// uncomment the FileSource code block above.
 		stream<FlightType> FlightInfo = Beacon() {
 			param
 				// Total number of tuples to generate.
 				iterations: 100000;
 				
 			output
 				// Every attribute is derived from IterationCount()+1, so each generated
 				// tuple gets a distinct flightId.
 				// NOTE(review): presumably IterationCount() is zero-based, which would make
 				// the ids run from 1 to 100000 -- confirm against the Beacon documentation.
 				FlightInfo: flightId = (rstring)(IterationCount()+1ul),
 					flightModel = "Model" + (rstring)(IterationCount()+1ul),
 					origin = "Origin" + (rstring)(IterationCount()+1ul),
 					destination = "Destination" + (rstring)(IterationCount()+1ul);
 		}
 		
 
 		// This operator generates a query signal for the first KeyedDelay invocation below.
 		// That query signal makes that operator send out the current snapshot of
 		// its internal cache, where it is holding all the events for a specific amount of time.
 		// You can use either telnet or nc (netcat) to connect to this operator's IP Address:Port and
 		// then type something (any random character followed by a new line) to generate a query signal.
 		stream<rstring str> SnapshotQuerySignal = TCPSource() {
 			param
 				// Listen for incoming connections rather than connecting out.
 				role: server;
 				// TCP port on which the query signals are accepted.
 				port: 22222u;
 		}
 
 		// Trace each incoming flight info tuple and forward it for delaying.
 		//
 		// Output ports:
 		//   FIN - the flight tuple, forwarded unchanged.
 		//   DO  - a delay override tuple, submitted just before each flight tuple, but only
 		//         when $ALLOW_DELAY_OVERRIDE is true.
 		//   DT  - delete requests, submitted once, right after the 12th flight tuple.
 		(stream<FlightType> FlightIn as FIN;
 		 stream<DelayOverrideType> DelayOverride as DO;
 		 stream<DeleteTupleType> DeleteTuple as DT) = Custom(FlightInfo as FI) {
 			logic
 				state: {
 					// Number of flight tuples handled so far.
 					mutable int32 _tupleCount = 0;
 					// Reusable tuple for the delay override port.
 					mutable DelayOverrideType _overrideTuple = {};
 					// Delay that applies to the tuple currently being handled (for tracing).
 					mutable float64 _delayInEffect = 0.0;
 					// Override delays that are cycled through, one per incoming tuple.
 					list<float64> _overrideDelays = [90.00, 120.00, 60.00, 180.00];
 				}

 				onTuple FI: {
 					if ($ALLOW_DELAY_OVERRIDE) {
 						// We are allowed to override the delay time whenever we want.
 						// Pick the next delay from the list; the index is based on the number
 						// of tuples handled before this one.
 						_delayInEffect = _overrideDelays[_tupleCount % size(_overrideDelays)];
 						_overrideTuple.delay = _delayInEffect;
 						// The override tuple must go out first so that the data tuple submitted
 						// below gets delayed for the newly overridden delay time value.
 						submit(_overrideTuple, DO);
 					} else {
 						// Delay time is as specified at the start of the application. No override possible.
 						_delayInEffect = $DELAY;
 					}

 					// Count this tuple in a statement of its own. The delay cycling above and the
 					// 12th-tuple deletion below both depend on this counter, so it must not be
 					// advanced as a side effect hidden inside the trace message expression.
 					++_tupleCount;

 					appTrc(Trace.error, "IN: FlightInfo tuple " + (rstring)_tupleCount +
 						" with a delay of " + (rstring)_delayInEffect + " second(s)=" + (rstring) FI);
 					submit(FI, FIN);

 					// Exercise the delete port: ask for the 12th data tuple, which was just
 					// submitted and is therefore being delayed, to be deleted.
 					if (_tupleCount == 12) {
 						mutable DeleteTupleType deleteRequest = {};
 						deleteRequest.flightId = FI.flightId;
 						submit(deleteRequest, DT);

 						// Let us also send an invalid flight id to be deleted (just for testing).
 						deleteRequest.flightId = "XYZ";
 						submit(deleteRequest, DT);
 					}
 				}
 		}
 
 		// This operator will hold the incoming tuples for a specified duration and then send them out
 		// after that delay is fully realized. If a new tuple arrives with the same key, then an earlier tuple
 		// with the same key being held/delayed will be replaced with the newly arrived tuple and the
 		// count down for the delay time will be started freshly.
 		//
 		// 1) First input stream is the regular data tuple that needs to be held/delayed for a
 		//    configured amount of time.
 		//
 		// 2) Second input stream carries the periodic timer signal that must be sent to this operator so that it can
                 //    check for the delay completion time and then send the fully delayed tuples for downstream consumption.
 		//
 		// 3) Third input stream is used to override/change the delay time in the middle of the operation.
                 //    Simply send a tuple to this port with a float64 attribute named `delay` set to a new delayTime value.
                 //    This input stream will be effective only when the operator parameter allowDelayOverride is
                 //    set to true. If it is set to false, any tuple received via this stream will be ignored.
                 //    When the delay time is changed via this stream, any future data tuples sent into this
                 //    operator will be delayed for the duration of the newly overridden delay time.
                 //    All the existing data tuples being held inside this operator will not be affected by
                 //    the newly overridden delay time and they will be delayed for the duration that was in effect before
                 //    the override happened.
 		// 
 		// 4) Fourth input stream is used in situations where there is a need to delete 
                 //    an existing data tuple that is being held inside this operator's memory. Simply send a 
                 //    tuple via this input stream with an attribute (named as the value of this operator's key parameter) 
                 //    carrying an identifier required to locate the data tuple to be deleted from 
                 //    this operator's internal in-memory data structure.
 		//
 		// 5) Fifth input stream is optional and it lets this operator be queried to return a snapshot of its
                 //    internal in-memory cache where the tuples are being held and delayed at a given point in time.
                 //    Simply send any dummy tuple into this port as a snapshot query signal.
                 //    This optional input port works in conjunction with the second optional output port of this operator as
                 //    explained below.
 		//
 		// Output streams: 
 		// a) First output port will carry the original data tuples that were held inside this operator and 
                 //    then got evicted after a preset time delay.
 		// b) Second (optional) output port will carry the snapshot of the tuples that are being held inside
                 //    this operator when a query is made via the optional fifth input port as explained above. 
                 //    This output port should use the same stream schema as the first output port.
 		//
 		(stream<FlightType> FlightInfoAfterDelayCompletion; stream<FlightType> FlightInfoBeingDelayed) =
 			KeyedDelay(FlightIn; TimerSignal; DelayOverride; DeleteTuple; SnapshotQuerySignal) {
 			param
 				// Attribute in the incoming tuple that will be used as a key.
 				key: FlightIn.flightId;
 				// Duration for which data tuples should be held/delayed inside this operator.
 				// Those tuples will be sent out after that delay is fully realized.
 				// It takes a float64 value.
 				// NOTE(review): this comment used to say the unit is milliseconds, but $DELAY is
 				// documented in this composite as seconds (default 60.00 = 1 minute) and the trace
 				// output labels it "second(s)" -- presumably seconds; confirm against the
 				// KeyedDelay operator documentation.
 				delay: $DELAY;
 				// Do we want to allow the user to change the delay time value in the middle of the operation?
 				// It takes a boolean value.
 				// If there is going to be a maximum of 100K entries held inside this operator, it is okay to
 				// allow the user to change the delay time value in between different tuples.
 				// For more than 100K entries, changing the delay time value arbitrarily will 
 				// have a performance impact in iterating over the entire data structure inside this
 				// operator to find the fully delayed entries. This parameter lets the operator execute an
 				// optimized logic if the user wants to use the same delay time for the full life of this
 				// operator versus letting the user override the delay time arbitrarily via the third input stream.
 				allowDelayOverride: $ALLOW_DELAY_OVERRIDE;
                                 // The optional parameter below specifies a file name into which
                                 // this operator will write any data tuples held in its memory at the time of
                                 // operator shutdown. That file can be read later by the application code
                                 // to reprocess those data tuples during the next application run.
                                 shutdownSnapshotFileName: "/tmp/keyed-delay-data-tuples.text_file";
 		}
 		
 		// Trace every flight info tuple that comes out of the delay operator
 		// on the FlightInfoAfterDelayCompletion stream.
 		() as FlightOut = Custom(FlightInfoAfterDelayCompletion as FIADC) {
 			logic
 				state: {
 					// Running number of outgoing tuples; it is advanced inside the trace expression.
 					mutable int32 _releasedCount = 0;
 				}

 				onTuple FIADC: {
 					appTrc(Trace.error,
 						"OUT: FlightInfo tuple " + (rstring)(++_releasedCount) + "=" + (rstring)FIADC);
 				}
 		}
 		
 		// Print the snapshot of the tuples currently being delayed.
 		// The snapshot arrives on FlightInfoBeingDelayed one tuple at a time, and a
 		// window marker indicates that the snapshot is complete.
 		() as SnapshotResult = Custom(FlightInfoBeingDelayed as FIBD) {
 			logic
 				state: {
 					// Position of the current tuple within the snapshot being printed.
 					mutable int32 _snapshotRow = 0;
 				}

 				onTuple FIBD: {
 					_snapshotRow++;

 					// The first tuple of a snapshot opens it with a banner and a time stamp.
 					if (_snapshotRow == 1) {
 						printStringLn("========================");
 						printStringLn(ctime(getTimestamp()) + " Current snapshot of the tuples getting delayed is as shown below.");
 					}

 					printStringLn("FlightInfo " + (rstring)_snapshotRow + ": " + (rstring)FIBD);
 				}

 				onPunct FIBD: {
 					// Only a window marker closes a snapshot; any other punctuation is ignored.
 					if (currentPunct() == Sys.WindowMarker) {
 						printStringLn("========================");
 						// Start numbering from 1 again for the next snapshot.
 						_snapshotRow = 0;
 					}
 				}
 		}
 
                 // ===============================================================================
                 // The following block of code demonstrates how to override the delay for every tuple via
                 // a delay override attribute present in the regular data tuple.
                 //
 		// Attach a per-tuple delay to each incoming flight info tuple, forward it and trace it.
 		//
 		// Output ports:
 		//   FIN2 - the flight tuple extended with a delay attribute.
 		//   DO2, DT2 and SnapshotQuerySignal2 are declared but this operator never submits
 		//   to them; they are only passed as inputs to the KeyedDelay invocation that follows.
 		(stream<FlightWithDelayOverrideType> FlightIn2 as FIN2;
 		 stream<DelayOverrideType> DelayOverride2 as DO2;
 		 stream<DeleteTupleType> DeleteTuple2 as DT2;
                  stream<rstring str> SnapshotQuerySignal2) = Custom(FlightInfo as FI) {
 			logic
 				state: {
 					// Number of flight tuples handled so far.
 					mutable int32 _tupleCount = 0;
 					// Reusable outgoing tuple: the flight attributes plus the delay attribute.
 					mutable FlightWithDelayOverrideType _flightWithDelay = {};
 					// Override delays that are cycled through, one per incoming tuple.
 					list<float64> _overrideDelays = [90.00, 120.00, 60.00, 180.00];
 				}

 				onTuple FI: {
 					// Copy the flight attributes, then set this tuple's own delay. The delay is
 					// picked from the list using the number of tuples handled before this one.
 					assignFrom(_flightWithDelay, FI);
 					_flightWithDelay.delay = _overrideDelays[_tupleCount % size(_overrideDelays)];

 					// Here the delay is overridden via an attribute in the data tuple itself,
 					// as opposed to sending a separate tuple on a delay override stream.
 					submit(_flightWithDelay, FIN2);

 					// Count this tuple in a statement of its own. The delay cycling above depends on
 					// this counter, so it must not be advanced as a side effect hidden inside the
 					// trace message expression.
 					++_tupleCount;

 					appTrc(Trace.error, "IN2: FlightInfo tuple " + (rstring)_tupleCount +
 						"=" + (rstring)_flightWithDelay);
 				}
 		}
 
                 // See the previous invocation of this same operator above for detailed parameter documentation.
                 // This invocation differs in that delay override is always allowed and the override
                 // value is taken from an attribute of the data tuple (delayOverrideAttribute).
 		(stream<FlightWithDelayOverrideType> FlightInfoAfterDelayCompletion2; 
                  stream<FlightWithDelayOverrideType> FlightInfoBeingDelayed2) =
 			KeyedDelay(FlightIn2; TimerSignal; DelayOverride2; DeleteTuple2; SnapshotQuerySignal2) {
 			param
 				// Attribute in the incoming tuple that will be used as a key.
 				key: FlightIn2.flightId;
 				// Delay time taken from the $DELAY composite parameter.
 				delay: $DELAY;
 				// Overriding the delay is always allowed here, regardless of $ALLOW_DELAY_OVERRIDE.
 				allowDelayOverride: true;
                                 // Attribute of the data tuple that carries the delay override value.
                                 delayOverrideAttribute: FlightIn2.delay;
 		}
 		
 		// Trace every flight info tuple that comes out of the second delay operator
 		// on the FlightInfoAfterDelayCompletion2 stream.
 		() as FlightOut2 = Custom(FlightInfoAfterDelayCompletion2 as FIADC) {
 			logic
 				state: {
 					// Running number of outgoing tuples; it is advanced inside the trace expression.
 					mutable int32 _releasedCount = 0;
 				}

 				onTuple FIADC: {
 					appTrc(Trace.error,
 						"OUT2: FlightInfo tuple " + (rstring)(++_releasedCount) + "=" + (rstring)FIADC);
 				}
 		}
                 // ===============================================================================
 
 }