com.ibm.streamsx.plumbing > synchronize 1.0.0 > com.ibm.streamsx.plumbing.sample.synchronize > SynchronizeTwoStreamsWithPunctuation.spl
This sample application for the Synchronize operator reads a pair of streams from three pairs of files. Each file contains synthetic timestamps and text labels, with even- and odd-numbered values in each pair of files. Punctuation is automatically inserted into the streams at the end of each file.
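The Synchronize operator synchronizes the streams according to the timestamps. The sample then merges the two synchronized streams and prints their tuples and window punctuation to the console; a FileSink that would instead write the merged stream to a single output file is included in the source but commented out.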
// Sample application for the Synchronize operator.
//
// Pipeline, in order:
//   1. Beacon emits the integers 1, 2 and 3.
//   2. Two Functors turn each integer N into the file names
//      <toolkit dir>/opt/syncN.even.in and <toolkit dir>/opt/syncN.odd.in.
//   3. Two FileSources read those files as CSV tuples of SampleType.
//   4. Synchronize pairs up the two streams using their 'time' attributes.
//   5. A Custom operator prints every synchronized tuple and every window
//      marker to standard output.
public composite SynchronizeTwoStreamsWithPunctuation {

    type
        // one record of the input files: a timestamp and a text label
        SampleType = float64 time, rstring label;

    graph

        // produce three tuples containing integer values 1, 2, and 3
        stream<int32 i> NumberStream as Out = Beacon() {
            param
                iterations: 3;
                period: 0.0;
            output
                // IterationCount() starts at 0, hence the + 1
                Out: i = (int32)IterationCount() + 1;
        }

        // transform the three tuples above into string values "syncN.even.in"
        stream<rstring filename> EvenFilenameStream as Out = Functor(NumberStream as In) {
            output
                Out: filename = getThisToolkitDir()+"/opt/"+"sync" + (rstring)i + ".even.in";
            config
                // Input port is fed through a 100-tuple queue (threaded port).
                // NOTE(review): presumably so the even and odd file readers run
                // on separate threads -- confirm. Sys.DropFirst discards queued
                // tuples when the queue is full; with only three input tuples
                // the queue never fills.
                threadedPort: queue(In, Sys.DropFirst, 100);
        }

        // transform the three tuples above into string values "syncN.odd.in"
        stream<rstring filename> OddFilenameStream as Out = Functor(NumberStream as In) {
            output
                Out: filename = getThisToolkitDir()+"/opt/"+"sync" + (rstring)i + ".odd.in";
            config
                // same queueing arrangement as EvenFilenameStream
                threadedPort: queue(In, Sys.DropFirst, 100);
        }

        // produce a stream of tuples from the 'even' files named above,
        // with punctuation after each file's tuples
        stream<SampleType> EvenStream = FileSource(EvenFilenameStream) {
            param
                format: csv;
                initDelay: 0.0;
        }

        // produce another stream of tuples from the 'odd' files named above,
        // with punctuation after each file's tuples
        stream<SampleType> OddStream = FileSource(OddFilenameStream) {
            param
                format: csv;
                initDelay: 0.0;
        }

        // synchronize the two streams of tuples on their 'time' attributes
        // (see the Synchronize operator documentation for the meaning of timeFactor)
        ( stream<SampleType> SynchronizedEvenStream ;
          stream<SampleType> SynchronizedOddStream ) = com.ibm.streamsx.plumbing.synchronize::Synchronize( EvenStream as In1 ; OddStream as In2 ) {
            param
                timeAttributes: In1.time, In2.time;
                timeFactor: 2.0;
        }

        // merge the two streams of synchronized tuples and dump to console
        // (both streams feed the single input port 'I')
        () as OutputPrinter = Custom(SynchronizedEvenStream, SynchronizedOddStream as I) {
            logic
                onTuple I: {
                    // print the whole tuple in its string form
                    printStringLn((rstring)I);
                }
                onPunct I: {
                    // only window markers are echoed; other punctuation is ignored
                    if (currentPunct() == Sys.WindowMarker) {
                        printStringLn((rstring)currentPunct());
                    }
                }
        }

        // merge the two streams of synchronized tuples and write them to a file
        // () as SynchronizedSink = FileSink(SynchronizedEvenStream, SynchronizedOddStream) { param file: "SynchronizeTwoStreamsWithPunctuation.SynchronizedStreams.out"; format: txt; flush: 1u; writePunctuations: true; }

        // log intermediate streams when debugging
        // () as EvenSink = FileSink(EvenStream) { param file: "debug.SynchronizeTwoStreamsWithPunctuation.EvenStream.out"; format: txt; flush: 1u; writePunctuations: true; }
        // () as OddSink = FileSink(OddStream) { param file: "debug.SynchronizeTwoStreamsWithPunctuation.OddStream.out"; format: txt; flush: 1u; writePunctuations: true; }

}