/*
This example demonstrates how two different SPL applications can
share streams between them. This is an important feature that is
elegantly done using the two pseudo operators called Export and Import.
This application also shows how two different main composites can be
part of the same application by using two different namespaces.
As an aside, there is also a demonstration of using a Custom operator
to customize the Beacon generated tuples by involving state variables.
*/
namespace my.sample2;
composite Main {
type
employee = tuple<rstring name, uint32 employeeDepartment>;
department = tuple<uint32 departmentId, rstring departmentName>;
ticker = tuple<rstring symbol, float32 price, uint32 quantity, rstring tradeType>;
graph
stream <department> DepartmentRecord = FileSource() {
param
file: "DepartmentRecords.txt";
format: csv;
hasDelayField: true;
initDelay: 30.0;
} // End of DepartmentRecord = FileSource()
() as ExportedDepartmentRecord = Export(DepartmentRecord) {
param
streamId: "ExportedDepartmentRecord";
} // End of ExportedDepartmentRecord = Export(DepartmentRecord)
stream <employee> EmployeeRecord = Import() {
param
applicationName: "my.sample::Main";
streamId: "ExportedEmployeeRecord";
} // End of EmployeeRecord = Import()
// Inner Join of two streams.
stream <employee, department> InnerJoin1 = Join(EmployeeRecord; DepartmentRecord) {
window
EmployeeRecord: sliding, count(14);
DepartmentRecord: sliding, count(14);
param
match: EmployeeRecord.employeeDepartment == DepartmentRecord.departmentId;
algorithm: inner;
} // End of InnerJoin1 = Join(EmployeeRecord; DepartmentRecord)
() as ScreenWriter1 = Custom(InnerJoin1) {
logic
state:
mutable int32 joinedTupleCnt = 0;
onTuple InnerJoin1: {
if (joinedTupleCnt++ == 0) {
printStringLn("\nb)Tuples joined during Inner Join with sliding count(14)");
} // End of if (joinedTupleCnt++ == 0)
printStringLn ((rstring) joinedTupleCnt + "b)" + (rstring) InnerJoin1);
} // End of onTuple InnerJoin1
} // End of Custom(InnerJoin1)
// Let us import the automotive sector tickers exported by
// the other main composite in the my.sample namespace.
stream <ticker> ImportedAutomotiveTickers = Import() {
param
subscription:
(sector == "Automotive") &&
("GM" in symbols || "F" in symbols);
} // End of ImportedAutomotiveTickers = Import()
// Let us import the pharma sector tickers exported by
// the other main composite in the my.sample namespace.
stream <ticker> ImportedPharmaTickers = Import() {
param
subscription:
(sector == "Pharma") &&
("PFE" in symbols || "LLY" in symbols || "BMS" in symbols);
} // End of ImportedAutomotiveTickers = Import()
() as ScreenWriter2 = Custom(ImportedAutomotiveTickers) {
logic
state:
mutable int32 automotiveTupleCnt = 0;
onTuple ImportedAutomotiveTickers: {
if (automotiveTupleCnt++ == 0) {
printStringLn("\nc)Imported Automotive tuples from my.sample::Main");
} // End of if (automotiveTupleCnt++ == 0)
printStringLn ((rstring) automotiveTupleCnt + "c)" + (rstring) ImportedAutomotiveTickers);
} // End of onTuple ImportedAutomotiveTickers
} // End of Custom(ImportedAutomotiveTickers)
() as ScreenWriter3 = Custom(ImportedPharmaTickers) {
logic
state:
mutable int32 pharmaTupleCnt = 0;
onTuple ImportedPharmaTickers: {
if (pharmaTupleCnt++ == 0) {
printStringLn("\nd)Imported Pharma tuples from my.sample::Main");
} // End of if (pharmaTupleCnt++ == 0)
printStringLn ((rstring) pharmaTupleCnt + "d)" + (rstring) ImportedPharmaTickers);
} // End of onTuple ImportedPharmaTickers
} // End of Custom(ImportedPharmaTickers)
} // End of the main composite.