SPL File TestShellPipeCopyAttributes_1to1_naive.spl

Toolkits > SampleShellPipe 1.1.0 > sample > TestShellPipeCopyAttributes_1to1_naive.spl

Content

Operators
  • TestShellPipeCopyAttributes_1to1_naive: This sample application executes a Perl script with the ShellPipe operator, flowing Streams tuples through the script asynchronously, naively copying input attributes to output tuples.

Composites

public composite TestShellPipeCopyAttributes_1to1_naive

This sample application executes a Perl script with the ShellPipe operator, flowing Streams tuples through the script asynchronously, naively copying input attributes to output tuples.

The script copies lines of text from STDIN to STDOUT, prefixing them with line numbers and the number of characters on each line. The operator writes lines of text consumed from input tuples to STDIN, and produces output tuples containing lines of text read from STDOUT. (This application does not expect the script to write anything to STDERR. If something is written to STDERR, it will be logged in the Streams application trace.) The operator copies other output attributes from the most recently consumed input tuple. This is probably not what you want.

The ShellPipe operator executes its command asynchronously from Streams, and the shell buffers the data flowing between them. This is good for performance, because it allows Streams and the command to execute concurrently. But, it allows the operator to consume multiple input tuples before the script's output from previous tuples is received. That is, input and output tuples may get out of sync. In this application, the operator may copy attributes from later input tuples to earlier output tuples.

This sample application illustrates the problem of copying attributes directly from input tuples to output tuples that are out of sync due to buffering and asynchronous execution in a shell. The sample::TestShellPipeCopyAttributes_1to1_paired application shows a solution to this problem for scripts that write one output line for each input line, that is, for scripts with a "1 to 1" ratio of input to output lines.

The Perl script used by this sample application can be tested independently of Streams by entering these commands at a Linux prompt:

cd .../samples/SampleShellPipe
cat ./data/ozymandias.txt | ./etc/lineCounter.pl - 

Parameters

SPL Source Code


 composite TestShellPipeCopyAttributes_1to1_naive {
 
     param
     expression<rstring> $inputFile: dataDirectory() + "/ozymandias.txt";
     expression<rstring> $shellCommand: "./etc/lineCounter.pl";
     expression<rstring> $tokenDelimiter: "-";
 
     type 
 
     InputType =
         float64 inputTimestamp,
         int64   inputLineNumber,
         rstring inputLine;
 
     OutputType =
         float64 inputTimestamp,
         int64   inputLineNumber,
         rstring inputLine,
         int64   outputLineNumber,
         int64   outputLineLength,
         rstring outputLine;
 
     graph
 
 
     // create a stream of tuples containing text lines for the ShellPipe operators below to consume
     stream<InputType> InputStream as Out = FileSource() {
     param
         file: $inputFile;
         format: line;
    output Out:
         inputTimestamp = getTimestampInSecs(),
         inputLineNumber = TupleNumber(); }
     () as DebugInputStream = FileSink(InputStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_naive.InputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
 
 
     // execute the shell command, writing input attributes to STDIN and reading output attributes from STDOUT, and copying other output attributes from the most recently consumed input tuple
     ( stream<InputType, tuple<rstring stdoutLine>> STDOUTStream ;
       stream<rstring stderrLine> STDERRStream ) = ShellPipe(InputStream) {
     param
         command: $shellCommand + " " + $tokenDelimiter; 
         stdinAttribute: inputLine;
         stdoutAttribute: "stdoutLine";
         stderrAttribute: "stderrLine"; }
     //() as DebugSTDOUTStream = FileSink(STDOUTStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_naive.STDOUTStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
 
 
     // parse the numbers and text written to STDOUT by the command, and copy other output attributes from the input tuple
     stream<OutputType> OutputStream = Custom(STDOUTStream) {
     logic 
     state: { mutable OutputType outputTuple; }
     onTuple STDOUTStream: {
         list<rstring> tokens = tokenize(stdoutLine, $tokenDelimiter, true);
         if (size(tokens)!=3) return;
         assignFrom(outputTuple, STDOUTStream); 
         outputTuple.outputLineNumber = (int64)tokens[0];
         outputTuple.outputLineLength = (int64)tokens[1];
         outputTuple.outputLine = tokens[2];
         submit(outputTuple, OutputStream);
     } }
     () as DebugOutputStream = FileSink(OutputStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_naive.OutputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
 
 
     // log STDERR lines to Streams application trace 
     () as LogSTDERRStream = Custom(STDERRStream as In) {
     logic onTuple In: {
           appTrc(Trace.info, "STDERR from command '" + $shellCommand + "': " + stderrLine); } }
 
 }