Hi,

I am trying to implement a consumer for the logical replication stream
API of PostgreSQL, and I am facing an unusual connection breakdown
problem. Here is the simple code that I am using to read the WAL stream:

            String url = "jdbc:postgresql://pcnode2:5432/benchmarksql";
            Properties props = new Properties();
            PGProperty.USER.set(props, "benchmarksql");
            PGProperty.PASSWORD.set(props, "benchmarksql");
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
            PGProperty.REPLICATION.set(props, "database");
            PGProperty.PREFER_QUERY_MODE.set(props, "simple");

            Connection conn = DriverManager.getConnection(url, props);
            PGConnection replConnection = conn.unwrap(PGConnection.class);

            PGReplicationStream stream = replConnection.getReplicationAPI()
                    .replicationStream().logical()
                    .withSlotName("replication_slot3")
                    .withSlotOption("include-xids", true)
                    .withSlotOption("include-timestamp", "on")
                    .withSlotOption("skip-empty-xacts", true)
                    .withStatusInterval(20, TimeUnit.MILLISECONDS).start();
            while (true) {

                ByteBuffer msg = stream.read();

                if (msg == null) {
                    TimeUnit.MILLISECONDS.sleep(10L);
                    continue;
                }

                int offset = msg.arrayOffset();
                byte[] source = msg.array();
                int length = source.length - offset;
                String data = new String(source, offset, length);
                System.out.println(data);

                stream.setAppliedLSN(stream.getLastReceiveLSN());
                stream.setFlushedLSN(stream.getLastReceiveLSN());

            }
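
For reference, the decoding step in the loop above could also be written
against the buffer's current position and remaining bytes instead of the
length of the whole backing array. This is only a minimal sketch: the
helper name BufferText is hypothetical, and UTF-8 is an assumption about
the server encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the PostgreSQL JDBC driver.
final class BufferText {
    private BufferText() {}

    // Decodes only the readable region of the buffer (position..limit).
    // UTF-8 is an assumption; use the server's actual encoding.
    static String decode(ByteBuffer msg) {
        if (msg.hasArray()) {
            // Array-backed buffer: read in place, honoring offset and position.
            int start = msg.arrayOffset() + msg.position();
            return new String(msg.array(), start, msg.remaining(),
                    StandardCharsets.UTF_8);
        }
        // Direct or read-only buffer: copy the bytes without moving the
        // caller's position.
        byte[] copy = new byte[msg.remaining()];
        msg.duplicate().get(copy);
        return new String(copy, StandardCharsets.UTF_8);
    }
}
```

In the loop, this would replace the four lines that compute offset,
source, length and data with String data = BufferText.decode(msg);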

Even the slightest modification to the code, such as commenting out
System.out.println(data); (which just prints the data to the console),
causes the connection to break with the following error message:

org.postgresql.util.PSQLException: Database connection failed when reading from copy
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1028)
    at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
    at Server.main(Server.java:52)
Caused by: java.net.SocketException: Socket closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140)
    at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109)
    at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:191)
    at org.postgresql.core.PGStream.receive(PGStream.java:495)
    at org.postgresql.core.PGStream.receive(PGStream.java:479)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1161)
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1026)
    ... 5 more

I am trying to implement some logic, such as filtering out changes to
unrelated tables after reading the log, but this unusual behavior is
preventing me from implementing it properly.
Can somebody give me a hint on how to solve this problem?
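
To make the goal concrete, the filtering step I have in mind would look
roughly like the sketch below. It assumes the slot uses the test_decoding
output plugin, whose change lines start with "table schema.name: " while
transaction markers start with BEGIN or COMMIT. The class name TableFilter
and the table names are hypothetical, and quoted identifiers that contain
": " are not handled.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration; assumes test_decoding text output.
final class TableFilter {
    private static final String PREFIX = "table ";
    private final Set<String> wanted;

    // qualifiedTables are schema-qualified names such as "public.bmsql_stock".
    TableFilter(String... qualifiedTables) {
        this.wanted = new HashSet<>(Arrays.asList(qualifiedTables));
    }

    // Returns "schema.table" for a change line, or null for any other line
    // (for example BEGIN and COMMIT markers).
    static String tableOf(String line) {
        if (!line.startsWith(PREFIX)) {
            return null;
        }
        int end = line.indexOf(": ", PREFIX.length());
        return end < 0 ? null : line.substring(PREFIX.length(), end);
    }

    // Keeps transaction markers and changes to wanted tables; drops the rest.
    boolean accept(String line) {
        String table = tableOf(line);
        return table == null || wanted.contains(table);
    }
}
```

Inside the read loop, each decoded message would then be passed through
something like if (filter.accept(data)) { ... } before further processing.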

Thank you.

Dipesh Dangol
