S3ObjectStorageSink Examples (com.ibm.streamsx.objectstorage toolkit 2.2.5, namespace com.ibm.streamsx.objectstorage.s3)
These examples use the S3ObjectStorageSink operator.
a) S3ObjectStorageSink creating objects of 200 bytes each, with an incrementing number in the object name. The endpoint is the public us-geo (cross-region) endpoint, which is the default value of the os-endpoint submission parameter.
    composite Main {
        param
            expression<rstring> $accessKeyID : getSubmissionTimeValue("os-access-key-id");
            expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");
            expression<rstring> $bucket : getSubmissionTimeValue("os-bucket");
            expression<rstring> $endpoint : getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");

        graph
            stream<rstring i> SampleData = Beacon() {
                param
                    period : 0.1;
                output
                    SampleData : i = (rstring)IterationCount();
            }

            () as osSink = com.ibm.streamsx.objectstorage.s3::S3ObjectStorageSink(SampleData) {
                param
                    accessKeyID : $accessKeyID;
                    secretAccessKey : $secretAccessKey;
                    bucket : $bucket;
                    objectName : "%OBJECTNUM.txt";
                    endpoint : $endpoint;
                    bytesPerObject : 200l;
            }
    }
b) S3ObjectStorageSink creating objects in Parquet format. A new object is created each time the interval given by $timePerObject, in seconds, elapses (10.0 in this example).
    composite Main {
        param
            expression<rstring> $accessKeyID : getSubmissionTimeValue("os-access-key-id");
            expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");
            expression<rstring> $bucket : getSubmissionTimeValue("os-bucket");
            expression<rstring> $endpoint : getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
            expression<float64> $timePerObject : 10.0;

        type
            S3ObjectStorageSinkOut_t = tuple<rstring objectName, uint64 size>;

        graph
            stream<rstring username, uint64 id> SampleData = Beacon() {
                param
                    period : 0.1;
                output
                    SampleData : username = "Test" + (rstring)IterationCount(), id = IterationCount();
            }

            stream<S3ObjectStorageSinkOut_t> ObjStSink = com.ibm.streamsx.objectstorage.s3::S3ObjectStorageSink(SampleData) {
                param
                    accessKeyID : $accessKeyID;
                    secretAccessKey : $secretAccessKey;
                    bucket : $bucket;
                    endpoint : $endpoint;
                    objectName : "sample_%TIME.snappy.parquet";
                    timePerObject : $timePerObject;
                    storageFormat : "parquet";
                    parquetCompression : "SNAPPY";
            }

            () as SampleSink = Custom(ObjStSink as I) {
                logic
                    onTuple I : {
                        printStringLn("Object with name '" + I.objectName + "' of size '" + (rstring)I.size + "' has been created.");
                    }
            }
    }