TemperatureSample sample application for IBM Streams Runner for Apache Beam

You can use a simple application called TemperatureSample to learn how to submit and monitor an Apache Beam 2.4 application in the Streaming Analytics service on IBM Cloud (formerly IBM Bluemix). The sample application is included with IBM® Streams Runner for Apache Beam. Some familiarity with Beam programming is helpful, though not required; the Apache Beam website has a useful Apache Beam Java SDK Quickstart page and other documentation.

For more information about this sample application, see The TemperatureSample application.

Before you start

Before you run the TemperatureSample sample application, you must configure and run the Streaming Analytics service on IBM Cloud®.

Important: If you want to compile your application on IBM Cloud, you must unset the STREAMS_INSTALL variable before you submit the application to the Streaming Analytics service.

Running the TemperatureSample application

The following video demonstrates how to launch the TemperatureSample.


  1. Navigate to the $STREAMS_RUNNER_HOME/examples directory. The Streams Runner toolkit provides all necessary files. If all environment variables are set as described in Downloading and configuring Streams Runner, and the $VCAP_SERVICES IBM Cloud credentials file contains credentials named beam-service, you can launch the TemperatureSample application with the following command:

    java -cp \
    $STREAMS_BEAM_TOOLKIT/lib/com.ibm.streams.beam.translation.jar:\
    $STREAMS_RUNNER_HOME/examples/transform/target/beam-examples-transform-<runner-version>.jar \
    com.ibm.streams.beam.examples.transform.temperature.TemperatureSample \
    --runner=StreamsRunner \
    --contextType=STREAMING_ANALYTICS_SERVICE \
    --jarsToStage=$STREAMS_RUNNER_HOME/examples/transform/target/beam-examples-transform-<runner-version>.jar
    

    Note: If the environment variables are not set, you must use full paths, and use the --vcapServices parameter to provide the path to the IBM Cloud credentials file.

    For more information about the parameters available to you, see Pipeline options for Streams Runner.

  2. Verify that the application started successfully.

    Streams Runner displays informational (INFO) messages as it processes the Beam pipeline for your application, turns it into an IBM Streams application bundle (.sab) file, and finally submits the .sab file to IBM Cloud:

    Sep 27, 2017 2:34:19 PM com.ibm.streams.beam.translation.StreamsRunner run
    INFO: Running pipeline using StreamsRunner in context:
         contextType=STREAMING_ANALYTICS_SERVICE
         beamToolkitDir=null
         tracingLevel=null
         jobName=temperaturesample-johnmac-0927183419-7a8ccbfa
    StreamsRunner: Sending application to be built remotely, then submitting to Streaming Analytics Service...
    StreamsRunner: Application successfully submitted to Streaming Analytic service with Job ID: 1
    

    If you see an exception or error report, it indicates the reason for the failure. The following issues are common:

    • A typographical error in the java command or one of the variables it uses.
    • A problem with the $VCAP_SERVICES file or the wrong service name. Check that the "credentials" for "streaming-analytics" in the $VCAP_SERVICES file match those for your Streaming Analytics service, and that the "name" matches the name that you used for the --serviceName option to the java command.
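Because several of the common failures come from variables that are unset or mistyped, a small pre-flight check can help before you run the launch command. The following is a minimal sketch, not part of Streams Runner: the class LaunchEnvCheck and its method are hypothetical, and the list of variables is taken only from the names that appear in this topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight helper; it is not shipped with Streams Runner.
final class LaunchEnvCheck {
    // Variables that the launch command in this topic expands.
    static final String[] REQUIRED = {
        "STREAMS_RUNNER_HOME", "STREAMS_BEAM_TOOLKIT", "VCAP_SERVICES"
    };

    // Returns the names of required variables that are unset or empty.
    static List<String> missing(Map<String, String> env) {
        List<String> result = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> unset = missing(System.getenv());
        if (unset.isEmpty()) {
            System.out.println("All variables used by the launch command are set.");
        } else {
            System.out.println("Unset or empty: " + unset);
        }
    }
}
```

The check only confirms that the variables exist. It does not validate the paths or the contents of the credentials file.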

Viewing the running application

After the job is submitted successfully, the java command keeps running and displays the metrics that the application reports:

----- Temperature Metrics --------------------------------------------------
good.summary: DistributionResult{sum=272674, count=5434, min=0, max=100}
good.device_1: DistributionResult{sum=88417, count=1792, min=0, max=100}
good.device_3: DistributionResult{sum=90876, count=1806, min=0, max=100}
good.device_2: DistributionResult{sum=93381, count=1836, min=0, max=100}
bad.device_1: 365
bad.device_2: 329
bad.device_3: 373
bad.total: 1067
----- System Metrics for step MergeReadings --------------------------------
MetricName{namespace=com.ibm.streams, name=nTuplesProcessed}: 12585
MetricName{namespace=com.ibm.streams, name=nTuplesSubmitted}: 12581
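Each DistributionResult line summarizes the reported values as a sum, a count, a minimum, and a maximum, so a mean can be derived as sum divided by count. The following plain-Java sketch shows an accumulator of that shape. It is an illustration only and not Beam's implementation; the class name DistributionSketch is hypothetical.

```java
// Hypothetical accumulator that mirrors the four fields printed in a
// DistributionResult line; it is not Beam's implementation.
final class DistributionSketch {
    private long sum = 0;
    private long count = 0;
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    // Folds one reported value into the running summary.
    void update(long value) {
        sum += value;
        count++;
        min = Math.min(min, value);
        max = Math.max(max, value);
    }

    // The mean is derived from sum and count; it is not stored.
    static double mean(long sum, long count) {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    double mean() {
        return mean(sum, count);
    }

    @Override
    public String toString() {
        return "DistributionSketch{sum=" + sum + ", count=" + count
                + ", min=" + min + ", max=" + max + "}";
    }

    public static void main(String[] args) {
        DistributionSketch d = new DistributionSketch();
        for (long v : new long[] {20, 35, 50}) {
            d.update(v);
        }
        System.out.println(d + " mean=" + d.mean());
        // Mean of the good.summary line in the sample output.
        System.out.println(mean(272674, 5434));
    }
}
```

In the sample output, the per-device sums and counts add up to the good.summary values, and the three bad.device counters add up to bad.total.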

At this point, the application is also visible in the Streams Console in IBM Cloud. To launch the Streams Console, navigate to the Manage tab of your Streaming Analytics service and click Launch.

The application running in Streams Console in IBM Cloud

Of particular interest for this application is the Streams Graph. The graph shows how the Streaming Analytics service organizes and executes the code in the sample application. When you maximize the graph, you can see the graph of the whole application:

The application graph maximized

By default, the graph shows the Beam transforms of the application and matches the pipeline structure that is described in the overview. The names that are used in the graph are the same as those used in the application unless there are transform names that are not unique.

Although this simplified view matches Beam, it omits details about how Streams is executing the application. To see those details, click Configure (the wrench icon), select Show raw graph, and click Apply.

Now the application is shown with the Streams operators instead of the Beam transforms. It is still similar in structure, and the names are similar to the Beam names but not always identical. In some cases, the Streams operator name contains extra information that comes from the names of the Beam Java™ SDK classes that implement the transforms. Also, Streams does not allow all the same characters in operator names that Beam allows in transform names, so characters might be removed or replaced with valid Streams characters.
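To picture how a transform name can differ from its operator name, consider a simple character replacement. The following is a sketch under a loud assumption: the allowed character set (letters, digits, and underscore) and the class OperatorNameSketch are hypothetical and chosen only for illustration. The actual rules that Streams Runner applies are not described in this topic and might differ.

```java
// Hypothetical illustration of name sanitizing; the real Streams Runner
// rules are not documented here and might differ.
final class OperatorNameSketch {
    // Replaces every character outside [A-Za-z0-9_] with an underscore.
    // The allowed set is an assumption made for this example.
    static String sanitize(String transformName) {
        return transformName.replaceAll("[^A-Za-z0-9_]", "_");
    }

    public static void main(String[] args) {
        // A name that is already valid under the assumed rule is unchanged.
        System.out.println(sanitize("Device_1"));
        // Spaces, slashes, and dots are replaced.
        System.out.println(sanitize("Good Stats/Combine.perKey"));
    }
}
```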

You can use the Beam metrics API to insert information to monitor your application. For more information, see Monitoring IBM Streams Runner for Apache Beam.

Stopping the application

After you explore the Streams Console, stop the application by canceling the job. You can cancel the job in several ways, from almost any place that shows the job in the console, but the simplest way is by clicking Cancel Jobs in the navigation bar. Clicking Cancel Jobs opens a window that shows running jobs. Select the job or jobs that you want to cancel, in this case the job that is running the sample, and click Cancel Jobs.

Cancel Jobs in the navigation bar

Cancel Jobs window

Important: Interrupting or killing the java command in the terminal where you started the application does not stop the job, even though metrics are no longer reported.

The TemperatureSample application

The TemperatureSample application takes temperature readings from multiple devices. The application splits the readings into “good” (valid) and “bad” (invalid) readings by comparing each reading with a configurable threshold. It counts the bad readings, generates some basic statistics for the good readings, and logs the results. The full source is available in the $STREAMS_RUNNER_HOME/examples/transform/src/com/ibm/streams/beam/examples/transform/temperature directory, but excerpts are included here to describe the structure of the pipeline.

The devices are artificial. Each device n is simulated by a Beam CountingInput transform named Counter_n, which drives a transform named Device_n that generates random readings:

PCollectionList<KV<String,Double>> deviceReadings = PCollectionList.empty(pipeline);

for (int deviceId = 1; deviceId <= options.getNumDevices(); ++deviceId) {
    PCollection<KV<String,Double>> readings = pipeline
            .apply("Counter_" + deviceId, CountingInput.unbounded()
                    .withRate(options.getRate(), Duration.standardSeconds(1)))
            .apply("Device_" + deviceId, MapElements.via(new GenReadingFn(deviceId, maxTemp)));
    deviceReadings = deviceReadings.and(readings);
}
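The body of GenReadingFn is not shown in these excerpts. As a rough picture of what such a function might do, the following plain-Java sketch turns each counter tick into a random (deviceName, temperature) pair. Everything in it is an assumption: the class ReadingGeneratorSketch, the device_n naming, the seed parameter, and the uniform distribution between 0 and maxTemp.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Random;

// Hypothetical stand-in for a device; the sample's real GenReadingFn may differ.
final class ReadingGeneratorSketch {
    private final String deviceName;
    private final double maxTemp;
    private final Random random;

    ReadingGeneratorSketch(int deviceId, double maxTemp, long seed) {
        this.deviceName = "device_" + deviceId; // assumed naming
        this.maxTemp = maxTemp;
        this.random = new Random(seed);         // seeded for repeatable runs
    }

    // Turns one counter tick into a (deviceName, temperature) pair.
    // The counter value only triggers the reading; it is not used otherwise.
    Map.Entry<String, Double> apply(long counterValue) {
        double temperature = random.nextDouble() * maxTemp;
        return new SimpleEntry<String, Double>(deviceName, temperature);
    }

    public static void main(String[] args) {
        ReadingGeneratorSketch device = new ReadingGeneratorSketch(1, 120.0, 42L);
        for (long tick = 0; tick < 3; tick++) {
            System.out.println(device.apply(tick));
        }
    }
}
```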

These readings act as inputs to a MergeReadings transform. The transform merges readings from the different devices into a single stream of data that consists of pairs of {deviceName,temperatureReading}, which are then assigned to windows of fixed time duration:

PCollection<KV<String,Double>> mergedReadings = deviceReadings
        .apply("MergeReadings", Flatten.<KV<String,Double>>pCollections())
        .apply("WindowReadings",
                Window.<KV<String,Double>>into(
                        FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))));
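Assigning a reading to a fixed window comes down to rounding its timestamp down to a multiple of the window size. The following is a minimal sketch in plain Java, not Beam code. It assumes windows that are aligned to the epoch with no offset, and the class FixedWindowSketch and its methods are hypothetical names.

```java
// Hypothetical illustration of fixed-window assignment; assumes windows
// aligned to the epoch with no offset.
final class FixedWindowSketch {
    // Start (inclusive) of the window that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // End (exclusive) of the window that contains the timestamp.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 5_000L; // a 5-second window, in milliseconds
        for (long ts : new long[] {9_999L, 10_000L, 12_345L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size)
                    + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Readings from different devices whose timestamps fall in the same interval land in the same window, which is why the merge happens before the windowing step.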

The windowed data is validated with a ValidateReadings transform that produces two output streams, one of good readings and one of bad readings:

final TupleTag<KV<String,Double>> goodTag = new TupleTag<KV<String,Double>>(){ ... };
final TupleTag<KV<String,Double>> badTag = new TupleTag<KV<String,Double>>(){ ... };
final double badTempThreshold = options.getBadTempThreshold();

PCollectionTuple validatedReadings = mergedReadings.apply("ValidateReadings",
        ParDo.of(new DoFn<KV<String,Double>,KV<String,Double>>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) {
                String device = c.element().getKey();
                Double temp = c.element().getValue();
                if (temp < badTempThreshold) {
                    // Good reading, output and update distribution metrics
                    c.output(c.element());
                    ...
                } else {
                    // Bad reading, output to side output, update counters
                    c.output(badTag, c.element());
                    ...
                }
            }
        }).withOutputTags(goodTag, TupleTagList.of(badTag)));
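Stripped of the Beam machinery, the validation step is a partition on one comparison: a reading is good when its temperature is below the threshold, and bad otherwise. The following plain-Java sketch shows that rule on an in-memory list. The class ValidateSketch, its nested Split type, and the reading helper are hypothetical and exist only for this illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the good/bad split; not the sample's code.
final class ValidateSketch {
    // Holds the two outputs, like the main output and the additional output.
    static final class Split {
        final List<Map.Entry<String, Double>> good = new ArrayList<>();
        final List<Map.Entry<String, Double>> bad = new ArrayList<>();
    }

    static Map.Entry<String, Double> reading(String device, double temp) {
        return new SimpleEntry<String, Double>(device, temp);
    }

    // Same comparison as the excerpt: strictly below the threshold is good.
    static Split validate(List<Map.Entry<String, Double>> readings,
                          double badTempThreshold) {
        Split split = new Split();
        for (Map.Entry<String, Double> r : readings) {
            if (r.getValue() < badTempThreshold) {
                split.good.add(r);
            } else {
                split.bad.add(r);
            }
        }
        return split;
    }

    public static void main(String[] args) {
        Split split = validate(Arrays.asList(
                reading("device_1", 40.0),
                reading("device_2", 120.0),
                reading("device_1", 100.0)), 100.0);
        System.out.println("good=" + split.good + " bad=" + split.bad);
    }
}
```

A reading that is exactly equal to the threshold is classified as bad, because the comparison is strict.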

On the good readings, statistics are calculated in a GoodStats transform and then logged by a GoodLog transform:

validatedReadings
        .get(goodTag)
        .apply("GoodStats", Combine.<String, Double, Stats>perKey(new DeviceStatsFn()))
        .apply("GoodLog", ParDo.of(new LogWindowFn<Stats>("Temperature device statistics")));
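The Stats type and DeviceStatsFn are not shown in these excerpts. To illustrate what a per-key combine computes, the following plain-Java sketch groups readings by device name and keeps a running summary for each device. It uses the JDK's DoubleSummaryStatistics as a stand-in for Stats and ignores windows. The class DeviceStatsSketch and its helper are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of a per-key combine; DoubleSummaryStatistics
// stands in for the sample's Stats type, which is not shown here.
final class DeviceStatsSketch {
    static Map.Entry<String, Double> reading(String device, double temp) {
        return new SimpleEntry<String, Double>(device, temp);
    }

    // Folds every reading into the summary that belongs to its device.
    static Map<String, DoubleSummaryStatistics> perKey(
            List<Map.Entry<String, Double>> readings) {
        Map<String, DoubleSummaryStatistics> stats = new TreeMap<>();
        for (Map.Entry<String, Double> r : readings) {
            stats.computeIfAbsent(r.getKey(), k -> new DoubleSummaryStatistics())
                 .accept(r.getValue());
        }
        return stats;
    }

    public static void main(String[] args) {
        Map<String, DoubleSummaryStatistics> stats = perKey(Arrays.asList(
                reading("device_1", 10.0),
                reading("device_2", 50.0),
                reading("device_1", 30.0)));
        stats.forEach((device, s) -> System.out.println(device + ": " + s));
    }
}
```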

The bad readings are counted by a BadCount transform and logged by the BadLog transform:

validatedReadings
        .get(badTag)
        .apply("BadCount", Count.<String,Double>perKey())
        .apply("BadLog", ParDo.of(new LogWindowFn<Long>("Temperature device error count")));
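Counting per key keeps one counter for each device name and ignores the reading values. The following plain-Java sketch shows that idea on an in-memory list, without windows. The class BadCountSketch and its helper are hypothetical names used only for this illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of a per-key count; not the sample's code.
final class BadCountSketch {
    static Map.Entry<String, Double> reading(String device, double temp) {
        return new SimpleEntry<String, Double>(device, temp);
    }

    // Adds 1 to the counter of the device that produced each reading.
    static Map<String, Long> countPerKey(List<Map.Entry<String, Double>> readings) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, Double> r : readings) {
            counts.merge(r.getKey(), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countPerKey(Arrays.asList(
                reading("device_1", 120.0),
                reading("device_3", 150.0),
                reading("device_1", 101.0))));
    }
}
```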

The structure of the application becomes clear when you run the application and view its graph in the Streams Console.