015_join_at_work

/*
This example shows one of the power-packed standard toolkit operators;
i.e. Join. This operator is so versatile that it is hard to do justice in 
explaining it thoroughly in a simple example such as this one. 
This example provides coverage to the following Join operator features.

a) Inner Join
b) Inner (Equi) Join
c) Left Outer Join
d) Right Outer Join
e) Full Outer Join
*/
namespace my.sample;

composite Main {
	type
		employee = tuple<rstring name, uint32 employeeDepartment>;
		department = tuple<uint32 departmentId, rstring departmentName>;
		orders = tuple<uint32 orderId, rstring customerName, rstring orderDate, rstring product, uint32 quantity>;
		inventory = tuple<uint32 productId, rstring productName, uint32 quantity, float32 price>;
		
	graph
		stream <employee> EmployeeRecord = FileSource() {
			param
				file:	"EmployeeRecords.txt";
				format:	csv;
				hasDelayField: true;
				initDelay: 2.0;
		} // End of EmployeeRecord = FileSource()
				
		stream <department> DepartmentRecord = FileSource() {
			param
				file:	"DepartmentRecords.txt";
				format:	csv;
				hasDelayField: true;
				initDelay: 2.0;
		} // End of DepartmentRecord = FileSource()

		stream <orders> OrderRecord = FileSource() {
			param
				file:	"orders.txt";
				format:	csv;
				hasDelayField: true;
				initDelay: 2.0;
		} // End of OrderRecord = FileSource()
		
		stream <inventory> InventoryRecord = FileSource() {
			param
				file:	"inventory.txt";
				format:	csv;
				hasDelayField: true;
				initDelay: 2.0;
		} // End of InventoryRecord = FileSource()
		
		// Inner Join of two streams.
		stream <employee, department> InnerJoin1 = Join(EmployeeRecord; DepartmentRecord) {
			window
				EmployeeRecord: sliding, count(20);
				DepartmentRecord: sliding, count(20);
				
			param
				match: EmployeeRecord.employeeDepartment == DepartmentRecord.departmentId;
				algorithm: inner;
		} // End of InnerJoin1 = Join(EmployeeRecord; DepartmentRecord)
		
		// Inner (Equi) Join of two streams.
		stream <orders, inventory, tuple<uint32 quantity2>> InnerEquiJoin1 = Join(OrderRecord; InventoryRecord) {
			window
				OrderRecord: sliding, delta(orderId, 1u);
				InventoryRecord: sliding, time(7);
			
			param
				algorithm: inner;
				match: productId == 125u;
				equalityLHS: product;
				equalityRHS: productName;
				
			output
				InnerEquiJoin1: 
					quantity = OrderRecord.quantity,
					quantity2 = InventoryRecord.quantity;
		} // End of InnerEquiJoin1 = Join(OrderRecord, InventoryRecord)
		
		// Left Outer Join.
		// It has two output ports; First port to send the matched tuples and
		// the second port to send the unmatched tuples.
		(stream <orders> LeftOuterJoin1; 
		 stream <orders> LeftOuterJoin2)  = Join(OrderRecord; InventoryRecord) {
			window		
				OrderRecord: sliding, time(6); 
				InventoryRecord: sliding, time(6);
			
			param
				algorithm: leftOuter;
				match: product == productName;
		} // End of (... LeftOuterJoin2) = Join(OrderRecord, InventoryRecord)		
		
		// Right Outer Join. It has only one output port.
		stream <orders, inventory, tuple<uint32 quantity2>> RightOuterJoin1 = Join(OrderRecord; InventoryRecord) {
			window		
				OrderRecord: sliding, count(5); 
				InventoryRecord: sliding, count(5);
			
			param
				algorithm: rightOuter;
				match: product == productName;
				defaultTupleLHS: {orderId=0u, customerName="XXXXX", orderDate="XXXXX", product="XXXXX", quantity=0u};
			
			output
				RightOuterJoin1: 
					quantity = OrderRecord.quantity,
					quantity2 = InventoryRecord.quantity;
		} // End of RightOuterJoin1 = Join(OrderRecord, InventoryRecord)

		// Outer Join. It has three output ports.
		(stream <orders, inventory, tuple<uint32 quantity2>> OuterJoin1;
		 stream <orders> OuterJoin2;
		 stream <inventory> OuterJoin3) = Join(OrderRecord; InventoryRecord) {
			window		
				OrderRecord: sliding, time(6); 
				InventoryRecord: sliding, time(6);
			
			param
				algorithm: outer;
				match: product == productName;
			
			output
				OuterJoin1: 
					quantity = OrderRecord.quantity,
					quantity2 = InventoryRecord.quantity;
		} // End of (... OuterJoin3) = Join(OrderRecord, InventoryRecord)
	
		() as ScreenWriter1 = Custom(InnerJoin1) {
			logic
				state: 
					mutable int32 joinedTupleCnt = 0;
				
				onTuple InnerJoin1: {
					if (joinedTupleCnt++ == 0) {
						printStringLn("\na)Tuples joined during Inner Join with sliding count(5)");
					} // End of if (joinedTupleCnt++ == 0)
					
					printStringLn ((rstring) joinedTupleCnt + "a)" + (rstring) InnerJoin1);
				} // End of onTuple InnerJoin1
		} // End of Custom(InnerJoin1)
		
		() as ScreenWriter2 = Custom(InnerEquiJoin1) {
			logic
				state: 
					mutable int32 joinedTupleCnt = 0;
				
				onTuple InnerEquiJoin1: {
					if (joinedTupleCnt++ == 0) {
						printStringLn("\nb)Tuples joined during Inner Equi Join with sliding delta(orderId, 1u)");
					} // End of if (joinedTupleCnt++ == 0)
					
					printStringLn ((rstring) joinedTupleCnt + "b)" + (rstring) InnerEquiJoin1);
				} // End of onTuple InnerJoin1
		} // End of Custom(InnerJoin1)		

		() as ScreenWriter3 = Custom(LeftOuterJoin1) {
			logic
				state: 
					mutable int32 joinedTupleCnt = 0;
				
				onTuple LeftOuterJoin1: {
					if (joinedTupleCnt++ == 0) {
						printStringLn("\nc)Tuples joined during Left Outer Join with sliding time(6)");
					} // End of if (joinedTupleCnt++ == 0)
					
					printStringLn ((rstring) joinedTupleCnt + "c)" + (rstring) LeftOuterJoin1);
				} // End of onTuple LeftOuterJoin1
		} // End of Custom(LeftOuterJoin1)		

		() as ScreenWriter4 = Custom(LeftOuterJoin2) {
			logic
				state: 
					mutable int32 joinedTupleCnt = 0;
				
				onTuple LeftOuterJoin2: {
					if (joinedTupleCnt++ == 0) {
						printStringLn("\nd)Tuples failed to match during Left Outer Join with sliding time(6)");
					} // End of if (joinedTupleCnt++ == 0)
					
					printStringLn ((rstring) joinedTupleCnt + "d)" + (rstring) LeftOuterJoin2);
				} // End of onTuple LeftOuterJoin2
		} // End of Custom(LeftOuterJoin2)

		() as ScreenWriter5 = Custom(RightOuterJoin1) {
			logic
				state: 
					mutable int32 joinedTupleCnt = 0;
				
				onTuple RightOuterJoin1: {
					if (joinedTupleCnt++ == 0) {
						printStringLn("\ne)Tuples matched and unmatched during Right Outer Join with sliding count(5)");
					} // End of if (joinedTupleCnt++ == 0)
					
					printStringLn ((rstring) joinedTupleCnt + "e)" + (rstring) RightOuterJoin1);
				} // End of onTuple RightOuterJoin1		
		} // End of Custom(RightOuterJoin1)	

		() as ScreenWriter6 = Custom(OuterJoin1) {
			logic
				state: 
					mutable int32 joinedTupleCnt = 0;
				
				onTuple OuterJoin1: {
					if (joinedTupleCnt++ == 0) {
						printStringLn("\nf)Tuples matched during Outer Join with sliding time(6)");
					} // End of if (joinedTupleCnt++ == 0)
					
					printStringLn ((rstring) joinedTupleCnt + "f)" + (rstring) OuterJoin1);
				} // End of onTuple OuterJoin1
		} // End of Custom(OuterJoin1)

		() as ScreenWriter7 = Custom(OuterJoin2) {
			logic
				state: 
					mutable int32 joinedTupleCnt = 0;
				
				onTuple OuterJoin2: {
					if (joinedTupleCnt++ == 0) {
						printStringLn("\ng)Tuples unmatched on left during Outer Join with sliding time(6)");
					} // End of if (joinedTupleCnt++ == 0)
					
					printStringLn ((rstring) joinedTupleCnt + "g)" + (rstring) OuterJoin2);
				} // End of onTuple OuterJoin2
		} // End of Custom(OuterJoin2)

		() as ScreenWriter8 = Custom(OuterJoin3) {
			logic
				state: 
					mutable int32 joinedTupleCnt = 0;
				
				onTuple OuterJoin3: {
					if (joinedTupleCnt++ == 0) {
						printStringLn("\nh)Tuples unmatched on right during Outer Join with sliding time(6)");
					} // End of if (joinedTupleCnt++ == 0)
					
					printStringLn ((rstring) joinedTupleCnt + "h)" + (rstring) OuterJoin3);
				} // End of onTuple OuterJoin3
		} // End of Custom(OuterJoin3)
} // End of composite Main.