Hi all,

While implementing the logical replication protocol for pgjdbc
(https://github.com/pgjdbc/pgjdbc/pull/550) I ran into some strange behavior
in *walsender.c*:

   1. When the WAL consumer catches up with the master and switches to the
   streaming state, it cannot complete replication normally by sending a
   CopyDone message until a new WAL record is generated. This happens because
   logical decoding sits in *WalSndWaitForWal* until the next WAL record
   becomes available. That function does receive messages from the consumer
   and even answers CopyDone with CopyDone, but it does not exit its loop, so
   the consumer can wait a long time for the CommandComplete and
   ReadyForQuery messages.
   2. Logical decoding ignores messages from the consumer while it decodes a
   transaction and writes it to the socket (*WalSndWriteData*). This affects
   long transactions with many changes. For example, if we start decoding a
   transaction that inserts 1 million records and decide to stop replication
   after consuming 1% of its data, we cannot stop until the whole million
   records have been sent to the consumer. I see the following problems
   because of this:


   - It generates a lot of unnecessary network traffic.
   - Logical replication cannot be stopped normally, because the consumer
   fails with a timeout. This breaks the scenario where the consumer puts
   data into an external system asynchronously: on the success callback it
   sends feedback to the master about the flushed LSN, and on the failure
   callback it stops replication and restarts it from the last LSN that was
   successfully sent to the external system. The restart is delayed because
   the slot stays busy, which increases latency for the streaming system.
   - The consumer can send a keepalive message with the reply-requested flag
   to a master that is busy decoding and sending a huge transaction, and
   after some time disconnect from the master because it gets no reply to
   the keepalive. I ran into this problem while investigating the
   bottledwater-pg <https://github.com/confluentinc/bottledwater-pg>
   extension, which fails every time it receives a transaction that modified
   1 million records: it disconnects from the master because the keepalive
   goes unanswered for too long, and since restart_lsn is before the huge
   transaction, it fails again and again.
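
The asynchronous consumer scenario from the second bullet can be sketched
roughly as follows. This is only a simplified model, not pgjdbc code: the
class AsyncConsumerSketch and all of its methods are hypothetical names, and
LSNs are modelled as plain long values. It shows why the consumer needs to
stop streaming quickly: after a failure it wants to restart from the last
LSN that the external system confirmed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a consumer that forwards changes to an external system. */
public class AsyncConsumerSketch {
    /** Last LSN confirmed by the external system; replication restarts from here. */
    private long lastFlushedLsn;
    /** LSNs reported back to the master (stand-in for real feedback messages). */
    private final List<Long> feedbackSent = new ArrayList<>();
    /** Set by the failure callback; the streaming loop should stop when it is true. */
    private boolean stopRequested;

    public AsyncConsumerSketch(long startLsn) {
        this.lastFlushedLsn = startLsn;
    }

    /** Success callback: remember the flushed LSN and report it to the master. */
    public void onSuccess(long lsn) {
        lastFlushedLsn = Math.max(lastFlushedLsn, lsn);
        feedbackSent.add(lastFlushedLsn);
    }

    /** Failure callback: request a stop; the caller restarts from restartLsn(). */
    public void onFailure(long failedLsn) {
        stopRequested = true;
    }

    public boolean isStopRequested() {
        return stopRequested;
    }

    public long restartLsn() {
        return lastFlushedLsn;
    }

    public List<Long> feedbackSent() {
        return feedbackSent;
    }

    public static void main(String[] args) {
        AsyncConsumerSketch consumer = new AsyncConsumerSketch(100L);
        consumer.onSuccess(110L);
        consumer.onSuccess(120L);
        consumer.onFailure(130L);
        System.out.println("stop requested: " + consumer.isStopRequested());
        System.out.println("restart from LSN: " + consumer.restartLsn());
    }
}
```

In this model the time between onFailure and the actual restart is exactly
the "stopping" time measured later in this message.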

I have prepared a small patch that fixes the problems described above. Now
*WalSndWriteData* first checks for messages from the consumer, and while a
transaction is being decoded the code checks that replication is still
active. A keepalive message is no longer sent once a CopyDone message has
been received (a keepalive is unnecessary at that point because we are
preparing to complete). *WalSndWaitForWal* exits its loop after CopyDone is
received. With this patch applied I get the following measurements:
Before
-----
logical start and stopping: *15446ms*
logical stopping: *13820ms*

physical start and stopping: 462ms
physical stopping: 348ms

After
-----
logical start and stopping: 2424ms
logical stopping: *26ms*

physical start and stopping: 458ms
physical stopping: 329ms

The measurements were taken with the following code from pgjdbc.

For physical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + "  select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .physical()
            .withStartPosition(startLSN)
            .start();

    //read single message
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));

For logical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + "  select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName(SLOT_NAME)
            .withStartPosition(startLSN)
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();

    //read single message
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));
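
To illustrate why the logical stopping time drops so much, here is a small
Java model of the idea behind the fix. It is not the server code (that is
the C patch below) and it does not talk to PostgreSQL. The class
InterruptibleDecodeSketch, the StreamState class and decodeTransaction are
hypothetical names; only the two flags mirror streamingDoneReceiving and
streamingDoneSending from walsender.c. The loop asks an "is active" callback
before sending each change, so a CopyDone that arrives in the middle of a
big transaction stops the stream after the current change instead of after
the last one.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical model of a decode loop that can be interrupted by CopyDone. */
public class InterruptibleDecodeSketch {

    /** Models the two walsender flags that the patch checks. */
    public static final class StreamState {
        public boolean doneReceiving; // CopyDone received from the consumer
        public boolean doneSending;   // CopyDone sent by the server

        public boolean isActive() {
            return !doneReceiving && !doneSending;
        }
    }

    /**
     * "Sends" up to totalChanges changes and asks isActive before each one.
     * afterEachChange stands in for processing consumer replies.
     * Returns the number of changes actually sent.
     */
    public static int decodeTransaction(int totalChanges,
                                        BooleanSupplier isActive,
                                        Runnable afterEachChange) {
        int sent = 0;
        while (sent < totalChanges && isActive.getAsBoolean()) {
            sent++;
            afterEachChange.run();
        }
        return sent;
    }

    public static void main(String[] args) {
        StreamState state = new StreamState();
        int[] processed = {0};
        // The consumer "sends CopyDone" after it has seen 1% of the changes.
        int sent = decodeTransaction(1_000_000, state::isActive, () -> {
            if (++processed[0] == 10_000) {
                state.doneReceiving = true;
            }
        });
        System.out.println("changes sent before stopping: " + sent);
    }
}
```

With a callback that always returns true, which is what the patch uses for
the SQL interface, the same loop sends the whole transaction.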


https://github.com/Gordiychuk/postgres/tree/stopping_logical_replication
From 5f6f3b5241c535b452c1866b7fbe174ac27ce130 Mon Sep 17 00:00:00 2001
From: Vladimir Gordiychuk <fol...@gmail.com>
Date: Fri, 6 May 2016 17:06:33 +0300
Subject: [PATCH] Stop logical decoding on receiving CopyDone

While decoding WAL, logical decoding ignores messages that the receiver can send in response to XLogData.
So during a big transaction, for example one that changes 1 million records, this can lead to two problems:

1. The receiver can disconnect from the server because the server does not respond to a keepalive message with the reply-requested flag.
2. The receiver can't stop replication until the whole transaction has been sent to it.

Being unable to stop replication is the main problem, because the receiver fails with a timeout
while stopping replication and the backend also generates a lot of unnecessary network traffic.
This problem was found while implementing the physical/logical replication protocol in the pgjdbc driver
(https://github.com/pgjdbc/pgjdbc/pull/550). It breaks the scenario where a WAL consumer
receives decoded WAL and puts it into an external system asynchronously: if some problem occurs,
a callback reports which LSN failed, so we can roll back to the last successfully processed LSN and
start logical replication again from that position.

I measured stopping replication with and without the fix using this test:

For physical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + "  select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .physical()
            .withStartPosition(startLSN)
            .start();

    //read single message
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));

For logical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + "  select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName(SLOT_NAME)
            .withStartPosition(startLSN)
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();

    //read single message
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));

And got the following timings:

Before
-----
logical start and stopping: 15446ms
logical stopping: 13820ms

physical start and stopping: 462ms
physical stopping: 348ms

After
-----
logical start and stopping: 2424ms
logical stopping: 26ms

physical start and stopping: 458ms
physical stopping: 329ms

As you can see, it is now possible to stop logical replication very quickly. To do that, we now check
replies first and only after that send decoded data. After getting CopyDone from the frontend we
stop decoding as soon as possible.

The second part of the fix disables sending keepalive messages to the frontend once CopyDone has already been received.
---
 src/backend/replication/logical/logical.c       | 15 ++++++----
 src/backend/replication/logical/logicalfuncs.c  | 15 +++++++++-
 src/backend/replication/logical/reorderbuffer.c |  9 ++++--
 src/backend/replication/slotfuncs.c             |  2 +-
 src/backend/replication/walsender.c             | 38 ++++++++++++++++---------
 src/include/replication/logical.h               | 13 +++++++--
 src/include/replication/reorderbuffer.h         |  7 +++++
 7 files changed, 74 insertions(+), 25 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5ccfd31..cfb4ae4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -116,7 +116,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   ReorderBufferIsActive is_active)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -182,6 +183,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->is_active = is_active;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -214,7 +216,8 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalDecondingContextIsActive is_active)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -290,7 +293,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotSave();
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write, is_active);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -330,7 +333,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalDecondingContextIsActive is_active)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -380,7 +384,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 is_active);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 99112ac..e90e107 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -104,6 +104,17 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	p->returned_rows++;
 }
 
+/*
+ * Stub function required by the logical decoding context.
+ * It always returns true, which means that decoding WAL through the
+ * SQL interface can't be interrupted, in contrast to logical replication.
+ */
+static bool
+LogicalContextAlwaysActive(void)
+{
+	return true;
+}
+
 static void
 check_permissions(void)
 {
@@ -243,7 +254,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite,
+									LogicalContextAlwaysActive /* decoding through the SQL API can't be interrupted, unlike replication */
+									);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 57821c3..deb09dc 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1420,7 +1420,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL && rb->is_active())
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
@@ -1646,8 +1646,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if(rb->is_active())
+		{
+			/* call commit callback */
+			rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 9cc24ea..8b30bd3 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -127,7 +127,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(
 									NameStr(*plugin), NIL,
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 926a247..ff6fc6d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -218,6 +218,8 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
+static bool IsStreamingActive(void);
+
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -816,7 +818,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										IsStreamingActive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -995,7 +998,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_decoding_ctx = CreateDecodingContext(
 											   cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData, IsStreamingActive);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1086,14 +1089,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
-
-	if (!pq_is_send_pending())
-		return;
-
 	for (;;)
 	{
 		int			wakeEvents;
@@ -1225,7 +1220,14 @@ WalSndWaitForWal(XLogRecPtr loc)
 			break;
 
 		/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client or sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive()) {
+			break;
+		}
+
+		/* We only send regular messages to the client for full decoded
 		 * transactions, but a synchronous replication and walsender shutdown
 		 * possibly are waiting for a later location. So we send pings
 		 * containing the flush location every now and then.
@@ -1853,7 +1855,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneReceiving)
 			send_data();
 		else
 			WalSndCaughtUp = false;
@@ -2911,7 +2913,11 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	/*
+	 * Keepalives should be sent only while the protocol is in an active state.
+	 * Once CopyDone has been received or sent, the protocol is about to complete.
+	 */
+	if (waiting_for_ping_response || !IsStreamingActive())
 		return;
 
 	/*
@@ -2931,3 +2937,9 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 			WalSndShutdown();
 	}
 }
+
+static
+bool IsStreamingActive(void)
+{
+	return !streamingDoneReceiving && !streamingDoneSending;
+}
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 947000e..ba54ddc 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,6 +26,13 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+/*
+ * Callback that allows logical replication to be interrupted during decoding.
+ * It returns true if decoding can continue; if it returns false,
+ * logical decoding stops as soon as possible.
+ */
+typedef ReorderBufferIsActive LogicalDecondingContextIsActive;
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -81,13 +88,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalDecondingContextIsActive is_active);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalDecondingContextIsActive is_active);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index e070894..28bd692 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -291,6 +291,8 @@ typedef void (*ReorderBufferMessageCB) (
 												   bool transactional,
 												   const char *prefix, Size sz,
 												   const char *message);
+/* callback signature for checking the decoding status */
+typedef bool (*ReorderBufferIsActive) (void);
 
 struct ReorderBuffer
 {
@@ -321,6 +323,11 @@ struct ReorderBuffer
 	ReorderBufferMessageCB message;
 
 	/*
+	 * Callback that reports the decoding status. Returns false if decoding should not continue.
+	 */
+	ReorderBufferIsActive is_active;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
-- 
2.5.0
