TestShellPipeCopyAttributes_1toN_naive.spl (SampleShellPipe 1.1.0 toolkit, namespace sample)
This sample application executes a Perl script with the ShellPipe operator, flowing Streams tuples through the script asynchronously, naively copying input attributes to output tuples.
The script copies each word of every input line read from STDIN to a separate output line written to STDOUT, prefixing each word with its word number and its length in characters, and follows the words of each input line with a separate delimiter output line. The operator writes lines of text consumed from input tuples to STDIN, and produces output tuples containing the lines read from STDOUT, one per word. (This application does not expect the script to write anything to STDERR. If something is written to STDERR, it is logged in the Streams application trace.) The operator copies the other output attributes from the most recently consumed input tuple, which is not necessarily the tuple that produced the output line. This is probably not what you want.
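The Perl source of wordCounter.pl is not reproduced on this page, so the following Python sketch only emulates the behavior described above. It assumes that words are separated by whitespace, that word numbers restart at 1 on every input line, and that each output line has the form number, length, word joined by the token delimiter (which is what the parsing logic in this application expects). The function name count_words is hypothetical.

```python
def count_words(line, token_delimiter="-", line_delimiter="-----"):
    """Emulate the described script for ONE input line (assumed behavior).

    Returns the output lines the script would write to STDOUT:
    one "<wordNumber><delim><wordLength><delim><word>" line per word,
    followed by one delimiter line that marks the end of the input line.
    """
    output_lines = []
    # Assumption: words are whitespace-separated and numbered from 1 per line.
    for number, word in enumerate(line.split(), start=1):
        output_lines.append(
            token_delimiter.join([str(number), str(len(word)), word])
        )
    output_lines.append(line_delimiter)
    return output_lines


if __name__ == "__main__":
    for out in count_words("I met a traveller"):
        print(out)
```

An input line with no words yields only the delimiter line under these assumptions, which is why a consumer can treat the delimiter as an end-of-line marker regardless of how many words precede it.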
The ShellPipe operator executes its command asynchronously from Streams, and the shell buffers the data flowing between them. This is good for performance, because it allows Streams and the command to execute concurrently. However, it also allows the operator to consume multiple input tuples before it receives the script's output for previous tuples. That is, input and output tuples may get out of sync. In this application, the operator may copy attributes from later input tuples to earlier output tuples.
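The effect can be illustrated with a small deterministic model in Python. This is only a sketch: the real interleaving depends on buffering and scheduling and is not predictable, whereas the hypothetical parameter lead below fixes how many extra input lines the operator has already consumed when it reads the output for a given line. The function name naive_copy is also hypothetical.

```python
def naive_copy(lines, lead):
    """Model naive attribute copying with a fixed read-ahead of `lead` lines.

    For each word produced from input line i (0-based), the output tuple is
    stamped with the line number of the most recently consumed input line,
    which in this model is line i + lead (capped at the last line).
    Returns a list of (copied_line_number, word) pairs, line numbers 1-based.
    """
    results = []
    last_index = len(lines) - 1
    for i, line in enumerate(lines):
        most_recent = min(i + lead, last_index)
        for word in line.split():
            results.append((most_recent + 1, word))
    return results


if __name__ == "__main__":
    text = ["I met", "a traveller"]
    print(naive_copy(text, lead=0))  # in sync: attributes match the source line
    print(naive_copy(text, lead=1))  # out of sync: line 1 words carry line 2's number
```

With lead=0 every word carries the number of the line it came from. With lead=1 the words of the first line are stamped with the second line's number, which is the kind of mismatch that may show up in inputLineNumber and inputTimestamp in this application's output.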
This sample application illustrates the problem of copying attributes directly from input tuples to output tuples that are out of sync due to buffering and asynchronous execution in a shell. The sample::TestShellPipeCopyAttributes_1toN_queued application shows a solution to this problem for scripts that write multiple output lines for each input line, that is, for scripts with a "1 to N" ratio of input to output lines.
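The queued sample itself is not shown on this page. As a rough illustration of the general idea only, and not necessarily how that sample is implemented, the following Python sketch keeps the attributes of consumed input tuples in a FIFO queue, copies attributes from the oldest pending entry to each output line, and discards that entry when the delimiter line arrives. The class QueuedMatcher and its method names are hypothetical.

```python
from collections import deque


class QueuedMatcher:
    """Match "1 to N" output lines to the input tuple that produced them."""

    def __init__(self, line_delimiter="-----"):
        self.line_delimiter = line_delimiter
        self.pending = deque()  # attributes of inputs whose output is incomplete

    def on_input(self, attributes):
        # Remember the attributes in arrival order before the line is sent on.
        self.pending.append(attributes)

    def on_output(self, stdout_line):
        # The delimiter line closes the output of the oldest pending input.
        if stdout_line == self.line_delimiter:
            self.pending.popleft()
            return None
        # Any other line belongs to the oldest pending input.
        return (self.pending[0], stdout_line)


if __name__ == "__main__":
    matcher = QueuedMatcher()
    matcher.on_input({"inputLineNumber": 1})
    matcher.on_input({"inputLineNumber": 2})  # consumed before any output arrives
    print(matcher.on_output("1-1-I"))   # paired with line 1, not line 2
    print(matcher.on_output("-----"))   # None: line 1 is finished
    print(matcher.on_output("1-1-a"))   # paired with line 2
```

This approach relies on the script writing its output in input order and ending each input line's output with the delimiter line, which is why the script in this sample emits one.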
The Perl script used by this sample application can be tested independently of Streams by entering these commands at a Linux prompt:
    cd .../samples/SampleShellPipe
    cat ./data/ozymandias.txt | ./etc/wordCounter.pl - -----
composite TestShellPipeCopyAttributes_1toN_naive {

  param
    expression<rstring> $inputFile: dataDirectory() + "/ozymandias.txt";
    expression<rstring> $shellCommand: "./etc/wordCounter.pl";
    expression<rstring> $tokenDelimiter: "-";
    expression<rstring> $lineDelimiter: "-----";

  type
    InputType =
      float64 inputTimestamp,
      int64 inputLineNumber,
      rstring inputLine;
    OutputType =
      float64 inputTimestamp,
      int64 inputLineNumber,
      rstring inputLine,
      int64 outputWordNumber,
      int64 outputWordLength,
      rstring outputWord;

  graph

    // create a stream of tuples containing text lines for the ShellPipe operators below to consume
    stream<InputType> InputStream as Out = FileSource() {
      param
        file: $inputFile;
        format: line;
      output Out:
        inputTimestamp = getTimestampInSecs(),
        inputLineNumber = TupleNumber();
    }
    () as DebugInputStream = FileSink(InputStream) { param file: "debug.TestShellPipeCopyAttributes_1toN_naive.InputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }

    // execute a Perl script that copies lines from STDIN to STDOUT, and implicitly copy input attributes from most recently received input tuple
    ( stream<InputType, tuple<rstring stdoutLine>> STDOUTStream ;
      stream<rstring stderrLine> STDERRStream ) = ShellPipe(InputStream) {
      param
        command: $shellCommand + " " + $tokenDelimiter + " " + $lineDelimiter;
        stdinAttribute: inputLine;
        stdoutAttribute: "stdoutLine";
        stderrAttribute: "stderrLine";
    }
    //() as DebugSTDOUTStream = FileSink(STDOUTStream) { param file: "debug.TestShellPipeCopyAttributes_1toN_naive.STDOUTStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
    //() as DebugSTDERRStream = FileSink(STDERRStream) { param file: "debug.TestShellPipeCopyAttributes_1toN_naive.STDERRStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }

    // parse the lines written to STDOUT by the command, and copy input attributes with matching name and type
    stream<OutputType> OutputStream = Custom(STDOUTStream) {
      logic
        state: { mutable OutputType outputTuple; }
        onTuple STDOUTStream: {
          if (stdoutLine==$lineDelimiter) return;
          list<rstring> tokens = tokenize(stdoutLine, $tokenDelimiter, true);
          if (size(tokens)!=3) return;
          assignFrom(outputTuple, STDOUTStream);
          outputTuple.outputWordNumber = (int64)tokens[0];
          outputTuple.outputWordLength = (int64)tokens[1];
          outputTuple.outputWord = tokens[2];
          submit(outputTuple, OutputStream);
        }
    }
    () as DebugOutputStream = FileSink(OutputStream) { param file: "debug.TestShellPipeCopyAttributes_1toN_naive.OutputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }

    // log STDERR lines to Streams application trace
    () as LogSTDERRStream = Custom(STDERRStream) {
      logic
        onTuple STDERRStream: {
          appTrc(Trace.info, "STDERR from command '" + $shellCommand + "': " + stderrLine);
        }
    }

}