com.ibm.streamsx.plumbing > KeyedDelayTest 1.0.1 > com.ibm.streamsx.plumbing.sample.delay > KeyedDelayTest.spl
/**
 * Sample application that exercises the KeyedDelay operator in two ways:
 *
 *   1) The first KeyedDelay invocation optionally changes the delay time
 *      through a separate delay-override input stream.
 *   2) The second KeyedDelay invocation overrides the delay for every tuple
 *      through a `delay` attribute carried in the data tuple itself.
 *
 * Submission-time values:
 *   Delay              - delay time handed to both KeyedDelay invocations (default "60.00").
 *   AllowDelayOverride - whether the first invocation accepts delay overrides (default "false").
 */
public composite KeyedDelayTest {
    param
        // Default delay time is 1 minute represented in seconds (1 * 60).
        expression<float64> $DELAY :
            (float64)getSubmissionTimeValue("Delay", "60.00");
        // User preference to configure the KeyedDelay operator to allow the
        // overriding of the delay time value or not.
        expression<boolean> $ALLOW_DELAY_OVERRIDE :
            (boolean)getSubmissionTimeValue("AllowDelayOverride", "false");

    type
        FlightType = rstring flightId, rstring flightModel, rstring origin, rstring destination;
        DelayOverrideType = float64 delay;
        DeleteTupleType = rstring flightId;
        FlightWithDelayOverrideType = FlightType, DelayOverrideType;

    graph
        // Generate periodic timer signals to check for the full delay completion of the data tuples.
        stream<boolean signal> TimerSignal = Beacon() {
            param
                // Send a tick every 10 seconds.
                period : 10.0;
        }

        // Ingest the flight data from a file. (It has 11 rows)
        /*
        stream<FlightType> FlightInfo = FileSource() {
            param
                file : "flight-info.txt";
                hotFile : true;
        }
        */

        // The FileSource block above is commented out and this Beacon is active so that
        // the sample runs with 100K entries. Swap the two blocks to ingest from the file instead.
        stream<FlightType> FlightInfo = Beacon() {
            param
                iterations: 100000;

            output
                FlightInfo:
                    flightId = (rstring)(IterationCount()+1ul),
                    flightModel = "Model" + (rstring)(IterationCount()+1ul),
                    origin = "Origin" + (rstring)(IterationCount()+1ul),
                    destination = "Destination" + (rstring)(IterationCount()+1ul);
        }

        // This operator can generate a query signal for the KeyedDelay operator below.
        // That query signal makes the KeyedDelay operator send out the current snapshot of
        // its internal cache where it is holding all the events for a specific amount of time.
        // You can use either telnet or nc (netcat) to connect to this operator's IP Address:Port and
        // then type something (any random character followed by a new line) to generate a query signal.
        stream<rstring str> SnapshotQuerySignal = TCPSource() {
            param
                role: server;
                port: 22222u;
        }

        // Display the incoming flight info tuple and forward it to the KeyedDelay operator.
        // When delay overriding is allowed, a delay override tuple is sent ahead of every data tuple.
        (stream<FlightType> FlightIn as FIN;
         stream<DelayOverrideType> DelayOverride as DO;
         stream<DeleteTupleType> DeleteTuple as DT) = Custom(FlightInfo as FI) {
            logic
                state: {
                    // Number of FlightInfo tuples seen so far.
                    mutable int32 _cnt = 0;
                    mutable DelayOverrideType _oTuple2 = {};
                    mutable float64 _delay = 0.0;
                    // Delay values that are cycled through when overriding is allowed.
                    list<float64> _delayList = [90.00, 120.00, 60.00, 180.00];
                }

                onTuple FI: {
                    if ($ALLOW_DELAY_OVERRIDE == true) {
                        // We are allowed to override the delay time whenever we want.
                        // Let us play with that feature here by changing the delay time for this tuple.
                        _delay = _delayList[_cnt % size(_delayList)];
                        _oTuple2.delay = _delay;
                        // Let us first change the delay time for this tuple by submitting the delayOverride tuple.
                        // After the override tuple, we can submit the actual data tuple which will get delayed for the
                        // newly overridden delay time value.
                        submit(_oTuple2, DO);
                    } else {
                        // Delay time is as specified at the start of the application. No override possible.
                        _delay = $DELAY;
                    }

                    // Advance the counter as its own statement so that the tuple count (and the
                    // 12th-tuple deletion test below) does not depend on a side effect buried
                    // inside the arguments of the trace call.
                    ++_cnt;
                    appTrc(Trace.error, "IN: FlightInfo tuple " + (rstring)_cnt +
                        " with a delay of " + (rstring)_delay + " second(s)=" + (rstring) FI);
                    submit(FI, FIN);

                    // As an arbitrary example, delete the 12th data tuple we submitted
                    // while it is still being delayed.
                    if (_cnt == 12) {
                        mutable DeleteTupleType oTuple = {};
                        oTuple.flightId = FI.flightId;
                        submit(oTuple, DT);
                        // Let us also send an invalid flight id to be deleted (just for testing).
                        oTuple.flightId = "XYZ";
                        submit(oTuple, DT);
                    }
                }
        }

        // This operator will hold the incoming tuples for a specified duration and then send them out
        // after that delay is fully realized. If a new tuple arrives with the same key, then an earlier tuple
        // with the same key being held/delayed will be replaced with the newly arrived tuple and the
        // countdown for the delay time will be started afresh.
        //
        // 1) First input stream is the regular data tuple that needs to be held/delayed for a
        //    configured amount of time.
        //
        // 2) Second input stream carries the periodic timer signal that must be sent to this operator in order to
        //    check for the delay completion time and then send the fully delayed tuples for downstream consumption.
        //
        // 3) Third input stream is used to override/change the delay time in the middle of the operation.
        //    Simply send a tuple to this port with a float64 attribute named `delay` set to a new delayTime value.
        //    This input stream will be effective only when the operator parameter allowDelayOverride is
        //    set to true. If it is set to false, any tuple received via this stream will be ignored.
        //    When the delay time is changed via this stream, any future data tuples sent into this
        //    operator will be delayed for the duration of the newly overridden delay time.
        //    All the existing data tuples being held inside this operator will not be affected by
        //    the newly overridden delay time and they will be delayed for the duration that was in effect before
        //    the override happened.
        //
        // 4) Fourth input stream is used in situations where there is a need to delete
        //    an existing data tuple that is being held inside this operator's memory. Simply send a
        //    tuple via this input stream with an attribute (named as the value of this operator's key parameter)
        //    carrying an identifier required to locate the data tuple to be deleted from
        //    this operator's internal in-memory data structure.
        //
        // 5) Fifth input stream is optional and it lets this operator be queried to return a snapshot of its
        //    internal in-memory cache where the tuples are being held and delayed at a given point in time.
        //    Simply send any dummy tuple into this port as a snapshot query signal.
        //    This optional input port works in conjunction with the second optional output port of this operator as
        //    explained below.
        //
        // Output streams:
        // a) First output port will carry the original data tuples that were held inside this operator and
        //    then got evicted after a preset time delay.
        // b) Second (optional) output port will carry the snapshot of the tuples that are being held inside
        //    this operator when a query is made via the optional fifth input port as explained above.
        //    This output port should use the same stream schema as the first output port.
        //
        (stream<FlightType> FlightInfoAfterDelayCompletion;
         stream<FlightType> FlightInfoBeingDelayed) =
            KeyedDelay(FlightIn; TimerSignal; DelayOverride; DeleteTuple; SnapshotQuerySignal) {
            param
                // Attribute in the incoming tuple that will be used as a key.
                key: FlightIn.flightId;
                // Duration (in seconds, consistent with the $DELAY default and the trace messages
                // in this file) for which data tuples should be held/delayed inside this operator.
                // NOTE(review): an earlier revision of this comment said milliseconds; confirm the
                // unit against the KeyedDelay operator documentation.
                // Those tuples will be sent out after that delay is fully realized.
                // It takes a float64 value.
                delay: $DELAY;
                // Do we want to allow the user to change the delay time value in the middle of the operation?
                // It takes a boolean value.
                // If there is going to be a maximum of 100K entries held inside this operator, it is okay to
                // allow the user to change the delay time value in between different tuples.
                // For more than 100K entries, changing the delay time value arbitrarily will
                // have a performance impact in iterating over the entire data structure inside this
                // operator to find the fully delayed entries. This parameter lets the operator execute an
                // optimized logic if the user wants to use the same delay time for the full life of this
                // operator versus letting the user override the delay time arbitrarily via the third input stream.
                allowDelayOverride: $ALLOW_DELAY_OVERRIDE;
                // An optional parameter below can be used to specify a file name into which
                // this operator will write any data tuples held in its memory at the time of
                // operator shutdown. That file can be read later by the application code
                // to reprocess those data tuples during the next application run.
                shutdownSnapshotFileName: "/tmp/keyed-delay-data-tuples.text_file";
        }

        // Display the outgoing flight info tuple.
        () as FlightOut = Custom(FlightInfoAfterDelayCompletion as FIADC) {
            logic
                state: {
                    // Number of tuples released by the KeyedDelay operator so far.
                    mutable int32 _cnt = 0;
                }

                onTuple FIADC: {
                    // Count first, then trace, so the counter is not a side effect of the trace arguments.
                    ++_cnt;
                    appTrc(Trace.error, "OUT: FlightInfo tuple " + (rstring)_cnt + "=" + (rstring) FIADC);
                }
        }

        // Display the delayed tuples snapshot information.
        // Snapshot of delayed tuples will arrive here one tuple at a time.
        // When a window marker arrives, that is an indication that the snapshot of delayed tuples is fully done.
        () as SnapshotResult = Custom(FlightInfoBeingDelayed as FIBD) {
            logic
                state: {
                    // Position of the current tuple within the snapshot being printed.
                    mutable int32 _cnt = 0;
                }

                onTuple FIBD: {
                    // Print a header ahead of the first tuple of every snapshot.
                    if (++_cnt == 1) {
                        printStringLn("========================");
                        printStringLn(ctime(getTimestamp()) +
                            " Current snapshot of the tuples getting delayed is as shown below.");
                    }

                    printStringLn("FlightInfo " + (rstring)_cnt + ": " + (rstring)FIBD);
                }

                onPunct FIBD: {
                    // This punctuation indicates that the snapshot information is over.
                    if (currentPunct() == Sys.WindowMarker) {
                        // Reset so that the next snapshot prints its own header.
                        _cnt = 0;
                        printStringLn("========================");
                    }
                }
        }

        // ===============================================================================
        // Following block of code demonstrates how to override the delay for every tuple via
        // a delay override attribute present in the regular data tuple.
        //
        // Display the incoming flight info tuple.
        // Only FIN2 is submitted to in this operator. The other three output streams are declared
        // but never submitted to here; they feed the corresponding input ports of the
        // second KeyedDelay invocation below.
        (stream<FlightWithDelayOverrideType> FlightIn2 as FIN2;
         stream<DelayOverrideType> DelayOverride2 as DO2;
         stream<DeleteTupleType> DeleteTuple2 as DT2;
         stream<rstring str> SnapshotQuerySignal2) = Custom(FlightInfo as FI) {
            logic
                state: {
                    // Number of FlightInfo tuples seen so far.
                    mutable int32 _cnt = 0;
                    mutable FlightWithDelayOverrideType _oTuple1 = {};
                    mutable float64 _delay = 0.0;
                    // Delay values that are cycled through, one per incoming tuple.
                    list<float64> _delayList = [90.00, 120.00, 60.00, 180.00];
                }

                onTuple FI: {
                    // We are allowed to override the delay time whenever we want.
                    // Let us play with that feature here by changing the delay time for this tuple.
                    _delay = _delayList[_cnt % size(_delayList)];
                    assignFrom(_oTuple1, FI);
                    _oTuple1.delay = _delay;
                    // We are overriding the delay value via an attribute in our data tuple.
                    // This is a different way of overriding the delay value when compared to
                    // what is being done in the previous block of the code snippet.
                    // [Read the commentary at the top of this file to learn about the two ways of overriding a delay value.]
                    submit(_oTuple1, FIN2);
                    // Count first, then trace, so the counter (which also selects the next
                    // delay value above) is not a side effect of the trace arguments.
                    ++_cnt;
                    appTrc(Trace.error, "IN2: FlightInfo tuple " + (rstring)_cnt + "=" + (rstring)_oTuple1);
                }
        }

        // See the previous invocation of this same operator above for detailed parameter documentation.
        (stream<FlightWithDelayOverrideType> FlightInfoAfterDelayCompletion2;
         stream<FlightWithDelayOverrideType> FlightInfoBeingDelayed2) =
            KeyedDelay(FlightIn2; TimerSignal; DelayOverride2; DeleteTuple2; SnapshotQuerySignal2) {
            param
                key: FlightIn2.flightId;
                delay: $DELAY;
                allowDelayOverride: true;
                // Data tuple attribute that carries the per-tuple delay override value.
                delayOverrideAttribute: FlightIn2.delay;
        }

        // Display the outgoing flight info tuple.
        () as FlightOut2 = Custom(FlightInfoAfterDelayCompletion2 as FIADC) {
            logic
                state: {
                    // Number of tuples released by the second KeyedDelay operator so far.
                    mutable int32 _cnt = 0;
                }

                onTuple FIADC: {
                    // Count first, then trace, so the counter is not a side effect of the trace arguments.
                    ++_cnt;
                    appTrc(Trace.error, "OUT2: FlightInfo tuple " + (rstring)_cnt + "=" + (rstring) FIADC);
                }
        }
        // ===============================================================================
}