com.ibm.streamsx.plumbing > synchronize 1.0.0 > com.ibm.streamsx.plumbing.sample.synchronize > SynchronizeTwoStreams.spl
This sample application for the Synchronize operator reads a pair of streams from files. Each file contains synthetic timestamps and text labels, with even- and odd-numbered values in different files.
The Synchronize operator synchronizes the streams according to the timestamps and merges them into a single output file.
/**
 * Sample application for the Synchronize operator.
 *
 * Two CSV files under the toolkit's opt/ directory are read into two streams of
 * (time, label) tuples. Both streams pass through a single Synchronize
 * invocation, and the two resulting streams are fanned into one Custom
 * operator that prints every tuple it receives to the console.
 */
public composite SynchronizeTwoStreams {

    type
        // Schema shared by every stream in this graph: a timestamp and a text label.
        SampleType = float64 time, rstring label;

    graph

        // First input: tuples parsed from the CSV lines of sync2.even.in.
        stream<SampleType> EvenStream = FileSource() {
            param
                file: getThisToolkitDir()+"/opt/"+"sync2.even.in";
                format: csv;
        }

        // Second input: tuples parsed from the CSV lines of sync2.odd.in.
        stream<SampleType> OddStream = FileSource() {
            param
                file: getThisToolkitDir()+"/opt/"+"sync2.odd.in";
                format: csv;
        }

        // Feed both inputs to one Synchronize operator; each input port has a
        // matching output port.
        (
            stream<SampleType> SynchronizedEvenStream ;
            stream<SampleType> SynchronizedOddStream
        ) = com.ibm.streamsx.plumbing.synchronize::Synchronize(
            EvenStream as In1 ;
            OddStream as In2
        ) {
            param
                // One time attribute per input port, listed in port order.
                timeAttributes: In1.time, In2.time;
                // NOTE(review): presumably scales the pacing derived from the
                // 'time' attribute; confirm against the Synchronize operator docs.
                timeFactor: 2.0;
            config
                // Each input port gets its own thread and a 100-tuple queue.
                // Sys.DropFirst is a congestion policy that discards queued
                // tuples rather than blocking the upstream operator.
                threadedPort:
                    queue(In1, Sys.DropFirst, 100),
                    queue(In2, Sys.DropFirst, 100);
        }

        // Both synchronized streams are attached to the same input port, so their
        // tuples are merged here and printed to the console one per line.
        () as OutputPrinter = Custom(SynchronizedEvenStream, SynchronizedOddStream as Merged) {
            logic
                // Running count of tuples seen; it is updated but never read
                // anywhere in this composite.
                state: mutable int32 tuplesReceived = 0;

                onTuple Merged : {
                    printStringLn((rstring)Merged);
                    tuplesReceived++;
                }
        }

        // Disabled alternative sink: merge the two synchronized streams and write them to a file.
        //() as SynchronizedSink = FileSink(SynchronizedEvenStream, SynchronizedOddStream) { param file: "SynchronizeTwoStreams.SynchronizedStreams.out"; format: txt; flush: 1u; writePunctuations: true; }
}