com.ibm.streamsx.plumbing > synchronize 1.0.0 > com.ibm.streamsx.plumbing.sample.synchronize > SynchronizeTwoStreams.spl
This sample application for the Synchronize operator reads a pair of streams from files. Each file contains synthentic timestamps and text labels, with even- and odd-numbered values in different files.
The Synchronize operator synchronizes the streams according to the timestamps and merges them into a single output file.
public composite SynchronizeTwoStreams { type SampleType = float64 time, rstring label; graph // produce a stream of tuples from the lines in a file stream<SampleType> EvenStream = FileSource() { param file: getThisToolkitDir()+"/opt/"+"sync2.even.in"; format: csv; } // produce another stream of tuples from the lines in a different file stream<SampleType> OddStream = FileSource() { param file: getThisToolkitDir()+"/opt/"+"sync2.odd.in"; format: csv; } // synchronize the two streams of tuples ( stream<SampleType> SynchronizedEvenStream ; stream<SampleType> SynchronizedOddStream ) = com.ibm.streamsx.plumbing.synchronize::Synchronize( EvenStream as In1 ; OddStream as In2 ) { param timeAttributes: In1.time, In2.time; timeFactor: 2.0; config threadedPort: queue(In1, Sys.DropFirst, 100), queue(In2, Sys.DropFirst, 100); } // merge the two streams of synchronized tuples and dump to console () as OutputPrinter= Custom(SynchronizedEvenStream, SynchronizedOddStream as I){ logic state: { mutable int32 tuplesReceived = 0; } onTuple I : { printStringLn((rstring)I); tuplesReceived++; } } // merge the two streams of synchronized tuples and write them to a file //() as SynchronizedSink = FileSink(SynchronizedEvenStream, SynchronizedOddStream) { param file: "SynchronizeTwoStreams.SynchronizedStreams.out"; format: txt; flush: 1u; writePunctuations: true; } }