TestShellPipeCopyAttributes_1to1_paired.spl (toolkit SampleShellPipe 1.1.0, namespace sample)
This sample application runs a Perl script with the ShellPipe operator, flowing Streams tuples through the script asynchronously, and then pairs the ShellPipe operator's input and output tuples with a downstream Barrier operator.
The script copies lines of text from STDIN to STDOUT, prefixing each line with its line number and its character count. The ShellPipe operator writes lines of text taken from input tuples to the script's STDIN, and produces output tuples containing the lines of text it reads from the script's STDOUT. (This application does not expect the script to write anything to STDERR. Anything the script does write to STDERR is logged in the Streams application trace.) A downstream Barrier operator pairs input tuples with output tuples and copies attributes with matching names and types. This approach suits scripts that write exactly one output line for each input line, that is, scripts with a "1 to 1" ratio of input lines to output lines.
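The script's behavior can be sketched in a few lines of Python. This is not the actual lineCounter.pl: the output layout (line number, character count, and text, separated by the delimiter given on the command line) is inferred from how this application parses STDOUT, and details such as 1-based numbering and excluding the newline from the count are assumptions. The name `count_lines` is hypothetical.

```python
import io


def count_lines(source, delimiter="-"):
    """Yield each input line prefixed with its line number and character count.

    Assumptions (not confirmed by the sample): numbering starts at 1 and the
    trailing newline is not counted.
    """
    for number, raw in enumerate(source, start=1):
        text = raw.rstrip("\n")
        yield f"{number}{delimiter}{len(text)}{delimiter}{text}"


if __name__ == "__main__":
    sample = io.StringIO("I met a traveller\nfrom an antique land\n")
    for line in count_lines(sample):
        print(line)
```

Each output line corresponds to exactly one input line, which is the "1 to 1" property the Barrier-based pairing relies on.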
The ShellPipe operator executes its command asynchronously from Streams, and the shell buffers the data flowing between them. This is good for performance, because Streams and the command can execute concurrently. However, it also means the operator may consume several input tuples before it receives the script's output for earlier tuples. That is, input and output tuples may get out of sync, so the ShellPipe operator itself should not be used to copy attributes from input tuples to output tuples. In this application, the Barrier operator instead copies attributes into each output tuple from the corresponding input tuple.
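The idea of pairing by arrival order can be illustrated outside Streams with a minimal Python sketch. It is only an analogy for the ShellPipe and Barrier combination, not how those operators are implemented: a writer thread feeds a child process without waiting for replies, and the reader matches the i-th output line with the i-th input line. The child command and the name `run_paired` are hypothetical stand-ins.

```python
import subprocess
import sys
import threading
from collections import deque

# Hypothetical stand-in for the shell command: echoes each line in upper case.
CHILD = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(line.upper())\n"
    "    sys.stdout.flush()\n"
)


def run_paired(lines):
    """Feed lines to a child process asynchronously; pair inputs and outputs in order."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )
    pending = deque()  # inputs whose output has not arrived yet (thread-safe append/popleft)

    def feed():
        for line in lines:
            pending.append(line)        # remember the input before sending it
            proc.stdin.write(line + "\n")
        proc.stdin.close()              # end of input lets the child exit

    writer = threading.Thread(target=feed)
    writer.start()

    pairs = []
    for out in proc.stdout:
        # The writer may be several lines ahead; the oldest pending input
        # still belongs to this output because the child is strictly 1 to 1.
        pairs.append((pending.popleft(), out.rstrip("\n")))

    writer.join()
    proc.wait()
    return pairs
```

For example, `run_paired(["a", "b"])` pairs each input with its own output regardless of how far ahead the writer runs. The pairing is only valid while the command emits exactly one line per input line; a command that drops or adds lines would shift every later pair.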
This sample application illustrates one possible solution to the problem of copying attributes from input tuples to output tuples that are out of sync due to buffering and asynchronous execution in a shell. The sample::TestShellPipeCopyAttributes_1toN_queued application shows another solution to this problem for scripts that write more than one output line for each input line.
The Perl script used by this sample application can be tested independently of Streams by entering these commands at a Linux prompt:
    cd .../samples/SampleShellPipe
    cat ./data/ozymandias.txt | ./etc/lineCounter.pl -
    composite TestShellPipeCopyAttributes_1to1_paired {

      param
        expression<rstring> $inputFile: dataDirectory() + "/ozymandias.txt";
        expression<rstring> $shellCommand: "./etc/lineCounter.pl";
        expression<rstring> $tokenDelimiter: "-";

      type
        InputType =
          float64 inputTimestamp,
          int64 inputLineNumber,
          rstring inputLine;
        OutputType =
          float64 inputTimestamp,
          int64 inputLineNumber,
          rstring inputLine,
          int64 outputLineNumber,
          int64 outputLineLength,
          rstring outputLine;

      graph

        // create a stream of tuples containing text lines for the ShellPipe operators below to consume
        stream<InputType> InputStream as Out = FileSource() {
          param
            file: $inputFile;
            format: line;
          output Out:
            inputTimestamp = getTimestampInSecs(),
            inputLineNumber = TupleNumber();
        }
        () as DebugInputStream = FileSink(InputStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_paired.InputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }

        // execute the shell command, writing input attributes to STDIN and reading output attributes from STDOUT
        ( stream<rstring stdoutLine> STDOUTStream ;
          stream<rstring stderrLine> STDERRStream ) = ShellPipe(InputStream) {
          param
            command: $shellCommand + " " + $tokenDelimiter;
            stdinAttribute: inputLine;
            stdoutAttribute: "stdoutLine";
            stderrAttribute: "stderrLine";
        }
        //() as DebugSTDOUTStream = FileSink(STDOUTStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_paired.STDOUTStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
        //() as DebugSTDERRStream = FileSink(STDERRStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_paired.STDERRStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }

        // pair each tuple consumed by the ShellPipe operator with the tuple it produced, and copy matching output attributes from the input tuple
        stream<InputType, tuple<rstring stdoutLine>> PairedSTDOUTStream = Barrier(InputStream ; STDOUTStream) {}
        //() as DebugPairedSTDOUTStream = FileSink(PairedSTDOUTStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_paired.PairedSTDOUTStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }

        // parse the lines written to STDOUT by the command, and copy matching attributes
        stream<OutputType> OutputStream = Custom(PairedSTDOUTStream) {
          logic
            state: { mutable OutputType outputTuple; }
            onTuple PairedSTDOUTStream: {
              list<rstring> tokens = tokenize(stdoutLine, $tokenDelimiter, true);
              if (size(tokens)!=3) return;
              assignFrom(outputTuple, PairedSTDOUTStream);
              outputTuple.outputLineNumber = (int64)tokens[0];
              outputTuple.outputLineLength = (int64)tokens[1];
              outputTuple.outputLine = tokens[2];
              submit(outputTuple, OutputStream);
            }
        }
        () as DebugOutputStream = FileSink(OutputStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_paired.OutputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }

        // log STDERR lines to Streams application trace
        () as LogSTDERRStream = Custom(STDERRStream as In) {
          logic
            onTuple In: {
              appTrc(Trace.info, "STDERR from command '" + $shellCommand + "': " + stderrLine);
            }
        }

    }
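The parsing step in the Custom operator above (split the STDOUT line on the delimiter, require exactly three tokens, convert the first two to integers) can be mirrored in Python as a rough sketch. The function name `parse_stdout_line` is hypothetical, and Python's `split` and `int` are only approximate counterparts of SPL's `tokenize` and `(int64)` cast; in particular, `int` raises `ValueError` on non-numeric text, which may not match the behavior of the SPL cast.

```python
def parse_stdout_line(stdout_line, delimiter="-"):
    """Return (line_number, line_length, text), or None when the line
    does not split into exactly three tokens."""
    # str.split keeps empty tokens, similar to tokenize(..., true) in the SPL code.
    tokens = stdout_line.split(delimiter)
    if len(tokens) != 3:
        return None  # the SPL code likewise skips such lines
    return int(tokens[0]), int(tokens[1]), tokens[2]
```

Note that a line whose text itself contains the delimiter splits into more than three tokens and is skipped, so the delimiter should be a character that does not occur in the data.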