SPL File TestShellPipeCopyAttributes_1to1_paired.spl

Toolkits > SampleShellPipe 1.1.0 > sample > TestShellPipeCopyAttributes_1to1_paired.spl

Content

Operators
  • TestShellPipeCopyAttributes_1to1_paired: This sample application executes a Perl script with the ShellPipe operator, flowing Streams tuples through the script asynchronously, and then uses a downstream Barrier operator to pair the ShellPipe operator's input and output tuples.

Composites

public composite TestShellPipeCopyAttributes_1to1_paired

This sample application executes a Perl script with the ShellPipe operator, flowing Streams tuples through the script asynchronously, and then uses a downstream Barrier operator to pair the ShellPipe operator's input and output tuples.

The script copies lines of text from STDIN to STDOUT, prefixing each line with its line number and the number of characters it contains. The operator writes the text of each input tuple's inputLine attribute to STDIN, and produces an output tuple for each line of text it reads from STDOUT. (This application does not expect the script to write anything to STDERR; anything it does write is logged in the Streams application trace.) A downstream Barrier operator pairs input tuples with output tuples and copies attributes with matching names and types. This approach suits scripts that write exactly one output line for each input line, that is, scripts with a "1 to 1" ratio of input and output lines.
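For experimenting outside Streams, the transformation the script performs can be modeled in a few lines of Python. This is a hypothetical stand-in for lineCounter.pl, not its actual source: the output layout (line number, delimiter, character count, delimiter, original text) is an assumption inferred from how the Custom operator in the source code splits each STDOUT line into three tokens, and excluding the line terminator from the character count is likewise assumed.

```python
def count_lines(lines, delimiter="-"):
    """Hypothetical model of lineCounter.pl (assumed output layout):
    yield exactly one output line per input line, formatted as
    <line number><delimiter><character count><delimiter><text>."""
    for number, line in enumerate(lines, start=1):
        text = line.rstrip("\n")  # assumption: the newline is not counted
        yield f"{number}{delimiter}{len(text)}{delimiter}{text}"
```

For example, list(count_lines(["I met a traveller\n", "\n"])) gives ["1-17-I met a traveller", "2-0-"]. Each input line yields exactly one output line, which is the "1 to 1" property that the Barrier pairing relies on.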

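The ShellPipe operator executes its command asynchronously from Streams, and the data flowing between them is buffered. This is good for performance, because it allows Streams and the command to execute concurrently. However, it also allows the operator to consume several input tuples before it receives the script's output for earlier ones. That is, input and output tuples may get out of sync, so attributes of the input tuple the operator is currently consuming should not be copied into the output tuple it is currently producing. In this application, the Barrier operator instead copies attributes into each output tuple from its corresponding input tuple. This works because the script preserves the order of lines and writes exactly one output line per input line: the n-th tuple on the STDOUT stream always corresponds to the n-th tuple on the input stream, and Barrier pairs the tuples it receives on its input ports in arrival order.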
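The effect of buffering, and why order-based pairing avoids it, can be illustrated with a small deterministic Python simulation. It is a sketch under simplifying assumptions, not a model of ShellPipe internals: the pipe is assumed to hold back the script's output for a fixed number of lines (lag, a hypothetical parameter), line.upper() stands in for whatever the script writes, and barrier_pair (a hypothetical name) models Barrier only as first-in, first-out matching of the n-th tuple on each port.

```python
from collections import deque

def run_pipe(inputs, lag):
    """Simulate an asynchronous command whose output for each input line
    becomes readable only after `lag` further lines have been written.
    Returns (naive_pairs, outputs); naive_pairs pairs each output with the
    most recently consumed input, which is the mistake to avoid."""
    consumed = []
    in_flight = deque()
    naive_pairs, outputs = [], []

    def emit():
        out = in_flight.popleft()
        outputs.append(out)
        naive_pairs.append((consumed[-1], out))  # wrong: uses the latest input

    for line in inputs:
        consumed.append(line)
        in_flight.append(line.upper())  # stand-in for the script's output
        if len(in_flight) > lag:
            emit()
    while in_flight:  # end of input: the pipe drains
        emit()
    return naive_pairs, outputs

def barrier_pair(inputs, outputs):
    """Order-based pairing for a 1-to-1 script: match tuples by arrival order."""
    queues = (deque(inputs), deque(outputs))
    pairs = []
    while all(queues):  # emit only while both "ports" have a queued tuple
        pairs.append((queues[0].popleft(), queues[1].popleft()))
    return pairs
```

With run_pipe(["a", "b", "c"], lag=1), the naive pairing attaches output "A" to input "b", while barrier_pair matches every output with its own input. Order-based pairing is only correct when the script writes exactly one line per input line; a single missing or extra line shifts every later pair.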

This sample application illustrates one possible solution to the problem of copying attributes from input tuples to output tuples that are out of sync due to buffering and asynchronous execution in a shell. The sample::TestShellPipeCopyAttributes_1toN_queued application shows another solution to this problem for scripts that write more than one output line for each input line.

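The Perl script used by this sample application can be tested independently of Streams by entering these commands at a Linux prompt. The "-" argument is the token delimiter, the same value the application passes to the script through its tokenDelimiter parameter: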

cd .../samples/SampleShellPipe
cat ./data/ozymandias.txt | ./etc/lineCounter.pl - 

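Parameters

  • inputFile: the text file read by the FileSource operator (default: dataDirectory() + "/ozymandias.txt")
  • shellCommand: the command executed by the ShellPipe operator (default: "./etc/lineCounter.pl")
  • tokenDelimiter: the delimiter passed to the command as its argument and used to split the lines it writes to STDOUT (default: "-")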

SPL Source Code


 composite TestShellPipeCopyAttributes_1to1_paired {
 
     param
     expression<rstring> $inputFile: dataDirectory() + "/ozymandias.txt";
     expression<rstring> $shellCommand: "./etc/lineCounter.pl";
     expression<rstring> $tokenDelimiter: "-";
 
     type 
 
     InputType =
         float64 inputTimestamp,
         int64   inputLineNumber,
         rstring inputLine;
 
     OutputType =
         float64 inputTimestamp,
         int64   inputLineNumber,
         rstring inputLine,
         int64   outputLineNumber,
         int64   outputLineLength,
         rstring outputLine;
 
 
     graph
 
 
     // create a stream of tuples containing text lines for the ShellPipe operator below to consume
     stream<InputType> InputStream as Out = FileSource() {
     param
         file: $inputFile;
         format: line;
     output Out:
         inputTimestamp = getTimestampInSecs(),
         inputLineNumber = TupleNumber(); }
     () as DebugInputStream = FileSink(InputStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_paired.InputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
 
 
     // execute the shell command, writing the inputLine attribute to STDIN and reading lines from STDOUT and STDERR into output tuples
     ( stream<rstring stdoutLine> STDOUTStream ;
       stream<rstring stderrLine> STDERRStream ) = ShellPipe(InputStream) {
     param
         command: $shellCommand + " " + $tokenDelimiter;
         stdinAttribute: inputLine;
         stdoutAttribute: "stdoutLine";
         stderrAttribute: "stderrLine"; }
     //() as DebugSTDOUTStream = FileSink(STDOUTStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_paired.STDOUTStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
     //() as DebugSTDERRStream = FileSink(STDERRStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_paired.STDERRStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
 
 
     // pair each tuple consumed by the ShellPipe operator with the tuple it produced; Barrier copies attributes with matching names from both tuples into the paired tuple
     stream<InputType, tuple<rstring stdoutLine>> PairedSTDOUTStream = Barrier(InputStream ; STDOUTStream) {}
     //() as DebugPairedSTDOUTStream = FileSink(PairedSTDOUTStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_paired.PairedSTDOUTStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
 
 
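     // parse the lines written to STDOUT by the command, and copy matching attributes
     stream<OutputType> OutputStream = Custom(PairedSTDOUTStream) {
     logic 
     state: { mutable OutputType outputTuple; }
     onTuple PairedSTDOUTStream: {
         // expect "<line number><delimiter><length><delimiter><text>"; tokenize splits at every
         // delimiter character, so a line whose text contains the delimiter yields more than
         // three tokens and is skipped here
         list<rstring> tokens = tokenize(stdoutLine, $tokenDelimiter, true);
         if (size(tokens)!=3) return;
         // copy attributes with matching names and types (inputTimestamp, inputLineNumber, inputLine)
         assignFrom(outputTuple, PairedSTDOUTStream);
         outputTuple.outputLineNumber = (int64)tokens[0];
         outputTuple.outputLineLength = (int64)tokens[1];
         outputTuple.outputLine = tokens[2];
         submit(outputTuple, OutputStream); } }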
     () as DebugOutputStream = FileSink(OutputStream) { param file: "debug.TestShellPipeCopyAttributes_1to1_paired.OutputStream.out"; format: txt; hasDelayField: true; flush: 1u; writePunctuations: true; }
 
 
     // log STDERR lines to Streams application trace 
     () as LogSTDERRStream = Custom(STDERRStream as In) {
     logic onTuple In: {
           appTrc(Trace.info, "STDERR from command '" + $shellCommand + "': " + stderrLine); } }
 
 }
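The parsing step in the Custom operator can be modeled in Python to show how it behaves on edge cases. This is an illustrative sketch, not SPL: parse_paired and split_all are hypothetical names, tuples are represented as dicts, and attribute copying matches names only (assignFrom also requires matching types). Splitting on every delimiter mirrors SPL tokenize with keepEmptyTokens set to true for a single-character delimiter; split_all=False shows one possible alternative that is not what the sample does.

```python
OUTPUT_ATTRS = ("inputTimestamp", "inputLineNumber", "inputLine",
                "outputLineNumber", "outputLineLength", "outputLine")

def parse_paired(paired, delimiter="-", split_all=True):
    """Model of the Custom operator: split stdoutLine into three fields,
    copy attributes whose names appear in the output schema, then fill
    in the parsed fields. Returns None where the SPL code skips the tuple."""
    line = paired["stdoutLine"]
    # split_all=True: split at every delimiter, keeping empty tokens (like tokenize)
    # split_all=False: hypothetical alternative, split at the first two delimiters only
    tokens = line.split(delimiter) if split_all else line.split(delimiter, 2)
    if len(tokens) != 3:
        return None  # the SPL code returns without submitting a tuple
    out = {k: v for k, v in paired.items() if k in OUTPUT_ATTRS}  # like assignFrom
    out["outputLineNumber"] = int(tokens[0])
    out["outputLineLength"] = int(tokens[1])
    out["outputLine"] = tokens[2]
    return out
```

With the default split, a STDOUT line such as "2-10-well-known" produces four tokens and is dropped, exactly as in the SPL code; limiting the split to the first two delimiters keeps the text intact. Whether that alternative is appropriate depends on the script's actual output format.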