SPL File SynchronizeTwoStreamsWithPunctuation.spl

com.ibm.streamsx.plumbing > synchronize 1.0.0 > com.ibm.streamsx.plumbing.sample.synchronize > SynchronizeTwoStreamsWithPunctuation.spl

Content

Operators

Composites

composite SynchronizeTwoStreamsWithPunctuation

This sample application for the Synchronize operator reads a pair of streams from three pairs of files. Each file contains synthentic timestamps and text labels, with even- and odd-numbered values in each pair of files. Punctuation is automatically inserted into the streams at the end of each file.

The Synchronize operator synchronizes the streams according to the timestamps and merges them into a single output file.

SPL Source Code


 public composite SynchronizeTwoStreamsWithPunctuation {
 
 	type SampleType = float64 time, rstring label;
 	
 	graph 
 
 	// produce three tuples containing integer values 1, 2, and 3 
 
 	stream<int32 i> NumberStream as Out = Beacon() { 
 	param iterations: 3;  period: 0.0; 
 	output Out: i = (int32)IterationCount() + 1; }
 		
 	// transform the three tuples above into string values "syncN.even.in"
 
 	stream<rstring filename> EvenFilenameStream as Out = Functor(NumberStream as In) { 
 	output Out: filename = getThisToolkitDir()+"/opt/"+"sync" + (rstring)i + ".even.in"; 
 	config threadedPort: queue(In, Sys.DropFirst, 100); }
 
 	// transform the three tuples above into string values "syncN.odd.in"
 
 	stream<rstring filename> OddFilenameStream as Out = Functor(NumberStream as In) { 
 	output Out: filename = getThisToolkitDir()+"/opt/"+"sync" + (rstring)i + ".odd.in"; 
 	config threadedPort:  queue(In, Sys.DropFirst, 100); }
 	
 	// produce a stream of tuples from the 'even' files named above, with punctuation after each file's tuples
 
 	stream<SampleType> EvenStream = FileSource(EvenFilenameStream) { param format: csv; initDelay: 0.0; }
 	
 	// produce another stream of tuples from the 'odd' files named above, with punctuation after each file's tuples
 
 	stream<SampleType> OddStream = FileSource(OddFilenameStream) { param format: csv; initDelay: 0.0; }
 
 	// synchronize the two streams of tuples
 	
 	( stream<SampleType> SynchronizedEvenStream ;
 	  stream<SampleType> SynchronizedOddStream ) = com.ibm.streamsx.plumbing.synchronize::Synchronize(
                                                      EvenStream as In1 ; 
                                                      OddStream as In2 ) {
 	param
 		timeAttributes: In1.time, In2.time; 
 		timeFactor: 2.0; }
 
 	// merge the two streams of synchronized tuples and dump to console
 	() as OutputPrinter= Custom(SynchronizedEvenStream, SynchronizedOddStream as I){
 		logic state: {
 			mutable int32 tuplesReceived = 0;
 		}
 		onTuple I : {
 			printStringLn((rstring)I);
 			tuplesReceived++;
 		}
 		onPunct I: {
 			if (currentPunct() == Sys.WindowMarker) {
 				printStringLn((rstring)currentPunct());
 			}
 		}		
 	}
 
 	// merge the two streams of synchronized tuples and write them to a file
 //	() as SynchronizedSink = FileSink(SynchronizedEvenStream, SynchronizedOddStream) { param file: "SynchronizeTwoStreamsWithPunctuation.SynchronizedStreams.out"; format: txt; flush: 1u; writePunctuations: true; }
 
 
 	// log intermediate streams when debugging
 //	() as EvenSink = FileSink(EvenStream) { param file: "debug.SynchronizeTwoStreamsWithPunctuation.EvenStream.out"; format: txt; flush: 1u; writePunctuations: true; }
 //	() as OddSink = FileSink(OddStream) { param file: "debug.SynchronizeTwoStreamsWithPunctuation.OddStream.out"; format: txt; flush: 1u; writePunctuations: true; }
 
 
 }