SPL File SynchronizeTwoStreams.spl

com.ibm.streamsx.plumbing > synchronize 1.0.0 > com.ibm.streamsx.plumbing.sample.synchronize > SynchronizeTwoStreams.spl

Content

Operators
  • SynchronizeTwoStreams: This sample application for the Synchronize operator reads a pair of streams from files.

Composites

composite SynchronizeTwoStreams

This sample application for the Synchronize operator reads a pair of streams from files. Each file contains synthetic timestamps and text labels, with even-numbered values in one file and odd-numbered values in the other.

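The Synchronize operator synchronizes the two streams according to their timestamps. A Custom operator then merges the synchronized streams and prints each tuple to the console; a FileSink that would write them to a single output file instead is present in the source but commented out.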

SPL Source Code


 public composite SynchronizeTwoStreams {
 
 	type SampleType = float64 time, rstring label;
 	
 	graph 
 	
 	// produce a stream of tuples from the lines in a file
 
 	stream<SampleType> EvenStream = FileSource() { param file: getThisToolkitDir()+"/opt/"+"sync2.even.in"; format: csv; }
 	
 	// produce another stream of tuples from the lines in a different file
 
 	stream<SampleType> OddStream = FileSource() { param file: getThisToolkitDir()+"/opt/"+"sync2.odd.in"; format: csv; }
 	
 	// synchronize the two streams of tuples
 
 	( stream<SampleType> SynchronizedEvenStream ;
 	  stream<SampleType> SynchronizedOddStream ) = com.ibm.streamsx.plumbing.synchronize::Synchronize(
                                                        EvenStream as In1 ; 
                                                        OddStream as In2 ) {
 	param 
 		timeAttributes: In1.time, In2.time; 
 		timeFactor: 2.0;
 	config threadedPort: 
 		queue(In1, Sys.DropFirst, 100),
 		queue(In2, Sys.DropFirst, 100); }	
 
 	// merge the two streams of synchronized tuples and dump to console
 	() as OutputPrinter = Custom(SynchronizedEvenStream, SynchronizedOddStream as I) {
 		logic state: {
 			mutable int32 tuplesReceived = 0;
 		}
 		onTuple I : {
 			printStringLn((rstring)I);
 			tuplesReceived++;
 		}		
 	}
 
 	// merge the two streams of synchronized tuples and write them to a file
 	//() as SynchronizedSink = FileSink(SynchronizedEvenStream, SynchronizedOddStream) { param file: "SynchronizeTwoStreams.SynchronizedStreams.out"; format: txt; flush: 1u; writePunctuations: true; }
 }
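
The description mentions a single output file, and the listing ends with a commented-out FileSink for that purpose. Reformatted for readability, that invocation might be enabled as sketched below. It is a fragment that belongs inside the graph clause of SynchronizeTwoStreams, not a standalone program; the operator, parameters, and file name are taken from the commented-out line, and nothing else is assumed.

```spl
// Merge the two synchronized streams on the sink's single input port
// and write them to one file (taken from the commented-out line in the listing).
() as SynchronizedSink = FileSink(SynchronizedEvenStream, SynchronizedOddStream) {
	param
		file: "SynchronizeTwoStreams.SynchronizedStreams.out";
		format: txt;              // write each tuple as SPL text, one per line
		flush: 1u;                // flush the file after every tuple
		writePunctuations: true;  // also record punctuation markers in the file
}
```

A relative file name such as this one is normally resolved against the application's data directory, so the output would be expected there and not next to the input files under the toolkit's opt directory.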