SPL File TestShellPipeCopyAttributes_1toN_queued.spl

Toolkits > SampleShellPipe 1.1.0 > sample > TestShellPipeCopyAttributes_1toN_queued.spl

Content

Operators
  • TestShellPipeCopyAttributes_1toN_queued: This sample application executes a Perl script with the ShellPipe operator, flowing Streams tuples through the script asynchronously, then matching the ShellPipe operator's output tuples with the input tuple that caused them, using a downstream Custom operator.

Composites

public composite TestShellPipeCopyAttributes_1toN_queued

This sample application executes a Perl script with the ShellPipe operator, flowing Streams tuples through the script asynchronously, then matching the ShellPipe operator's output tuples with the input tuple that caused them, using a downstream Custom operator.

The script copies each word on the input lines it reads from STDIN to a separate output line written to STDOUT, prefixing each word with its word number and its length in characters. After the words from each input line, it writes a separate delimiter output line. The operator writes lines of text consumed from input tuples to the script's STDIN, and produces output tuples containing the lines read from its STDOUT. (This application does not expect the script to write anything to STDERR. If something is written to STDERR, it is logged in the Streams application trace.) A downstream Custom operator enqueues input tuples, matches output tuples to them based on the delimiter output lines, and copies attributes with matching names and types. This is probably what you want for scripts that write multiple output lines for each input line, that is, for scripts with a "1 to N" ratio of input to output lines.
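The output line format can be inferred from how the Custom operator in the source below parses it: three fields (word number, word length, word) joined by the token delimiter, followed by one line holding only the line delimiter. The Python sketch below models that behavior for illustration only. It is not the toolkit's wordCounter.pl; the function name `count_words`, the whitespace splitting, and numbering from 1 are assumptions.

```python
def count_words(line, token_delimiter="-", line_delimiter="-----"):
    """Return the output lines a word-counting script might write for one input line."""
    output = []
    # Number words from 1 (an assumption; the real script may number differently).
    for number, word in enumerate(line.split(), start=1):
        output.append(f"{number}{token_delimiter}{len(word)}{token_delimiter}{word}")
    # One delimiter line marks the end of the output for this input line.
    output.append(line_delimiter)
    return output


if __name__ == "__main__":
    for out in count_words("I met a traveller"):
        print(out)
```

Under these assumptions, an empty input line yields only the delimiter line, which is what lets a downstream consumer advance past input lines that produce no words.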

The ShellPipe operator executes its command asynchronously from Streams, and the shell buffers the data flowing between them. This is good for performance, because it allows Streams and the command to execute concurrently. However, it also allows the operator to consume multiple input tuples before it receives the script's output for previous tuples. That is, input and output tuples may get out of sync, so the ShellPipe operator itself should not be used to copy attributes from input tuples to output tuples. In this application, the downstream Custom operator instead copies attributes into each output tuple from the matching input tuple.
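The queue-based matching can be modeled outside Streams. The Python sketch below mirrors the logic of the Custom operator in the source below: input tuples are enqueued as they arrive, each output line is attributed to the oldest pending input, and a delimiter line retires that input. The event list and the name `match_outputs` are hypothetical stand-ins for the operator's two input ports, and the sketch assumes every output line arrives after the input that caused it.

```python
from collections import deque

LINE_DELIMITER = "-----"
TOKEN_DELIMITER = "-"


def match_outputs(events):
    """Pair each script output line with the input line that caused it.

    `events` is a sequence of ("in", (line_number, text)) and ("out", stdout_line)
    pairs in arrival order (a hypothetical stand-in for the two input ports).
    """
    pending = deque()  # input tuples whose output is not yet complete
    matched = []
    for kind, payload in events:
        if kind == "in":
            pending.append(payload)
        elif payload == LINE_DELIMITER:
            pending.popleft()  # output for the oldest input is complete
        else:
            tokens = payload.split(TOKEN_DELIMITER)
            if len(tokens) != 3:
                continue  # skip lines that do not parse into three fields
            line_number, _text = pending[0]
            matched.append((line_number, int(tokens[0]), tokens[2]))
    return matched


if __name__ == "__main__":
    # Both inputs are consumed before any output arrives (out of sync).
    events = [
        ("in", (1, "I met")),
        ("in", (2, "a traveller")),
        ("out", "1-1-I"), ("out", "2-3-met"), ("out", "-----"),
        ("out", "1-1-a"), ("out", "2-9-traveller"), ("out", "-----"),
    ]
    for row in match_outputs(events):
        print(row)
```

Even though both inputs are consumed before any output arrives, each word is attributed to the correct input line number, because the delimiter lines tell the matcher when to advance the queue. As in the operator, a word that itself contains the token delimiter splits into more than three fields and is skipped.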

This sample application illustrates one possible solution to the problem of copying attributes from input tuples to output tuples that are out of sync due to buffering and asynchronous execution in a shell. The sample::TestShellPipeCopyAttributes_1to1_paired application shows another solution to this problem for scripts that write one output line for each input line.

Note: The Perl script used by this sample application can be tested independently of Streams by entering these commands at a Linux prompt:

cd .../samples/SampleShellPipe
cat ./data/ozymandias.txt | ./etc/wordCounter.pl - -----

Parameters

SPL Source Code


 composite TestShellPipeCopyAttributes_1toN_queued {
 
     param
     expression<rstring> $inputFile: dataDirectory() + "/ozymandias.txt";
     expression<rstring> $shellCommand: "./etc/wordCounter.pl";
     expression<rstring> $tokenDelimiter: "-";
     expression<rstring> $lineDelimiter: "-----";
 
     type 
 
     InputType =
         float64 inputTimestamp,
         int64   inputLineNumber,
         rstring inputLine;
 
     OutputType =
         float64 inputTimestamp,
         int64   inputLineNumber,
         rstring inputLine,
         int64   outputWordNumber,
         int64   outputWordLength,
         rstring outputWord;
     
 
     graph
 
 
     // create a stream of tuples containing text lines for the ShellPipe operator below to consume
     stream<InputType> InputStream as Out = FileSource() {
     param
         file: $inputFile;
         format: line;
     output Out:
         inputTimestamp = getTimestampInSecs(),
         inputLineNumber = TupleNumber(); }
     () as DebugInputStream = FileSink(InputStream) { param file: "debug.TestShellPipeCopyAttributes_1toN_queued.InputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
 
 
     // execute the shell command, writing input attributes to STDIN and reading output attributes from STDOUT
     ( stream<rstring stdoutLine> STDOUTStream ;
       stream<rstring stderrLine> STDERRStream ) = ShellPipe(InputStream) {
     param
         command: $shellCommand + " " + $tokenDelimiter + " " + $lineDelimiter;
         stdinAttribute: inputLine;
         stdoutAttribute: "stdoutLine";
         stderrAttribute: "stderrLine"; }
     //() as DebugSTDOUTStream = FileSink(STDOUTStream) { param file: "debug.TestShellPipeCopyAttributes_1toN_queued.STDOUTStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
     //() as DebugSTDERRStream = FileSink(STDERRStream) { param file: "debug.TestShellPipeCopyAttributes_1toN_queued.STDERRStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
         
 
     // enqueue input tuples and match output tuples to them, based on delimiter output lines, and then parse the output lines
     stream<OutputType> OutputStream = Custom(InputStream ; STDOUTStream) {
     logic 
     state: { 
         mutable list<InputType> inputQueue;
         mutable OutputType outputTuple; }
     onTuple InputStream: {
         appendM(inputQueue, InputStream); }
     onTuple STDOUTStream: {
         if (stdoutLine==$lineDelimiter) {
             removeM(inputQueue, 0);
         } else {
             list<rstring> tokens = tokenize(stdoutLine, $tokenDelimiter, true);
             if (size(tokens)!=3) return;
             assignFrom(outputTuple, inputQueue[0]);
             outputTuple.outputWordNumber = (int64)tokens[0];
             outputTuple.outputWordLength = (int64)tokens[1];
             outputTuple.outputWord = tokens[2];
             submit(outputTuple, OutputStream); }
     } }
     () as DebugOutputStream = FileSink(OutputStream) { param file: "debug.TestShellPipeCopyAttributes_1toN_queued.OutputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
 
 
     // log STDERR lines to Streams application trace
     () as LogSTDERRStream = Custom(STDERRStream) {
     logic onTuple STDERRStream: {
           appTrc(Trace.info, "STDERR from command '" + $shellCommand + "': " + stderrLine); } }
 }