Streams DPS Toolkit > com.ibm.streamsx.dps 4.1.8 > Developing and running applications that use the DPS Toolkit
source product-installation-root-directory/4.2.0.0/bin/streamsprofile.sh Download the most recent version of the DPS toolkit from here and extract it in your IBM Streams development machine. https://github.com/IBMStreams/streamsx.dps
All the functionality of the toolkit is available via the native functions in the com.ibm.streamsx.store.distributed and com.ibm.streamsx.lock.distributed namespaces. You need to include these namespaces in your SPL application via a use directive. See the "Getting Started" section below and the native function documentation has more details on how to use the toolkit within applications written in SPL. If you have a C++ or Java operator or function that uses the DPS toolkit, your SPL application graph must also include an instance of the DPSAux operator. This operator does not require any additional configuration but must be present in order for your application to work correctly. See the DPSUsageFromJava sample for an example.
Ensure that <dps_toolkit_home>/impl/java/lib/dps-helper.jar, which contains the Java implementation of the DPS functions is accessible to your application. Packages of interest are com.ibm.streamsx.dps and com.ibm.streamsx.dl. See the "Getting Started" section below and the Javadoc for details on using the API from operators written in Java.
PrimitiveOperator(name="MyJavaOperator", namespace="com.ibm.demo", description="Java Operator MyJavaOperator") @SharedLoader(true) public class MyJavaOperator extends AbstractOperator { ... }
Include the C++ header file DistributedProcessStoreWrappers.h found in impl/include. This is the main entry point for the C++ functions, which are in the C++ namespace com::ibm::streamsx::store::distributed.
The following snippets demonstrate the basic usage of the toolkit from SPL and Java. Usage from C++ is very similar to the SPL example below.
SPL:
rstring dummyRstring = ""; uint32 dummyUint32 = 0u; mutable uint64 err = 0ul; mutable uint64 dbStore_handle = 0ul; dbStore_handle = dpsCreateStore("myDBStore1", dummyRstring, dummyUint32, err); if (err == 0ul ) { //no error occurred //create lock for the store mutable uint64 lock_id = dlCreateOrGetLock("My db store lock", err); // Acquire the newly created lock, specifying a lease time and maximum time to wait to acquire the lock. float64 max_wait = 10.0; float64 lease_time = 10.0; dlAcquireLock(lock_id, lease_time, max_wait, err); //add a key/value pair to the store mutable boolean result = true; rstring key = "IBM"; uint32 value = 399; err = 0ul; result = dpsPut(dbStore_handle, key, value, err); if (err != 0ul) { //use dpsGetLastStoreErrorCode() and dpsGetLastStoreErrorString() as needed } // finished our store operations, release the lock err = 0ul; dlReleaseLock(lock_id, err); }
Java:
StoreFactory sf = DistributedStores.getStoreFactory(); Store store = null; try { //specify the SPL types for the keys and values in the store String keyType = "rstring"; String valueType = "int32"; store = sf.createOrGetStore("Java Test Store1", keyType, valueType); } catch (StoreFactoryException sfe) { // use sfe.getErrorCode() and sfe.getErrorMessage()) for more info } ... //once ready to access the store, //get the lock for the store, may have previously been created LockFactory lf = DistributedLocks.getLockFactory(); Lock myLock = lf.createOrGetLock("Lock_For_Test_Store1"); // Acquire the lock try { myLock.acquireLock(); } catch (LockException le) { System.out.print("Unable to acquire the lock named 'Lock_For_Test_Store1'"); System.out.println(" Error = " + le.getErrorCode() + ", Error msg = " + le.getErrorMessage()); throw le; } //perform store operations store.put("IBM", 39); store.put("Lenovo", 50); //release the lock when finished myLock.releaseLock();
Note that error checking in the above examples is minimal.
Most C++ functions include a mutable err parameter that will contain the result of executing the function. If an error occurs, this variable's value will be non-zero. It is the caller's responsibility to provide a mutable parameter to contain the error code and check its value afterwards.
The following example shows how to check for errors, after a function call, in this case, after creating a lock:
mutable uint64 err = 0ul; mutable uint64 lock_id = dlCreateOrGetLock("My Sentinel Lock1", err); if (err != 0ul) { rstring msg = dlGetLastDistributedLockErrorString(); uint64 rc = dlGetLastDistributedLockErrorCode(); printStringLn("Error creating lock, rc = " + (rstring)(rc) + ", msg =" + msg ); }
To specifically learn how to call the DPS APIs from SPL native functions, C++ and Java primitive operators, see the samples included in <STREAMS_INSTALL>/samples/com.ibm.streamsx.dps or in <YOUR_DPS_TOOLKIT_DIRECTORY>/samples directory. The advanced sub-directory there contains examples that showcase bulk of the available DPS APIs.