Examples

These examples use the ObjectStorageSink operator.

a) ObjectStorageSink with a static object name; the object is closed on window marker.

The Beacon operator sends 5000 tuples followed by a window marker. The ObjectStorageSink operator writes the 5000 tuples to the object and closes the object when the window marker arrives.

The sample uses the cos application configuration with the property cos.creds to specify the IBM COS credentials. Set the objectStorageURI in either the format "cos://<bucket-name>/" or "s3a://<bucket-name>/". The default value of the os-endpoint submission parameter is the public us-geo (cross-region) endpoint.

composite Main {
    param
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
    graph
        stream<rstring i> SampleData = Beacon()  {
            param
                iterations: 5000;
                period: 0.1;
            output SampleData: i = (rstring)IterationCount();
        }

        () as osSink = com.ibm.streamsx.objectstorage::ObjectStorageSink(SampleData) {
            param
                objectStorageURI: $objectStorageURI;
                objectName : "static_name.txt";
                endpoint : $endpoint;
        }
}

b) ObjectStorageSink creating objects of size 200 bytes with an incrementing number in the object name.

The sample uses the credentials parameter to specify the IBM COS credentials. Set the objectStorageURI in either the format "cos://<bucket-name>/" or "s3a://<bucket-name>/". The default value of the os-endpoint submission parameter is the public us-geo (cross-region) endpoint.

composite Main {
    param
        expression<rstring> $credentials: getSubmissionTimeValue("os-credentials");
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
    graph
        stream<rstring i> SampleData = Beacon()  {
            param
                period: 0.1;
            output SampleData: i = (rstring)IterationCount();
        }

        () as osSink = com.ibm.streamsx.objectstorage::ObjectStorageSink(SampleData) {
            param
                credentials: $credentials;
                objectStorageURI: $objectStorageURI;
                objectName : "%OBJECTNUM.txt";
                endpoint : $endpoint;
                bytesPerObject: 200l;
        }
}

c) ObjectStorageSink creating objects in parquet format.

The operator reads the IAM credentials from an application configuration. Ensure that the cos application configuration with the property cos.creds has been created. Each object is written in parquet format and closed after $timePerObject seconds.

composite Main {
    param
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri", "cos://streams-sample-001/");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
        expression<float64> $timePerObject: 10.0;

    type
        S3ObjectStorageSinkOut_t = tuple<rstring objectName, uint64 size>;

    graph

        stream<rstring username, uint64 id> SampleData = Beacon() {
            param
                period: 0.1;
            output
                SampleData : username = "Test"+(rstring) IterationCount(), id = IterationCount();
        }

        stream<S3ObjectStorageSinkOut_t> ObjStSink = com.ibm.streamsx.objectstorage::ObjectStorageSink(SampleData) {
            param
                objectStorageURI: $objectStorageURI;
                endpoint : $endpoint;
                objectName: "sample_%TIME.snappy.parquet";
                timePerObject : $timePerObject;
                storageFormat: "parquet";
                parquetCompression: "SNAPPY";
        }

        () as SampleSink = Custom(ObjStSink as I) {
             logic
                onTuple I: {
                    printStringLn("Object with name '" + I.objectName + "' of size '" + (rstring)I.size + "' has been created.");
                }
        }
}