Toolkits > SampleShellPipe 1.1.0 > sample > TestShellPipeSynchronize.spl
This sample application executes a Perl script with the ShellPipe operator, flowing Streams tuple attributes through the script, synchronizing the tuple flow with the script's input and output, using an upstream Gate operator.
The script copies each word on input lines read from STDIN to a separate output line written to STDOUT, prefixing them with word numbers and the number of characters in each word, and following them with a separate delimiter output line. The operator writes lines of text consumed from input tuples to STDIN, and produces output tuples containing words read from STDOUT. (This application does not expect the script to write anything to STDERR. If something is written to STDERR, it will be logged in the Streams application trace.) An upstream Gate operator releases one input tuple at a time to the ShellPipe operator, then blocks until a downstream Custom operator receives a delimiter line from the script, indicating that it has finished processing the input line. The Custom operator copies attributes with matching names and types from each input tuple to all of the output tuples it produces. This may be what you need when Streams applications and shell commands cannot execute concurrently.
The ShellPipe operator normally executes its command asynchronously from Streams, and the shell buffers the data flowing between them. This is good for performance, because it allows Streams and the command to execute concurrently. But, it allows the operator to consume multiple input tuples before the script's output from previous tuples is received. That is, input and output tuples may get out of sync, so the operator may copy attributes from later input tuples to earlier output tuples. In this application, the Gate and Custom operators block the Streams flow while the script processes each input tuple, until a delimiter line is received from the script.
Note: When a Streams application waits for a specific output line from a ShellPipe command, such as the delimiter line in this application, normal shell buffering may cause a deadlock. This can happen when the line the Streams application is waiting for is written into a partially-filled shell buffer. The shell will not normally send output buffers to Streams until they are full, and a synchronized Stream application like this one will not send more input to the shell until it receives the output that's already in the buffer, so the command will not write any more output to the shell, so its buffer will never fill. To avoid this deadlock, ensure that the command flushes its output buffer after writing the specific output line the application is waiting for. In Bash scripts, output lines from the echo command are flushed automatically. In Perl scripts, you can ensure the output lines are flushed automatically like this:
use IO::Handle; STDERR->autoflush(1); STDOUT->autoflush(1);
This sample application illustrates one possible solution to the problem of copying attributes from input tuples to output tuples when Streams and a shell command cannot execute concurrently. The sample::TestShellPipeCopyAttributes_1to1_paired and sample::TestShellPipeCopyAttributes_1toN_queued applications show other solutions to this problem for scripts that can execute concurrently with Streams.
The Perl script used by this sample application can be tested independently of Streams by entering these commands at a Linux prompt:
cd .../samples/SampleShellPipe cat ./data/ozymandias.txt | ./etc/wordCounterWithAutoflush.pl - -----
composite TestShellPipeSynchronize { param expression<rstring> $inputFile: dataDirectory() + "/ozymandias.txt"; expression<rstring> $shellCommand: "./etc/wordCounterWithAutoflush.pl"; expression<rstring> $tokenDelimiter: "-"; expression<rstring> $lineDelimiter: "-----"; type InputType = float64 inputTimestamp, int64 inputLineNumber, rstring inputLine; OutputType = float64 inputTimestamp, int64 inputLineNumber, rstring inputLine, int64 outputWordNumber, int64 outputWordLength, rstring outputWord; graph // create a stream of tuples containing text lines for the ShellPipe operators below to consume stream<InputType> InputStream as Out = FileSource() { param file: $inputFile; format: line; output Out: inputTimestamp = getTimestampInSecs(), inputLineNumber = TupleNumber(); } () as DebugInputStream = FileSink(InputStream) { param file: "debug.TestShellPipeSynchronize.InputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; } // send one input tuple at a time to the shell command, then wait for an acknowledgement from the command that the input line has been completely processed stream<InputType> GatedInputStream = Gate(InputStream ; GateStream) { param maxUnackedTupleCount: 1; numTuplesToAck: ackCount; } //() as DebugGatedInputStream = FileSink(GatedInputStream) { param file: "debug.TestShellPipeSynchronize.GatedInputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; } // execute the shell command, writing input attributes to STDIN and reading output attributes from STDOUT ( stream<InputType, tuple<rstring stdoutLine>> STDOUTStream ; stream<rstring stderrLine> STDERRStream ) = ShellPipe(GatedInputStream) { param command: $shellCommand + " " + $tokenDelimiter + " " + $lineDelimiter; stdinAttribute: inputLine; stdoutAttribute: "stdoutLine"; stderrAttribute: "stderrLine"; } //() as DebugSTDOUTStream = FileSink(STDOUTStream) { param file: "debug.TestShellPipeSynchronize.STDOUTStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; } //() as DebugSTDERRStream = FileSink(STDERRStream) { param file: "debug.TestShellPipeSynchronize.STDERRStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; } // parse the output lines, and send the line delimiters to the Gate operator, which will send the next input tuple to the script ( stream<OutputType> OutputStream ; stream<int32 ackCount> GateStream ) = Custom(STDOUTStream) { logic state: { mutable OutputType outputTuple; } onTuple STDOUTStream: { if (stdoutLine==$lineDelimiter) { submit( { ackCount=1 }, GateStream); } else { list<rstring> tokens = tokenize(stdoutLine, $tokenDelimiter, true); if (size(tokens)!=3) return; assignFrom(outputTuple, STDOUTStream); outputTuple.outputWordNumber = (int64)tokens[0]; outputTuple.outputWordLength = (int64)tokens[1]; outputTuple.outputWord = tokens[2]; submit(outputTuple, OutputStream); } } } () as DebugOutputStream = FileSink(OutputStream) { param file: "debug.TestShellPipeSynchronize.OutputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; } // log STDERR lines to Streams application trace () as LogSTDERRStream = Custom(STDERRStream) { logic onTuple STDERRStream: { appTrc(Trace.info, "STDERR from command '" + $shellCommand + "': " + stderrLine); } } }