How to connect to a PostgreSQL database via the JDBC toolkit
This document gives step-by-step instructions for connecting to a PostgreSQL database from an IBM Streams application via the JDBC toolkit.
1 - Download the PostgreSQL JDBC driver
Download the driver from https://jdbc.postgresql.org/download/postgresql-42.1.3.jar
Create an opt directory in your SPL project directory.
Copy the driver jar postgresql-42.1.3.jar into the opt directory.
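The download step above can be sketched as a few shell commands. This is a minimal sketch that assumes curl is installed and that the commands are run from the SPL project directory; the variable names DRIVER_JAR and DRIVER_URL are only illustrative.

```shell
# Run from the SPL project directory (assumption).
DRIVER_JAR=postgresql-42.1.3.jar
DRIVER_URL=https://jdbc.postgresql.org/download/$DRIVER_JAR

# Create the opt directory that jdbcDriverLib points to.
mkdir -p opt

# Download the driver only if it is not already present.
if [ ! -f "opt/$DRIVER_JAR" ]; then
    curl -fL --connect-timeout 10 -o "opt/$DRIVER_JAR" "$DRIVER_URL" \
        || echo "download failed: $DRIVER_URL" >&2
fi
```

If curl is not available, downloading the jar in a browser and copying it into opt by hand gives the same result.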
2 - Create a database in your PostgreSQL server
Log in as root, switch to the postgres user, and start psql:
su - postgres
-bash-4.1$ psql
postgres=# CREATE DATABASE STREAMS OWNER postgres;
postgres=# ALTER USER "postgres" WITH PASSWORD 'postpass';
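# connect to the new database (the unquoted name STREAMS is folded to lower case: streams)
postgres=# \c streams
# create a sample table in your database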
streams=# CREATE TABLE streamsTable ( personid int, lastname varchar(255), firstname varchar(255), address varchar(255), city varchar(255));
# insert a row into the table
streams=# INSERT INTO streamsTable VALUES (1234, 'myName', 'myFirstName', 'Berliner Street', 'Berlin');
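Before moving on to the Streams side, it can help to check from a shell that the table is readable with the same user and password the SPL application will use. The following is a sketch only: it assumes psql is on the PATH, that the check runs on the database host itself (DB_HOST is a hypothetical variable), and that the server accepts password logins over TCP, which depends on your pg_hba.conf.

```shell
# Values taken from the steps above; adjust host and credentials to your setup.
DB_HOST=localhost   # hypothetical: replace with your PostgreSQL server address
QUERY='SELECT personid, lastname, firstname, address, city FROM streamsTable;'

if command -v psql >/dev/null 2>&1; then
    # -w makes psql fail instead of prompting if the password is not accepted
    PGPASSWORD=postpass psql -w -h "$DB_HOST" -U postgres -d streams -c "$QUERY" \
        || echo "query failed: check host, credentials and server access rules" >&2
else
    echo "psql is not on PATH" >&2
fi
```

If this query works from the machine that runs Streams, the same host and database name can be used in the jdbcUrl parameter.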
3 - Create an SPL project in your Streams server
Create the following SPL application in the project:
namespace application;
use com.ibm.streamsx.jdbc::* ;
// *******************************************************************************
// JDBCPostgreSQL demonstrates how to connect to a PostgreSQL database and select data from a table
// using the JDBCRun operator.
// Required Streams Version = 4.1.x.x
// Required JDBC Toolkit Version = 1.2.0
// https://github.com/IBMStreams/streamsx.jdbc
// PostgreSQL jdbc driver version 42 or higher
// https://jdbc.postgresql.org/download/postgresql-42.1.3.jar
// To connect to database, the following parameters need to be specified:
// jdbcDriverLib : the jdbc driver libraries (download the jdbc driver file from https://jdbc.postgresql.org/download/postgresql-42.1.3.jar
// and store it in opt folder, e.g. opt/postgresql-42.1.3.jar )
// jdbcClassName : the class name for PostgreSQL jdbc driver (org.postgresql.Driver)
// jdbcUrl : the database URL. (e.g. jdbc:postgresql://<your postgresql IP address>/<your database name>)
// jdbcUser : the database user on whose behalf the connection is being made.
// jdbcPassword : the user’s password.
// transactionSize : 2
// set the transactionSize > 1 for postgresql database
// In the SPL sample:
// The "select" operator runs the SQL statement given in the statement parameter.
// Alternatively, the statement can be taken from a stream attribute via the statementAttr parameter.
// In this sample the JDBCRun operator connects to the database, reads all rows from the table
// streamsTable and writes them into data/output.txt
//
// *******************************************************************************
composite JDBCPostgreSQL
{
    param
        expression<rstring> $jdbcDriverLib : getSubmissionTimeValue("jdbcDriverLib", "opt/postgresql-42.1.3.jar");
        expression<rstring> $jdbcClassName : getSubmissionTimeValue("jdbcClassName", "org.postgresql.Driver");
        expression<rstring> $jdbcUrl : getSubmissionTimeValue("jdbcUrl", "jdbc:postgresql://192.168.239.160/streams");
        expression<rstring> $jdbcUser : getSubmissionTimeValue("jdbcUser", "postgres");
        expression<rstring> $jdbcPassword : getSubmissionTimeValue("jdbcPassword", "postpass");
        expression<rstring> $statement : getSubmissionTimeValue("statement", "SELECT * from streamsTable");
    type
        // PostgreSQL returns unquoted column names in lower case,
        // so the attribute must be named lastname, not LastName
        resultSchema = int32 personid,
            rstring lastname,
            rstring firstname,
            rstring address,
            rstring city;
    graph
        // emits a single tuple that triggers the SELECT
        stream<rstring sql> pulse = Beacon() {
            param
                iterations : 1u ;
                initDelay : 2.0;
            output
                pulse : sql = "SELECT personid, lastname, firstname, address, city FROM streamsTable";
        }
        stream<resultSchema> select = JDBCRun(pulse){
            param
                jdbcDriverLib: $jdbcDriverLib;
                jdbcClassName: $jdbcClassName;
                jdbcUrl: $jdbcUrl;
                jdbcUser: $jdbcUser;
                jdbcPassword: $jdbcPassword;
                statement: $statement;
                // statementAttr: sql;
                // the SELECT statement can either be read from an input attribute
                // (statementAttr) or be set directly in the statement parameter
                transactionSize : 2;
        }
        () as SelectSink = FileSink(select)
        {
            logic
                state :
                {
                    mutable int64 counter = 0;
                }
                onTuple select :
                {
                    printStringLn((rstring)personid + "," + lastname + "," + firstname + "," + address + "," + city);
                }
            param
                file : "output.txt";
                format : csv ;
                flush : 1u; // flush the output file after 1 tuple
        }
}
4 - Build the SPL application
Create a Makefile with the following content in your project directory, then run make:
.PHONY: all distributed clean
JDBC_TOOLKIT_INSTALL = $(STREAMS_INSTALL)/toolkits/com.ibm.streamsx.jdbc
SPLC_FLAGS ?= -a
SPLC = $(STREAMS_INSTALL)/bin/sc
SPL_CMD_ARGS ?= -t $(JDBC_TOOLKIT_INSTALL)
SPL_MAIN_COMPOSITE = application::JDBCPostgreSQL

all: distributed

distributed:
	rm -rf output
	JAVA_HOME=$(STREAMS_INSTALL)/java $(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS) --data-directory data

clean:
	$(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
	rm -rf output
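Once the Makefile is in place, building and submitting the job might look like the sketch below. It rests on several assumptions: the Streams environment is already set up (STREAMS_INSTALL is set and streamtool is on the PATH with a default domain and instance), the compiler writes the application bundle to output/application.JDBCPostgreSQL.sab, and the relative data directory resolves against the project directory. The variables SAB and JDBC_URL are only illustrative.

```shell
# Assumed bundle name derived from the main composite application::JDBCPostgreSQL
SAB=output/application.JDBCPostgreSQL.sab
# Override the default jdbcUrl at submission time; replace with your server address
JDBC_URL=jdbc:postgresql://192.168.239.160/streams

# FileSink writes output.txt relative to the data directory
mkdir -p data

if command -v streamtool >/dev/null 2>&1; then
    make \
        && streamtool submitjob -P jdbcUrl="$JDBC_URL" "$SAB" \
        || echo "build or submission failed" >&2
else
    echo "streamtool is not on PATH: set up the Streams environment first" >&2
fi

# After the job has run, the selected rows should appear in data/output.txt
```

The other submission-time values declared in the composite (jdbcUser, jdbcPassword, statement, and so on) can be overridden in the same way with additional -P name=value options.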