044_streams_checkpointing_at_work

/*
This example shows a key feature of Streams by which an operator's
state variables can be preserved when its PE fails and is restarted.
This is done through a combination of the SPL config directives named
"checkpoint" and "restartable". Developers can protect their critical
operator data by taking advantage of this built-in checkpointing feature.
When you run this example and manually kill the PE, the Streams runtime
restarts it automatically, and the data flow resumes with the operator's
counter continuing from its most recently checkpointed value instead of
starting over from zero.

To test this example, you must run it in distributed mode.
*/
namespace checkpointing.example;

composite streams_checkpointing_at_work {
	graph
		// Emits one tuple per second (4000 tuples in total) carrying the current
		// seconds-since-epoch, both as a string (str) and as an int64 (i).
		stream<rstring str, int64 i> Test1 = Beacon() {
			param
				period: 1.0;
				iterations: 4000;
				
			output
				// Send the value of seconds elapsed since epoch.
				// NOTE(review): getTimestamp() is evaluated separately for str and i,
				// so the two values can differ by one if a second boundary falls
				// between the two calls.
				Test1: str = (rstring)getSeconds(getTimestamp()), i = getSeconds(getTimestamp());
		}
		
		// Forwards every Test1 tuple and adds j, a running count of the tuples this
		// operator has processed. The count is kept in a state variable that is
		// checkpointed periodically, so it survives a PE restart.
		stream<rstring str, int64 i, int64 j> CheckpointedStream = Custom(Test1) {
			logic
				state: {
					// In a terminal window, run the "streamtool lspes" command and note down
					// the pid of the PE identified by this operator's output stream name.
					// In that same terminal window, run "kill -9 <pid>" for that PE's pid.
					// When this PE is forcefully killed, the Streams runtime restarts it and
					// restores this state variable to the value it held at the most recent
					// checkpoint (taken at most two seconds before the crash; see config below).
					mutable int64 _cnt = 0;
				}
				
				onTuple Test1: {
					_cnt++;
					// Copy the matching input attributes (str, i), then attach the count.
					mutable CheckpointedStream _oTuple = {};
					assignFrom(_oTuple, Test1);
					_oTuple.j = _cnt;
					// You can check this output in the PE console log shown below.
					// After you kill the PE, the Streams runtime restarts it automatically
					// and restores _cnt from the latest checkpoint. Because checkpoints are
					// periodic, increments made after that checkpoint are rolled back, so a
					// few j values may repeat right after the restart; the counter does not
					// start over from zero. Tuples that were in flight when the PE died may
					// not be delivered.
					// cat /tmp/Streams-<domain_id>/logs/<host_name>/instances/<streams_instance_name>/jobs/<job_id>/pec.pe.<peId>.stdouterror
					printStringLn("_oTuple = " + (rstring)_oTuple);
					submit(_oTuple, CheckpointedStream);
				}
				
			config
				// We will do a periodic checkpoint of this operator every two seconds.
				checkpoint: periodic(2.0);
				restartable: true;
				relocatable: true;
				// Keep this operator in a PE of its own. A PE can be restarted only if
				// every operator fused into it is restartable, and the upstream Beacon
				// is not marked restartable, so fusing them would prevent the restart
				// this example demonstrates.
				placement: partitionIsolation;
		}
		
		// For every incoming tuple, computes the minimum, maximum and average of j
		// over the most recent 50 CheckpointedStream tuples.
		stream<int64 minValue, int64 maxValue, int64 avgValue> AggregatedStream = Aggregate(CheckpointedStream) {
			window
				CheckpointedStream: sliding, count(50), count(1);
			
			output
				AggregatedStream: minValue = Min(j), maxValue = Max(j), avgValue = Average(j);
		}
		
		// Appends each aggregated result to AggOutput.csv.
		() as FileWriter = FileSink(AggregatedStream) {
			param
				file: "AggOutput.csv";
				append: true;
		
		}
		
		
}