SPL File SynchronizeThreeStreams.spl


Content

Operators
  • SynchronizeThreeStreams: This sample application for the Synchronize operator reads three pre-recorded streams from three files.

Composites

composite SynchronizeThreeStreams

This sample application for the Synchronize operator reads three pre-recorded streams from three files. Two files contain phrases of text from the audio tracks of several short videos, as transcribed by two speech recognition engines. The third file contains phrases of text from the subtitle track of one of the videos. Each line of each file produces one tuple, which includes the starting time of the phrase, relative to the beginning of its video, and the duration of the phrase. Punctuation is inserted into the streams at the end of each video's phrases: the files contain manually inserted marker lines (with outputDirectory set to "WindowMarker") that the application converts to window punctuation.
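
Each line of each input file supplies the six attributes of the composite's PhraseType (outputDirectory, outputFilename, indention, phrase, start, duration) in that order, in CSV format. As a rough illustration only, with invented values rather than lines taken from the sample's files, an ordinary phrase line and an end-of-video marker line could look like this:

	"out","video1","","hello and welcome to this video",0.72,2.31
	"WindowMarker","","","",0.0,0.0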

The Synchronize operator uses the starting times to synchronize the three streams, which are then merged into a single transcript with each stream's phrases in a separate column, annotated with their starting times and durations.
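
Each stream's phrases are kept in a separate column by prefixing them with a different amount of indention (none for the first speech engine, four tabs for the second, eight tabs for the subtitle track in the code below). With the commented-out Transcript operator enabled, the merged output would look roughly like the following sketch, which uses placeholders rather than real output and reduced indention for readability:

	at <start> for <duration> seconds:
	<phrase from speech engine 1>

		at <start> for <duration> seconds:
		<phrase from speech engine 2>

			at <start> for <duration> seconds:
			<phrase from subtitle track>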

SPL Source Code


 public composite SynchronizeThreeStreams {
 
 	type PhraseType =  
 		rstring outputDirectory, // "WindowMarker" on marker lines that end a video's phrases
 		rstring outputFilename, 
 		rstring indention,       // tab prefix, set below to place each stream in its own transcript column
 		rstring phrase,          // transcribed or subtitle text
 		float64 start,           // starting time of the phrase in seconds, relative to the beginning of its video
 		float64 duration ;       // duration of the phrase in seconds
 	
 	graph 
 
 	// read stream of phrases from first speech recognition engine and insert punctuation between videos
 	
 	stream<PhraseType> Speech1Stream = FileSource() { param file: getThisToolkitDir()+"/opt/"+"sync4.Speech1PhraseStream.in"; format: csv; }
 	
 	stream<PhraseType> PunctuatedSpeech1Stream = Punctor(Speech1Stream) {
 	param punctuate: outputDirectory=="WindowMarker"; position: after; }
 	
 	stream<PhraseType> IndentedPunctuatedSpeech1Stream as Out1 = Functor(PunctuatedSpeech1Stream as Out) {
 	param filter: outputDirectory!="WindowMarker"; 
 	output Out1: indention = ""; }
 
 
 	// read stream of phrases from second speech recognition engine, add indention, and insert punctuation between videos
 	
 	stream<PhraseType> Speech2Stream = FileSource() { param file: getThisToolkitDir()+"/opt/"+"sync4.Speech2PhraseStream.in"; format: csv; }
 	
 	stream<PhraseType> PunctuatedSpeech2Stream = Punctor(Speech2Stream) {
 	param punctuate: outputDirectory=="WindowMarker"; position: after; }
 	
 	stream<PhraseType> IndentedPunctuatedSpeech2Stream as Out2 = Functor(PunctuatedSpeech2Stream) {
 	param filter: outputDirectory!="WindowMarker"; 
 	output Out2: indention = "\t\t\t\t"; }
 	
 
 	// read stream of phrases from subtitle track of one video, add more indention, and insert punctuation between videos
 
 	stream<PhraseType> SubtitleStream = FileSource() { param file: getThisToolkitDir()+"/opt/"+"sync4.SubtitleStream.in"; format: csv; }
 	
 	stream<PhraseType> PunctuatedSubtitleStream = Punctor(SubtitleStream) {
 	param punctuate: outputDirectory=="WindowMarker"; position: after; }
 	
 	stream<PhraseType> IndentedPunctuatedSubtitleStream as OutT = Functor(PunctuatedSubtitleStream) {
 	param filter: outputDirectory!="WindowMarker"; 
 	output OutT: indention = "\t\t\t\t\t\t\t\t"; }
 	
 	// synchronize all three streams
 	
 	( stream<PhraseType> SynchronizedSpeech1Stream ;
 	  stream<PhraseType> SynchronizedSpeech2Stream ;
 	  stream<PhraseType> SynchronizedSubtitleStream ) = com.ibm.streamsx.plumbing.synchronize::Synchronize( 
                                                             IndentedPunctuatedSpeech1Stream as In1; 
                                                             IndentedPunctuatedSpeech2Stream as In2; 
                                                             IndentedPunctuatedSubtitleStream as InT) {
 	param 
 		timeAttributes: In1.start, In2.start, InT.start; 
 		timeFactor: 0.0; // replay as fast as possible
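 	// each input port gets its own threaded-port queue of 1000 tuples;
 	// Sys.DropFirst discards the oldest queued tuple when a queue is full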
 	config threadedPort: 
 		queue(In1, Sys.DropFirst, 1000),
 		queue(In2, Sys.DropFirst, 1000),
 		queue(InT, Sys.DropFirst, 1000); }	
 
 
 	// merge synchronized streams and dump to console
 	() as OutputPrinter = Custom(SynchronizedSpeech1Stream, SynchronizedSpeech2Stream, SynchronizedSubtitleStream as I) {
 		logic state: {
 			mutable int32 tuplesReceived = 0;
 		}
 		onTuple I : {
 			printStringLn((rstring)I);
 			tuplesReceived++;
 		}		
 	}
 
 /*
     	// merge synchronized streams into a single transcript
 	() as Transcript = Custom(SynchronizedSpeech1Stream, SynchronizedSpeech2Stream, SynchronizedSubtitleStream as In) {
 	logic state: {
 		rstring filename = "SynchronizeThreeStreams.transcript.out";
 		mutable uint64 file = 0; }
 	onTuple In: {	
 		mutable int32 error = 0;	
 		if (file==0ul) { 
  			file = fopen(filename , "w", error); 
 			assert(error==0, "sorry, could not open '" + filename + "', " + strerror(error)); }
 		if (length(phrase)>0) {
 			rstring timeline = indention + "at " + SecondsToTimestamp(start) + " for " + formatNumber(duration,1u,3u,false) + " seconds:\n";
 			rstring textline = indention + phrase + "\n\n";
 			fwriteString(timeline+textline, file, error);
 			assert(error==0, "sorry, could not write to '" + filename + "', " + strerror(error)); } }
 	onPunct In: {
 		mutable int32 error = 0;
 		if (file!=0ul) {	
 		fwriteString("------------------------\t------------------------\t------------------------\n", file, error);
 		if (currentPunct()==Sys.FinalMarker) { fclose(file, error); } } }
 	} 
 */
 
     // log intermediate streams when debugging
 
 //  () as Speech1Sink = FileSink(Speech1Stream) { param file: "debug.SynchronizeThreeStreams.Speech1Stream.out"; format: txt; flush: 1u; writePunctuations: true; }
 //  () as PunctuatedSpeech1Sink = FileSink(PunctuatedSpeech1Stream) { param file: "debug.SynchronizeThreeStreams.PunctuatedSpeech1Stream.out"; format: txt; flush: 1u; writePunctuations: true; }
 //  () as IndentedPunctuatedSpeech1Sink = FileSink(IndentedPunctuatedSpeech1Stream) { param file: "debug.SynchronizeThreeStreams.IndentedPunctuatedSpeech1Stream.out"; format: txt; flush: 1u; writePunctuations: true; }
 //	() as Speech2Sink = FileSink(Speech2Stream) { param file: "debug.SynchronizeThreeStreams.Speech2Stream.out"; format: txt; flush: 1u; writePunctuations: true; }
 //  () as PunctuatedSpeech2Sink = FileSink(PunctuatedSpeech2Stream) { param file: "debug.SynchronizeThreeStreams.PunctuatedSpeech2Stream.out"; format: txt; flush: 1u; writePunctuations: true; }
 //  () as IndentedPunctuatedSpeech2Sink = FileSink(IndentedPunctuatedSpeech2Stream) { param file: "debug.SynchronizeThreeStreams.IndentedPunctuatedSpeech2Stream.out"; format: txt; flush: 1u; writePunctuations: true; }
 //  () as SubtitleSink = FileSink(SubtitleStream) { param file: "debug.SynchronizeThreeStreams.SubtitleStream.out"; format: txt; flush: 1u; writePunctuations: true; }
 //  () as PunctuatedSubtitleSink = FileSink(PunctuatedSubtitleStream) { param file: "debug.SynchronizeThreeStreams.PunctuatedSubtitleStream.out"; format: txt; flush: 1u; writePunctuations: true; }
 //  () as IndentedPunctuatedSubtitleSink = FileSink(IndentedPunctuatedSubtitleStream) { param file: "debug.SynchronizeThreeStreams.IndentedPunctuatedSubtitleStream.out"; format: txt; flush: 1u; writePunctuations: true; }
 //  () as SyncSink = FileSink(SynchronizedSpeech1Stream, SynchronizedSpeech2Stream, SynchronizedSubtitleStream) { param file: "debug.SynchronizeThreeStreams.SyncStreams.out"; format: txt; flush: 1u; writePunctuations: true; }
 
 
 }