Examples

These examples use the S3ObjectStorageSink operator from the com.ibm.streamsx.objectstorage.s3 namespace of the com.ibm.streamsx.objectstorage toolkit (version 2.2.5).

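a) S3ObjectStorageSink creating objects of approximately 200 bytes each (bytesPerObject: 200l), with an incrementing number (%OBJECTNUM) in the object name. The endpoint comes from the os-endpoint submission-time parameter, which defaults to the public us-geo cross-region endpoint (s3.us.cloud-object-storage.appdomain.cloud).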

composite Main {
    param
        expression<rstring> $accessKeyID : getSubmissionTimeValue("os-access-key-id");
        expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");
        expression<rstring> $bucket: getSubmissionTimeValue("os-bucket");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
    graph
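        // Generate one tuple every 0.1 seconds; i holds the iteration count as a string.
        stream<rstring i> SampleData = Beacon() {
            param
                period: 0.1;
            output SampleData: i = (rstring)IterationCount();
        }

        () as osSink = com.ibm.streamsx.objectstorage.s3::S3ObjectStorageSink(SampleData) {
            param
                accessKeyID : $accessKeyID;
                secretAccessKey : $secretAccessKey;
                bucket : $bucket;
                // %OBJECTNUM is replaced by a number that increments with each new object.
                objectName : "%OBJECTNUM.txt";
                endpoint : $endpoint;
                // Start a new object once the current one reaches about 200 bytes (int64 literal).
                bytesPerObject: 200l;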
        }
}
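To make the size-based rollover of example a) concrete, the following Python sketch models it outside of Streams. It is a simplified, hypothetical model, not the toolkit's implementation: SizeRollingSink and its members are illustrative names, and it assumes that %OBJECTNUM starts at 0, that an object is closed as soon as its buffered size reaches bytesPerObject, and that each tuple is written as its string value followed by a newline. The operator's actual byte accounting and numbering may differ.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class SizeRollingSink:
    """Hypothetical model of bytesPerObject rollover (not the toolkit's code)."""
    name_pattern: str       # mirrors objectName, e.g. "%OBJECTNUM.txt"
    bytes_per_object: int   # mirrors bytesPerObject
    object_num: int = 0     # assumption: %OBJECTNUM starts at 0
    pending: bytearray = field(default_factory=bytearray)
    closed: List[Tuple[str, int]] = field(default_factory=list)

    def write(self, record: bytes) -> None:
        # Append the record to the currently open object.
        self.pending += record
        # Assumption: roll over once the object reaches the size limit.
        if len(self.pending) >= self.bytes_per_object:
            self._close()

    def _close(self) -> None:
        name = self.name_pattern.replace("%OBJECTNUM", str(self.object_num))
        self.closed.append((name, len(self.pending)))
        self.pending = bytearray()
        self.object_num += 1  # the next object gets the next number


# Simulate the first 100 Beacon tuples, each written as "<i>\n".
sink = SizeRollingSink(name_pattern="%OBJECTNUM.txt", bytes_per_object=200)
for i in range(100):
    sink.write(f"{i}\n".encode())

for name, size in sink.closed:
    print(f"closed {name} ({size} bytes)")
print(f"{len(sink.pending)} bytes still buffered in object {sink.object_num}")
```

Under these assumptions the first 100 tuples (290 bytes in total) fill object 0.txt with exactly 200 bytes after the tuple for i = 69, and the remaining 90 bytes stay in the still-open object 1.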

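b) S3ObjectStorageSink creating objects in Parquet format with Snappy compression. A new object is started every $timePerObject seconds (10.0 in this example), and %TIME in the object name is replaced by a timestamp. For each object it creates, the operator submits a tuple with the object name and size to its output stream, and the Custom operator prints that information.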

composite Main {
    param
        expression<rstring> $accessKeyID : getSubmissionTimeValue("os-access-key-id");
        expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");
        expression<rstring> $bucket: getSubmissionTimeValue("os-bucket");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
        expression<float64> $timePerObject: 10.0;

    type
        S3ObjectStorageSinkOut_t = tuple<rstring objectName, uint64 size>;

    graph

        stream<rstring username, uint64 id> SampleData = Beacon() {
            param
                period: 0.1;
            output
                SampleData : username = "Test" + (rstring)IterationCount(), id = IterationCount();
        }

        stream<S3ObjectStorageSinkOut_t> ObjStSink = com.ibm.streamsx.objectstorage.s3::S3ObjectStorageSink(SampleData) {
            param
                accessKeyID : $accessKeyID;
                secretAccessKey : $secretAccessKey;
                bucket : $bucket;
                endpoint : $endpoint;
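                // %TIME in the object name is replaced by a timestamp.
                objectName: "sample_%TIME.snappy.parquet";
                // Close the current object and start a new one every $timePerObject seconds.
                timePerObject : $timePerObject;
                // Write Parquet objects compressed with Snappy.
                storageFormat: "parquet";
                parquetCompression: "SNAPPY";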
        }

        // Print the name and size of each object reported on the sink's output stream.
        () as SampleSink = Custom(ObjStSink as I) {
            logic
                onTuple I: {
                    printStringLn("Object with name '" + I.objectName + "' of size '" + (rstring)I.size + "' has been created.");
                }
        }
}
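Example b) rolls objects over by time rather than by size. The following Python sketch models that policy together with the (objectName, size) tuples the operator submits on ObjStSink. It is a hypothetical simplification, not the toolkit's implementation: TimeRollingSink is an illustrative name, time is kept in integer milliseconds to avoid floating-point drift, %TIME is replaced by the object's opening time in milliseconds (the operator's real timestamp format is not modeled), and sizes count uncompressed record bytes rather than the size of the Snappy-compressed Parquet object.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class TimeRollingSink:
    """Hypothetical model of timePerObject rollover (not the toolkit's code)."""
    name_pattern: str         # mirrors objectName, e.g. "sample_%TIME.snappy.parquet"
    time_per_object_ms: int   # mirrors timePerObject, converted to milliseconds
    opened_at_ms: Optional[int] = None
    pending: bytearray = field(default_factory=bytearray)
    # (objectName, size) pairs, analogous to S3ObjectStorageSinkOut_t tuples
    emitted: List[Tuple[str, int]] = field(default_factory=list)

    def tick(self, now_ms: int) -> None:
        # Close the open object once it has been open for time_per_object_ms.
        if self.opened_at_ms is not None and now_ms - self.opened_at_ms >= self.time_per_object_ms:
            self._close()

    def write(self, record: bytes, now_ms: int) -> None:
        self.tick(now_ms)  # an expired object is closed before the new record
        if self.opened_at_ms is None:
            self.opened_at_ms = now_ms  # open a new object on the first record
        self.pending += record

    def _close(self) -> None:
        # Hypothetical %TIME substitution: the opening time in milliseconds.
        name = self.name_pattern.replace("%TIME", str(self.opened_at_ms))
        self.emitted.append((name, len(self.pending)))
        self.pending = bytearray()
        self.opened_at_ms = None


# Simulate 25 seconds of Beacon output (one tuple every 100 ms, timePerObject 10 s).
sink = TimeRollingSink("sample_%TIME.snappy.parquet", time_per_object_ms=10_000)
for i in range(250):
    sink.write(f"Test{i:04d}".encode(), now_ms=i * 100)  # fixed 8-byte records
sink.tick(now_ms=30_000)  # a later timer check closes the last object

for name, size in sink.emitted:
    print(f"Object with name '{name}' of size '{size}' has been created.")
```

Under these assumptions the model reports three objects, opened at 0, 10000 and 20000 ms and holding 100, 100 and 50 records (800, 800 and 400 bytes). The explicit tick call stands in for whatever mechanism closes an expired object when no new tuple arrives; the fixed-width record text only keeps the sizes easy to check.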