SPL File Types.spl

Toolkit: Kafka Toolkit (com.ibm.streamsx.kafka 3.2.2), namespace com.ibm.streamsx.kafka

Composites

composite MessageType

Defines message types with default attribute names and types.

Static Types

MessageType.BlobMessage = blob message, rstring key;

This type represents a message whose message attribute has the SPL type blob and whose key attribute has the SPL type rstring. It can be used for consumed messages and for messages to be produced.

Example use:

// messages get de-serialized into blob
stream <MessageType.BlobMessage> ReceivedMessages = KafkaConsumer() {
    param
        topic: "myTopic";
        groupId: "consumerGroup1";
        propertiesFile: "etc/consumer.properties";
}

MessageType.ConsumerMessageMetadata = rstring topic, int32 partition, int64 offset, int64 messageTimestamp;

This type represents the metadata of a received message.

It consists of the following attributes:
  • topic (rstring)
  • partition (int32)
  • offset (int64)
  • messageTimestamp (int64) - milliseconds since the Unix epoch

Example use:

stream <MessageType.BlobMessage, MessageType.ConsumerMessageMetadata> ReceivedMessages = KafkaConsumer() {
    param
        topic: "myTopic";
        groupId: "consumerGroup1";
        propertiesFile: "etc/consumer.properties";
}
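To illustrate how the metadata attributes might be used downstream, the following minimal sketch prints the origin of each message and converts messageTimestamp from milliseconds to an SPL timestamp. The composite name MetadataSketch and the operator alias Printer are hypothetical; the topic, group, and properties file are taken from the example above.

```spl
use com.ibm.streamsx.kafka::*;

// Hypothetical main composite, for illustration only
composite MetadataSketch {
    graph
        stream <MessageType.BlobMessage, MessageType.ConsumerMessageMetadata> ReceivedMessages = KafkaConsumer() {
            param
                topic: "myTopic";
                groupId: "consumerGroup1";
                propertiesFile: "etc/consumer.properties";
        }

        () as Printer = Custom (ReceivedMessages) {
            logic
                onTuple ReceivedMessages: {
                    // messageTimestamp is in milliseconds; createTimestamp takes seconds and nanoseconds
                    timestamp ts = createTimestamp (messageTimestamp / 1000l, (uint32) (messageTimestamp % 1000l) * 1000000u);
                    printStringLn (topic + " [" + (rstring) partition + "] offset " + (rstring) offset + " at " + ctime (ts));
                }
        }
}
```

The blob payload is not printed here because it would first need to be decoded by the application.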

MessageType.StringMessage = rstring message, rstring key;

This type represents a message whose message and key attributes are both of type rstring. It can be used for consumed messages and for messages to be produced.

Example use:

// messages get de-serialized into rstring
stream <MessageType.StringMessage> ReceivedMessages = KafkaConsumer() {
    param
        topic: "myTopic";
        groupId: "consumerGroup1";
        propertiesFile: "etc/consumer.properties";
}

MessageType.TopicPartition = rstring topic, int32 partition;

This type represents a topic and a partition number. It can be used to store metadata of received messages and to specify the topic and partition number of messages to be published (produced).
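As a rough sketch of the producing case, the stream below combines MessageType.StringMessage with MessageType.TopicPartition so that each tuple carries its own target. It assumes that a KafkaProducer without a topic parameter takes the target from input attributes named topic and partition; verify this against the KafkaProducer operator reference. The composite name, the Beacon source, and the file etc/producer.properties are hypothetical.

```spl
use com.ibm.streamsx.kafka::*;

// Hypothetical main composite, for illustration only
composite TopicPartitionSketch {
    graph
        // Generate ten messages, each carrying its own target topic and partition
        stream <MessageType.StringMessage, MessageType.TopicPartition> OutgoingMessages = Beacon() {
            param
                iterations: 10u;
            output OutgoingMessages:
                message = "message " + (rstring) IterationCount(),
                key = "key1",
                topic = "myTopic",
                partition = 0;
        }

        // Assumption: with no topic parameter, the producer reads the target
        // from the input attributes named topic and partition
        () as Producer = KafkaProducer (OutgoingMessages) {
            param
                propertiesFile: "etc/producer.properties";
        }
}
```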

composite Control

Defines the tuple types used by the functions that generate JSON messages for the control port.

Static Types

Control.TopicPartition = rstring topic, int32 partition;

Tuple type for a topic partition to be added or removed.

Example use:

mutable list <Control.TopicPartition> partitionsToAdd = [];
appendM (partitionsToAdd, {topic = 'topic1', partition = 0});
appendM (partitionsToAdd, {topic = 'topic1', partition = 1});
submit ({controlMsg = createMessageAddTopicPartition (partitionsToAdd)}, ControlMessages);

Control.TopicPartitionOffset = rstring topic, int32 partition, int64 offset;

Tuple type for a topic partition that is to be added together with an offset.

Example use:

mutable list <Control.TopicPartitionOffset> partitionsWithOffsetsToAdd = [];
appendM (partitionsWithOffsetsToAdd, {topic = 'topic1', partition = 0, offset = -2l});
appendM (partitionsWithOffsetsToAdd, {topic = 'topic1', partition = 1, offset = 345678l});
submit ({controlMsg = createMessageAddTopicPartition (partitionsWithOffsetsToAdd)}, ControlMessages);
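
The fragments above presuppose an output stream named ControlMessages with an rstring attribute controlMsg. One way this could be wired up is sketched below, assuming that the stream is connected to the optional control input port of a KafkaConsumer and that the consumer then takes its partitions from the control messages rather than from a topic parameter. The composite name and the Printer sink are hypothetical.

```spl
use com.ibm.streamsx.kafka::*;

// Hypothetical main composite, for illustration only
composite ControlPortSketch {
    graph
        // Emits a single control message when the operator starts
        stream <rstring controlMsg> ControlMessages = Custom() {
            logic
                onProcess: {
                    mutable list <Control.TopicPartitionOffset> partitionsWithOffsetsToAdd = [];
                    appendM (partitionsWithOffsetsToAdd, {topic = 'topic1', partition = 0, offset = -2l});
                    appendM (partitionsWithOffsetsToAdd, {topic = 'topic1', partition = 1, offset = 345678l});
                    submit ({controlMsg = createMessageAddTopicPartition (partitionsWithOffsetsToAdd)}, ControlMessages);
                }
        }

        // Assumption: the control stream feeds the consumer's control input port,
        // so no topic parameter is given here
        stream <MessageType.StringMessage> ReceivedMessages = KafkaConsumer (ControlMessages) {
            param
                propertiesFile: "etc/consumer.properties";
        }

        () as Printer = Custom (ReceivedMessages) {
            logic
                onTuple ReceivedMessages: printStringLn (message);
        }
}
```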