com.ibm.streamsx.plumbing > com.ibm.streamsx.plumbing 1.0.0 > com.ibm.streamsx.plumbing.delay > KeyedDelay
The KeyedDelay operator delays incoming tuples for a configured number of seconds before sending them out for downstream processing.
Tuples that need to be delayed must arrive on the first input port of this operator. The operator holds them in memory, indexed by a key taken from one of the tuple attributes. The operator also lets the user change the delay time, either per tuple or per group of tuples, delete a held tuple before its full delay time has elapsed, and request a snapshot of all the tuples currently being delayed.
When a new data tuple arrives on the first input port, any earlier tuple with the same key that is still being held in memory is replaced by the newly arrived tuple, and the countdown for the delay time starts afresh. When the delay time is set to 0, incoming data tuples are sent out immediately without incurring any delay.
When the operator parameter allowDelayOverride is set to true, the delay can be overridden in one of two ways. The first way is to send a separate delay override tuple carrying a new delay value into the third input port of this operator. The second way is to have an attribute in the regular data tuple arriving on the first input port carry the delay value required for that particular tuple. To use the second way, the delay attribute name must be specified via the delayOverrideAttribute parameter.
It is recommended to use only one of these two ways during the life of an application, rather than mixing both, so that the approach stays consistent. The first way is ideal if the delay override is needed periodically after a batch of incoming data tuples. The second way is ideal if the delay override is needed for every incoming data tuple.
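The second override way described above can be sketched as follows. This is an illustrative sketch, not taken from the toolkit: the composite name, the stream names, the EventType schema, the holdSeconds attribute and the Beacon settings are all hypothetical. It also assumes that the key and delayOverrideAttribute parameters accept an attribute reference, and that the third and fourth input ports must be wired even when they are not used.

```spl
use com.ibm.streamsx.plumbing.delay::KeyedDelay;

composite PerTupleDelaySketch {
    type
        // Hypothetical schema: id is the key, holdSeconds carries the per-tuple delay.
        EventType = rstring id, float64 holdSeconds, rstring payload;

    graph
        // Port 1: data tuples with unique keys, each asking for 5, 6 or 7 seconds of delay.
        stream<EventType> Events = Beacon() {
            param period: 0.5;
            output Events:
                id = "event-" + (rstring)IterationCount(),
                holdSeconds = 5.0 + (float64)(IterationCount() % 3ul),
                payload = "sample";
        }

        // Port 2: periodic timer signal that drives the delay completion check.
        stream<boolean tick> Timer = Beacon() {
            param period: 1.0;
        }

        // Ports 3 and 4 are wired but never carry a tuple in this sketch.
        stream<float64 delay> NoOverride = Functor(Timer) {
            param filter: false;
            output NoOverride: delay = 0.0;
        }

        stream<rstring id> NoDelete = Functor(Timer) {
            param filter: false;
            output NoDelete: id = "";
        }

        stream<EventType> Delayed = KeyedDelay(Events; Timer; NoOverride; NoDelete) {
            param
                key: id;
                delay: 10.0; // default delay in seconds
                allowDelayOverride: true;
                delayOverrideAttribute: holdSeconds; // assumed to be an attribute reference
        }

        () as Printer = Custom(Delayed) {
            logic onTuple Delayed: println(Delayed);
        }
}
```

The keys are unique on purpose: a repeating key would replace the held tuple and restart its countdown each time it arrived.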
Possible use cases for this operator:
1) Aging out tuples based on application specific criteria.
2) Response timeout detection and replaying data tuples to an external service.
3) Event notification to external consuming applications at a later time.
4) Design patterns requiring storing and forwarding of tuples after a preset delay.
Required: allowDelayOverride, delay, key
Optional: delayOverrideAttribute, shutdownSnapshotFileName
The first input port receives the regular data tuples that need to be delayed for a configured amount of time.
The second input port receives a periodic timer signal. This signal must be sent to the operator so that it can check for delay completion and then send the fully delayed tuples for downstream consumption.
The third input port is used to override the delay time in the middle of operation. This is the first of the two available ways to do a delay override (both ways are explained in the operator description). Simply send a tuple to this port with a float64 attribute named delay set to the new delay time value. This input stream is effective only when the operator parameter allowDelayOverride is set to true. If it is set to false, any tuple received via this stream is ignored. When the delay time is changed via this stream, any future data tuples sent into this operator are delayed for the newly overridden delay time. Data tuples already being held inside this operator are not affected by the override; they are delayed for the duration that was in effect before the override happened.
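A delay override stream for the third input port might be wired as in the sketch below. It is only an illustration: the composite and stream names, the event schema and the timing values are hypothetical, the key parameter is assumed to accept an attribute reference, and the fourth input port is assumed to need a connection even though it never fires here. The only detail taken from the port description is the float64 attribute named delay.

```spl
use com.ibm.streamsx.plumbing.delay::KeyedDelay;

composite DelayOverrideSketch {
    type
        // Hypothetical schema: id is the key attribute.
        EventType = rstring id, rstring payload;

    graph
        // Port 1: data tuples with unique keys.
        stream<EventType> Events = Beacon() {
            param period: 0.5;
            output Events:
                id = "event-" + (rstring)IterationCount(),
                payload = "sample";
        }

        // Port 2: periodic timer signal.
        stream<boolean tick> Timer = Beacon() {
            param period: 1.0;
        }

        // Port 3: starting after 60 seconds, ask for a 30 second delay.
        // Only data tuples arriving after an override are affected by it.
        stream<float64 delay> DelayOverride = Beacon() {
            param
                initDelay: 60.0;
                period: 60.0;
            output DelayOverride: delay = 30.0;
        }

        // Port 4: wired but never carries a tuple in this sketch.
        stream<rstring id> NoDelete = Functor(Timer) {
            param filter: false;
            output NoDelete: id = "";
        }

        stream<EventType> Delayed = KeyedDelay(Events; Timer; DelayOverride; NoDelete) {
            param
                key: id;
                delay: 10.0; // delay in effect until the first override arrives
                allowDelayOverride: true; // must be true, or port 3 tuples are ignored
        }

        () as Printer = Custom(Delayed) {
            logic onTuple Delayed: println(Delayed);
        }
}
```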
The fourth input port is used when there is a need to delete an existing data tuple that is being held inside this operator's memory. Simply send a tuple via this input stream with an attribute, named the same as the value of this operator's key parameter, carrying the identifier needed to locate the data tuple to be deleted from this operator's in-memory data structure.
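A delete request for the fourth input port could look like the sketch below. The names, schema and timings are hypothetical, and the sketch assumes that the key parameter accepts an attribute reference and that the third input port must be wired even when overrides are disabled. The delete stream carries an attribute named id because the key parameter is set to id.

```spl
use com.ibm.streamsx.plumbing.delay::KeyedDelay;

composite DeleteHeldTupleSketch {
    type
        // Hypothetical schema: id is the key attribute.
        EventType = rstring id, rstring payload;

    graph
        // Port 1: data tuples with unique keys, one every 0.5 seconds.
        stream<EventType> Events = Beacon() {
            param period: 0.5;
            output Events:
                id = "event-" + (rstring)IterationCount(),
                payload = "sample";
        }

        // Port 2: periodic timer signal.
        stream<boolean tick> Timer = Beacon() {
            param period: 1.0;
        }

        // Port 3: wired but unused; overrides are disabled below.
        stream<float64 delay> NoOverride = Functor(Timer) {
            param filter: false;
            output NoOverride: delay = 0.0;
        }

        // Port 4: roughly 5 seconds after startup, ask once for "event-3" to be dropped.
        // By then that tuple should have arrived and still be held by the 10 second delay.
        stream<rstring id> DeleteRequests = Custom() {
            logic onProcess: {
                block(5.0);
                submit({id = "event-3"}, DeleteRequests);
            }
        }

        stream<EventType> Delayed = KeyedDelay(Events; Timer; NoOverride; DeleteRequests) {
            param
                key: id;
                delay: 10.0;
                allowDelayOverride: false;
        }

        () as Printer = Custom(Delayed) {
            logic onTuple Delayed: println(Delayed);
        }
}
```

Because the source operators run independently, the timing is approximate. The intent is that "event-3" never appears in the printed output.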
The fifth input port is optional. It lets this operator be queried for a snapshot of its in-memory cache, that is, the tuples being held and delayed at that point in time. Simply send any dummy tuple into this port as a snapshot query signal. This optional input port works in conjunction with the second optional output port of this operator, as explained below.
First output port will carry the original data tuples that were held inside this operator and then got evicted after a preset time delay.
Second (optional) output port will carry the snapshot of the tuples that are being held/delayed inside this operator when a query is made via the optional fifth input port as explained above. This output port should use the same stream schema as the first output port.
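The snapshot query path, which pairs the fifth input port with the second output port, might be used as in this sketch. All names, the schema and the timings are hypothetical. The sketch assumes that the key parameter accepts an attribute reference and that the third and fourth input ports must be wired. Following the description above, both output streams use the same schema and the query tuple is a dummy whose content does not matter.

```spl
use com.ibm.streamsx.plumbing.delay::KeyedDelay;

composite SnapshotQuerySketch {
    type
        // Hypothetical schema: id is the key attribute.
        EventType = rstring id, rstring payload;

    graph
        // Port 1: data tuples with unique keys.
        stream<EventType> Events = Beacon() {
            param period: 0.5;
            output Events:
                id = "event-" + (rstring)IterationCount(),
                payload = "sample";
        }

        // Port 2: periodic timer signal.
        stream<boolean tick> Timer = Beacon() {
            param period: 1.0;
        }

        // Ports 3 and 4: wired but never carry a tuple in this sketch.
        stream<float64 delay> NoOverride = Functor(Timer) {
            param filter: false;
            output NoOverride: delay = 0.0;
        }

        stream<rstring id> NoDelete = Functor(Timer) {
            param filter: false;
            output NoDelete: id = "";
        }

        // Port 5: a dummy tuple every 5 seconds acts as the snapshot query signal.
        stream<boolean query> SnapshotQuery = Beacon() {
            param
                initDelay: 5.0;
                period: 5.0;
        }

        (stream<EventType> Delayed; stream<EventType> Snapshot) =
            KeyedDelay(Events; Timer; NoOverride; NoDelete; SnapshotQuery) {
            param
                key: id;
                delay: 10.0;
                allowDelayOverride: false;
        }

        () as DelayedPrinter = Custom(Delayed) {
            logic onTuple Delayed: println(Delayed);
        }

        () as SnapshotPrinter = Custom(Snapshot) {
            logic onTuple Snapshot: println(Snapshot);
        }
}
```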
allowDelayOverride: This mandatory parameter of type boolean specifies whether the user is allowed to override the delay time value in the middle of operation, using one of the two ways explained in the operator description.
delay: This mandatory parameter of type float64 specifies the delay time in seconds.
delayOverrideAttribute: This optional parameter must point to a float64 attribute in the regular data tuples that carries the delay time, in seconds, for a given data tuple.
key: This mandatory parameter must point to an attribute in the incoming data tuples that is used as the key for holding and delaying the tuples inside this operator's memory.
shutdownSnapshotFileName: This optional parameter specifies the name of a file to which the data tuples held in memory are written when this operator is being shut down. That file can be read later to reprocess the data tuples during subsequent runs of this operator.
(stream<${eventStreamType}> ${outputStream1}; stream<${eventStreamType}> ${outputStream2}) = KeyedDelay(${eventStream}; ${timerStream}; ${delayOverrideStream}; ${deleteEventStream}; ${delayedTuplesSnapshotQueryStream}) {
    param
        key: ${keyAttribute};
        delay: ${timeInSeconds};
        allowDelayOverride: ${overrideFlag};
}