com.ibm.streamsx.plumbing > KeyedDelayTest 1.0.1 > com.ibm.streamsx.plumbing.sample.delay > KeyedDelayTest.spl
public composite KeyedDelayTest {
param
    // Hold time applied to each data tuple, expressed in seconds.
    // Read from the "Delay" submission-time value; falls back to 60 seconds (one minute).
    expression<float64> $DELAY :
        (float64) getSubmissionTimeValue("Delay", "60.00");
    // Whether the KeyedDelay operator should accept changes to the delay time
    // while the application is running. Read from the "AllowDelayOverride"
    // submission-time value; falls back to false.
    expression<boolean> $ALLOW_DELAY_OVERRIDE :
        (boolean) getSubmissionTimeValue("AllowDelayOverride", "false");
type
    // Data tuple describing one flight; flightId is used as the key further below.
    FlightType = tuple<rstring flightId, rstring flightModel, rstring origin, rstring destination>;
    // Tuple carrying a new delay value.
    DelayOverrideType = tuple<float64 delay>;
    // Tuple identifying (by flightId) a held data tuple that should be deleted.
    DeleteTupleType = tuple<rstring flightId>;
    // Flight data extended with a per-tuple delay attribute.
    FlightWithDelayOverrideType = tuple<FlightType, DelayOverrideType>;
graph
// Periodic timer ticks; the KeyedDelay operators below consume them to check
// whether any held data tuples have completed their full delay.
stream<boolean signal> TimerSignal = Beacon() {
    param
        // One tick every 10 seconds.
        period : 10.0;
}
// Ingest the flight data from a file. (It has 11 rows)
/*
stream<FlightType> FlightInfo = FileSource() {
param
file : "flight-info.txt";
hotFile : true;
}
*/
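// The FileSource code block above is commented out and this Beacon code block is
// active so that the sample can be tested with 100K entries. To ingest the flight data
// from the file instead, uncomment the FileSource block and comment out this Beacon block.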
// Synthetic flight data: 100000 tuples whose attributes are all derived from the
// 1-based iteration number (IterationCount() starts at 0, hence the +1).
stream<FlightType> FlightInfo = Beacon() {
    param
        iterations : 100000;
    output
        FlightInfo :
            flightId    = (rstring) (IterationCount() + 1ul),
            flightModel = "Model" + (rstring) (IterationCount() + 1ul),
            origin      = "Origin" + (rstring) (IterationCount() + 1ul),
            destination = "Destination" + (rstring) (IterationCount() + 1ul);
}
// Source of snapshot query signals for the first KeyedDelay operator below.
// A query signal makes that operator send out the current snapshot of its
// internal cache, where it holds all the events for a specific amount of time.
// To generate a query signal, connect to this operator's IP Address:Port with
// telnet or nc (netcat) and type something (any random character followed by a new line).
stream<rstring str> SnapshotQuerySignal = TCPSource() {
    param
        role : server;
        port : 22222u;
}
// Display the incoming flight info tuple and forward it on the FlightIn stream.
// When delay override is allowed, a DelayOverride tuple is submitted ahead of each data tuple.
// On the 12th data tuple, two delete requests are also submitted on the DeleteTuple stream:
// one for that tuple's flightId and one for an invalid flightId ("XYZ").
(stream<FlightType> FlightIn as FIN;
 stream<DelayOverrideType> DelayOverride as DO;
 stream<DeleteTupleType> DeleteTuple as DT) = Custom(FlightInfo as FI) {
    logic
        state: {
            // Number of FlightInfo tuples processed so far.
            mutable int32 _cnt = 0;
            mutable DelayOverrideType _oTuple2 = {};
            // Delay (in seconds) reported in the trace message for the current tuple.
            mutable float64 _delay = 0.0;
            // Delay values cycled through, one per tuple, when overriding is allowed.
            list<float64> _delayList = [90.00, 120.00, 60.00, 180.00];
        }
        onTuple FI: {
            if ($ALLOW_DELAY_OVERRIDE) {
                // We are allowed to override the delay time whenever we want.
                // Let us play with that feature here by changing the delay time for this tuple.
                // _cnt still holds the number of tuples seen before this one, so the
                // first tuple picks _delayList[0].
                _delay = _delayList[_cnt % size(_delayList)];
                _oTuple2.delay = _delay;
                // Let us first change the delay time for this tuple by submitting the delayOverride tuple.
                // After the override tuple, we can submit the actual data tuple which will get delayed for the
                // newly overridden delay time value.
                submit(_oTuple2, DO);
            } else {
                // Delay time is as specified at the start of the application. No override possible.
                _delay = $DELAY;
            }
            // Advance the counter in its own statement rather than inside the trace
            // message expression: _cnt drives the delay selection above and the
            // 12th-tuple check below, so it must not depend on how (or whether)
            // the trace argument gets evaluated.
            ++_cnt;
            appTrc(Trace.error, "IN: FlightInfo tuple " + (rstring)_cnt +
                " with a delay of " + (rstring)_delay + " second(s)=" + (rstring) FI);
            submit(FI, FIN);
            // For testing, delete the 12th data tuple we submitted while it is being delayed.
            if (_cnt == 12) {
                mutable DeleteTupleType oTuple = {};
                oTuple.flightId = FI.flightId;
                submit(oTuple, DT);
                // Let us also send an invalid flight id to be deleted (just for testing).
                oTuple.flightId = "XYZ";
                submit(oTuple, DT);
            }
        }
}
// This operator will hold the incoming tuples for a specified duration and then send them out
// after that delay is fully realized. If a new tuple arrives with the same key, then an earlier tuple
// with the same key being held/delayed will be replaced with the newly arrived tuple and the
// count down for the delay time will be started freshly.
//
// 1) First input stream is the regular data tuple that needs to be held/delayed for a
// configured amount of time.
//
// 2) Second input stream carries the periodic timer signal that must be sent to this operator in order to
// check for the delay completion time and then send the fully delayed tuples for downstream consumption.
//
// 3) Third input stream is used to override/change the delay time in the middle of the operation.
// Simply send a tuple to this port with a float64 attribute named `delay` set to a new delayTime value.
// This input stream will be effective only when the operator parameter allowDelayOverride is
// set to true. If it is set to false, any tuple received via this stream will be ignored.
// When the delay time is changed via this stream, any future data tuples sent into this
// operator will be delayed for the duration of the newly overridden delay time.
// All the existing data tuples being held inside this operator will not be affected by
// the newly overridden delay time and they will be delayed for the duration that was in effect before
// the override happened.
//
// 4) Fourth input stream is used in situations where there is a need to delete
// an existing data tuple that is being held inside this operator's memory. Simply send a
// tuple via this input stream with an attribute (named as the value of this operator's key parameter)
// carrying an identifier required to locate the data tuple to be deleted from
// this operator's internal in-memory data structure.
//
// 5) Fifth input stream is optional and it lets this operator be queried to return a snapshot of its
// internal in-memory cache where the tuples are being held and delayed at a given point in time.
// Simply send any dummy tuple into this port as a snapshot query signal.
// This optional input port works in conjunction with the second optional output port of this operator as
// explained below.
//
// Output streams:
// a) First output port will carry the original data tuples that were held inside this operator and
// then got evicted after a preset time delay.
// b) Second (optional) output port will carry the snapshot of the tuples that are being held inside
// this operator when a query is made via the optional fifth input port as explained above.
// This output port should use the same stream schema as the first output port.
//
(stream<FlightType> FlightInfoAfterDelayCompletion;
 stream<FlightType> FlightInfoBeingDelayed) =
    KeyedDelay(FlightIn; TimerSignal; DelayOverride; DeleteTuple; SnapshotQuerySignal) {
    param
        // Attribute of the incoming data tuple that serves as the key.
        key : FlightIn.flightId;
        // Duration (in seconds) for which data tuples are held/delayed inside this
        // operator before being sent out. It takes a float64 value.
        delay : $DELAY;
        // Whether the delay time value may be changed in the middle of the operation.
        // It takes a boolean value.
        // If there is going to be a maximum of 100K entries held inside this operator, it is
        // okay to allow the delay time value to change in between different tuples.
        // For more than 100K entries, changing the delay time value arbitrarily will
        // have a performance impact in iterating over the entire data structure inside this
        // operator to find the fully delayed entries. This parameter lets the operator execute
        // an optimized logic when the same delay time is used for the full life of this
        // operator, versus letting the delay time be overridden arbitrarily via the
        // third input stream.
        allowDelayOverride : $ALLOW_DELAY_OVERRIDE;
        // Optional: name of a file into which this operator will write any data tuples
        // still held in its memory at the time of operator shutdown. The application code
        // can read that file later to reprocess those data tuples during the next run.
        shutdownSnapshotFileName : "/tmp/keyed-delay-data-tuples.text_file";
}
// Trace every flight info tuple released by the first KeyedDelay operator,
// numbering the tuples in arrival order.
() as FlightOut = Custom(FlightInfoAfterDelayCompletion as Released) {
    logic
        state: {
            // Running count of released tuples; used only in the trace message.
            mutable int32 _outCount = 0;
        }
        onTuple Released: {
            appTrc(Trace.error,
                "OUT: FlightInfo tuple " + (rstring) ++_outCount + "=" + (rstring) Released);
        }
}
// Print the snapshot of the tuples currently being delayed.
// The snapshot arrives here one tuple at a time; a window marker
// indicates that the snapshot of delayed tuples is fully done.
() as SnapshotResult = Custom(FlightInfoBeingDelayed as Snap) {
    logic
        state: {
            // Position of the current tuple within the snapshot being printed.
            mutable int32 _rowNum = 0;
        }
        onTuple Snap: {
            _rowNum++;
            // The first tuple of a snapshot gets a header.
            if (_rowNum == 1) {
                printStringLn("========================");
                printStringLn(ctime(getTimestamp()) + " Current snapshot of the tuples getting delayed is as shown below.");
            }
            printStringLn("FlightInfo " + (rstring)_rowNum + ": " + (rstring)Snap);
        }
        onPunct Snap: {
            // A window marker means the snapshot information is over:
            // print the footer and reset the numbering for the next snapshot.
            if (currentPunct() == Sys.WindowMarker) {
                printStringLn("========================");
                _rowNum = 0;
            }
        }
}
// ===============================================================================
// The following block of code demonstrates how to override the delay for every tuple via
// a delay override attribute present in the regular data tuple.
//
// Display the incoming flight info tuple and forward it, extended with a per-tuple
// delay attribute, on the FlightIn2 stream.
// The DelayOverride2, DeleteTuple2 and SnapshotQuerySignal2 output streams are declared
// here but this operator never submits any tuple to them.
(stream<FlightWithDelayOverrideType> FlightIn2 as FIN2;
 stream<DelayOverrideType> DelayOverride2 as DO2;
 stream<DeleteTupleType> DeleteTuple2 as DT2;
 stream<rstring str> SnapshotQuerySignal2) = Custom(FlightInfo as FI) {
    logic
        state: {
            // Number of FlightInfo tuples processed so far.
            mutable int32 _cnt = 0;
            mutable FlightWithDelayOverrideType _oTuple1 = {};
            mutable float64 _delay = 0.0;
            // Delay values cycled through, one per tuple.
            list<float64> _delayList = [90.00, 120.00, 60.00, 180.00];
        }
        onTuple FI: {
            // We are allowed to override the delay time whenever we want.
            // Let us play with that feature here by changing the delay time for this tuple.
            // _cnt still holds the number of tuples seen before this one, so the
            // first tuple picks _delayList[0].
            _delay = _delayList[_cnt % size(_delayList)];
            // Copy the flight attributes, then set the delay attribute on top.
            assignFrom(_oTuple1, FI);
            _oTuple1.delay = _delay;
            // We are overriding the delay value via an attribute in our data tuple.
            // This is a different way of overriding the delay value when compared to
            // what is being done in the previous block of the code snippet.
            // [Read the commentary at the top of this file to learn about the two ways of overriding a delay value.]
            submit(_oTuple1, FIN2);
            // Advance the counter in its own statement rather than inside the trace
            // message expression: _cnt selects the delay for the next tuple, so it
            // must not depend on how (or whether) the trace argument gets evaluated.
            ++_cnt;
            appTrc(Trace.error, "IN2: FlightInfo tuple " + (rstring)_cnt +
                "=" + (rstring)_oTuple1);
        }
}
// Second KeyedDelay invocation: the delay is overridden per tuple through an
// attribute of the data tuple itself. See the previous invocation of this same
// operator above for detailed parameter documentation.
(stream<FlightWithDelayOverrideType> FlightInfoAfterDelayCompletion2;
 stream<FlightWithDelayOverrideType> FlightInfoBeingDelayed2) =
    KeyedDelay(FlightIn2; TimerSignal; DelayOverride2; DeleteTuple2; SnapshotQuerySignal2) {
    param
        key : FlightIn2.flightId;
        delay : $DELAY;
        // Overriding is always enabled for this invocation.
        allowDelayOverride : true;
        // Data tuple attribute that carries the delay override value.
        delayOverrideAttribute : FlightIn2.delay;
}
// Trace every flight info tuple released by the second KeyedDelay operator,
// numbering the tuples in arrival order.
() as FlightOut2 = Custom(FlightInfoAfterDelayCompletion2 as Released) {
    logic
        state: {
            // Running count of released tuples; used only in the trace message.
            mutable int32 _outCount = 0;
        }
        onTuple Released: {
            appTrc(Trace.error,
                "OUT2: FlightInfo tuple " + (rstring) ++_outCount + "=" + (rstring) Released);
        }
}
// ===============================================================================
}