017_filesource_filesink_at_work

/*
We have used the FileSource and the FileSink operators in other examples
before. However, this example shows off the following intriguing
features that will come in handy.

a) Automatic deletion of a file after the FileSource reads all the records.
b) Flushing the sink file on demand after writing a certain number of tuples.
c) Ability of the FileSource to move the file once it is done with it.
d) Creating a fresh output sink file after writing a certain number of tuples.
e) Ability of the FileSource to keep reading from a hot file as new CSV records get written.
*/
namespace my.sample;
// In order to test this application, you have to ensure that two 
// sub-directories "test1", "test2" are present inside this application's directory.
// Then, you have to ensure that all the files in "data" directory are copied to "test1".
//
// To test the hotFile function, you have to keep appending new records
// to the test1/DepartmentRecords.txt file using a text editor.
composite Main {
	type
		// Schemas of the three record kinds handled by this application.
		cityData = tuple<rstring city, rstring country, uint32 population,
			uint32 medianAge, uint32 percentageEducated>;
		employee = tuple<rstring name, uint32 employeeDepartment>;
		department = tuple<uint32 departmentId, rstring departmentName>;

	graph
		// ---------------------------------------------------------------
		// Pipeline 1: city data.
		// The source removes its input file once every record has been
		// read (deleteFile).
		// NOTE(review): hasDelayField is set on every source here, so the
		// input files presumably carry a leading delay column that is not
		// part of the stream schema -- confirm against the data files.
		// ---------------------------------------------------------------
		stream<cityData> CityDataRecord = FileSource() {
			param
				file: "../test1/Population.txt";
				format: csv;
				deleteFile: true;
				hasDelayField: true;
		}

		// Writes the city records out, flushing the file after every 4 tuples.
		() as FileSink1 = FileSink(CityDataRecord) {
			param
				file: "../test1/result_population.txt";
				flush: 4u;
		}

		// ---------------------------------------------------------------
		// Pipeline 2: employee data.
		// The source relocates its input file into ../test2 once it has
		// finished reading it (moveFileToDirectory).
		// ---------------------------------------------------------------
		stream<employee> EmployeeRecord = FileSource() {
			param
				file: "../test1/EmployeeRecords.txt";
				format: csv;
				moveFileToDirectory: "../test2";
				hasDelayField: true;
		}

		// Starts a new output file after every 2 tuples (closeMode: count
		// together with tuplesPerFile); the {id} placeholder in the file
		// name keeps the successive output files apart.
		() as FileSink2 = FileSink(EmployeeRecord) {
			param
				file: "../test2/result_employee{id}.txt";
				closeMode: count;
				tuplesPerFile: 2u;
		}

		// ---------------------------------------------------------------
		// Pipeline 3: department data.
		// The input is treated as a hot file: the source keeps reading
		// records as they are appended to it (hotFile).
		// ---------------------------------------------------------------
		stream<department> DepartmentRecord = FileSource() {
			param
				file: "../test1/DepartmentRecords.txt";
				format: csv;
				hotFile: true;
				hasDelayField: true;
		}

		// Writes the department records out, flushing after every 2 new tuples.
		() as FileSink3 = FileSink(DepartmentRecord) {
			param
				file: "../test1/result_department.txt";
				flush: 2u;
		}
}