On 19 September 2016 at 07:12, Vladimir Gordiychuk <fol...@gmail.com> wrote:
> New patch attached. 0001 and 0002 are unchanged.
> 0003 contains improvements for pg_recvlogical: after receiving SIGINT,
> pg_recvlogical now sends CopyDone to PostgreSQL and waits for the
> server's CopyDone. For backward compatibility, a second SIGINT makes
> pg_recvlogical stop immediately without waiting for CopyDone from the server.
> 0004 contains regression tests for the scenarios fixed by the 0001 and 0002
> patches.
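For reference, the two-stage SIGINT behaviour described in the quoted message can be sketched roughly as below. This is a hypothetical illustration, not the handler from the patch (which isn't quoted here): only the flag names time_to_abort and force_time_to_abort mirror the globals patch 3 adds to pg_recvlogical.c; sigint_handler() and install_sigint_handler() are assumed names, and the sketch assumes a POSIX sigaction().

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <string.h>

/* Flag names mirror patch 3; everything else here is illustrative. */
static volatile sig_atomic_t time_to_abort = 0;
static volatile sig_atomic_t force_time_to_abort = 0;

/*
 * First SIGINT: request a clean stop (send CopyDone, wait for the
 * server's CopyDone).  Second SIGINT: stop waiting and exit at once.
 */
static void
sigint_handler(int signum)
{
	(void) signum;
	if (time_to_abort)
		force_time_to_abort = 1;
	else
		time_to_abort = 1;
}

/* Install the handler without SA_RESETHAND so it survives the first signal. */
static int
install_sigint_handler(void)
{
	struct sigaction sa;

	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = sigint_handler;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = 0;
	return sigaction(SIGINT, &sa, NULL);
}
```

The main loop then keeps running while !force_time_to_abort and uses time_to_abort to decide when to send CopyDone, which is the split patch 3 makes in StreamLogicalLog().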

Great.

Thanks for this.


First observation (which I'll fix in the updated patch):



It looks like you didn't import my updated patches, so I've rebased
your new patches on top of them.

Whitespace issues:

$ git am ~/Downloads/0003-Add-ability-for-pg_recvlogical-safe-stop-replication.patch
Applying: Add ability for pg_recvlogical safe stop replication
/home/craig/projects/2Q/postgres/.git/rebase-apply/patch:140: indent with spaces.
   msgBuf + hdr_len + bytes_written,
/home/craig/projects/2Q/postgres/.git/rebase-apply/patch:550: indent with spaces.
    /* Backward compatible, allow force interrupt logical replication
/home/craig/projects/2Q/postgres/.git/rebase-apply/patch:551: indent with spaces.
     * after second SIGINT without wait CopyDone from server
/home/craig/projects/2Q/postgres/.git/rebase-apply/patch:552: indent with spaces.
     */
warning: 4 lines add whitespace errors.


Remember to use "git log --check" before sending patches.

Also, for comment style, please use

/* this */

or

/*
 * this
 */

not

/* this
 */




I didn't see you explain why this was removed:

-    /* fast path */
-    /* Try to flush pending output to the client */
-    if (pq_flush_if_writable() != 0)
-        WalSndShutdown();
-
-    if (!pq_is_send_pending())
-        return;



I fixed a warning introduced here:


pg_recvlogical.c: In function ‘ProcessXLogData’:
pg_recvlogical.c:289:2: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
  int bytes_left = msgLength - hdr_len;
  ^
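The usual fix for -Wdeclaration-after-statement is to hoist the declarations to the top of the block and assign afterwards. A minimal sketch, using a hypothetical helper payload_length() and the 25-byte XLogData header layout (msgtype, dataStart, walEnd, sendTime) from the patch:

```c
/*
 * Hypothetical helper: returns the payload length of an XLogData
 * CopyData message, or -1 if the header is truncated.  All declarations
 * come before any statement, so C90-style warnings stay quiet.
 */
static int
payload_length(int msgLength)
{
	int			hdr_len;
	int			bytes_left;

	hdr_len = 1;			/* msgtype 'w' */
	hdr_len += 8;			/* dataStart */
	hdr_len += 8;			/* walEnd */
	hdr_len += 8;			/* sendTime */

	if (msgLength < hdr_len + 1)
		return -1;			/* streaming header too small */

	bytes_left = msgLength - hdr_len;
	return bytes_left;
}
```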


Some of the changes to pg_recvlogical look to be copied from
receivelog.c, most notably the functions ProcessKeepalive and
ProcessXLogData. I wondered whether, rather than copying them, the
existing ones should be modified to meet your needs. But it looks like
the issue is that a fair chunk of the rest of pg_recvlogical doesn't
re-use code from receivelog.c either; for example, pg_recvlogical's
sendFeedback differs from receivelog.c's sendFeedback. So
pg_recvlogical doesn't share the globals that receivelog.c assumes are
used. Also, what I thought were copied from receivelog.c were actually
extracted from code previously inline in StreamLogicalLog(...) in
pg_recvlogical.c.

I'm reluctant to try to untangle this in the same patch for risk that
it'll get stalled by issues with refactoring. The change you've
already made is a useful cleanup without messing with unnecessary
code.

Your patch 3 does something ... odd:

 src/test/recovery/t/001_stream_rep.pl                |  63 ----------------------
 src/test/recovery/t/002_archiving.pl                 |  53 -------------------
 src/test/recovery/t/003_recovery_targets.pl          | 146 ---------------------------------------------------
 src/test/recovery/t/004_timeline_switch.pl           |  75 --------------------------
 src/test/recovery/t/005_replay_delay.pl              |  69 ------------------------
 src/test/recovery/t/006_logical_decoding.pl          |  40 --------------
 src/test/recovery/t/007_sync_rep.pl                  | 174 ------------------------------------------------------------
 src/test/recovery/t/previous/001_stream_rep.pl       |  63 ++++++++++++++++++++++
 src/test/recovery/t/previous/002_archiving.pl        |  53 +++++++++++++++++++
 src/test/recovery/t/previous/003_recovery_targets.pl | 146 +++++++++++++++++++++++++++++++++++++++++++++++++++
 src/test/recovery/t/previous/004_timeline_switch.pl  |  75 ++++++++++++++++++++++++++
 src/test/recovery/t/previous/005_replay_delay.pl     |  69 ++++++++++++++++++++++++
 src/test/recovery/t/previous/006_logical_decoding.pl |  40 ++++++++++++++
 src/test/recovery/t/previous/007_sync_rep.pl         | 174 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

so I've revised it to remove that whole change, which I think you
probably did not mean to commit.

Reworded commit message.

Ensured that we send feedback just before a client-initiated CopyDone
so the server knows what position we saved up to. We don't discard already
buffered data.

I've modified patch 3 so that it also flushes to disk and sends
feedback before sending CopyDone, then discards any xlog data received
after it sends CopyDone. This is helpful in ensuring that we get
predictable behaviour when cancelling pg_recvlogical and restarting it
because the client and server agree on the point at which replay
stopped.
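The ordering matters more than the mechanics, so here is a simplified model of it. This is a hypothetical sketch with no libpq: StreamState, begin_clean_shutdown() and handle_xlog_data() are illustrative names, and the plain assignments merely stand in for OutputFsync(), sendFeedback() and PQputCopyEnd().

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's typedef */

/* Hypothetical client-side state, loosely modelled on pg_recvlogical. */
typedef struct StreamState
{
	XLogRecPtr	written_lsn;	/* highest position written to output */
	XLogRecPtr	flushed_lsn;	/* highest position fsync'd */
	XLogRecPtr	reported_lsn;	/* flush position last sent as feedback */
	bool		copy_done_sent;
} StreamState;

/* First SIGINT: flush, report the flush position, then send CopyDone. */
static void
begin_clean_shutdown(StreamState *st)
{
	if (st->copy_done_sent)
		return;
	st->flushed_lsn = st->written_lsn;	/* stands in for OutputFsync() */
	st->reported_lsn = st->flushed_lsn;	/* stands in for sendFeedback() */
	st->copy_done_sent = true;			/* stands in for PQputCopyEnd() */
}

/*
 * Incoming XLogData: once CopyDone has been sent, discard it while we
 * wait for the server's CopyDone.  Returns true if the data was written.
 */
static bool
handle_xlog_data(StreamState *st, XLogRecPtr lsn)
{
	if (st->copy_done_sent)
		return false;
	if (lsn > st->written_lsn)
		st->written_lsn = lsn;
	return true;
}
```

Because feedback goes out after the fsync and before CopyDone, and later XLogData is dropped, the flush position the server confirms for the slot equals what the client has on disk.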

I was evaluating the tests and I don't think they actually demonstrate
that the patch works. There's nothing done to check that
pg_recvlogical exited because of client-initiated CopyDone. While
looking into that I found that it also never actually hits
ProcessCopyDone(...) because libpq handles the CopyDone reply from the
server itself and treats it as end-of-stream. So the loop in
StreamLogicalLog will just end and ProcessCopyDone() is dead code.
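To spell out why ProcessCopyDone() can't fire: in async mode PQgetCopyData() returns the row length (> 0), 0 when no data is available yet, -1 once the COPY is done and -2 on error, and the server's CopyDone is consumed inside libpq rather than handed back as a payload. A hypothetical classifier makes the mapping explicit (the enum and function names are illustrative, not libpq API):

```c
/* Illustrative names only; the numeric cases follow libpq's PQgetCopyData(). */
typedef enum CopyResult
{
	COPY_GOT_DATA,		/* r > 0: a CopyData payload, dispatch on copybuf[0] */
	COPY_WOULD_BLOCK,	/* r == 0: wait on the socket, then PQconsumeInput() */
	COPY_ENDED,			/* r == -1: server's CopyDone, handled inside libpq */
	COPY_FAILED			/* r == -2: error, see PQerrorMessage() */
} CopyResult;

static CopyResult
classify_copy_result(int r)
{
	if (r > 0)
		return COPY_GOT_DATA;
	if (r == 0)
		return COPY_WOULD_BLOCK;
	if (r == -1)
		return COPY_ENDED;
	return COPY_FAILED;
}
```

Only the COPY_GOT_DATA branch ever sees copybuf[0], so a handler dispatched on the message type can never observe the server's CopyDone; end of stream shows up solely as r == -1.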

Initialization of copyDoneSent and copyDoneReceived were also in the
wrong place and would've caused issues with retry looping.
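The fix is to reset the flags at the top of each streaming attempt rather than once at startup, as the updated patch now does in StreamLogicalLog(). A hypothetical, stripped-down sketch of that pattern (stream_attempt() and its simulate_copy_done parameter are illustrative only):

```c
#include <stdbool.h>

/* Hypothetical per-attempt flags, modelled on copyDoneSent/copyDoneReceived. */
static bool copy_done_sent;
static bool copy_done_received;

/*
 * One streaming attempt.  Resetting on entry means a reconnect after a
 * failed attempt never inherits stale state from the previous connection.
 */
static void
stream_attempt(bool simulate_copy_done)
{
	copy_done_sent = false;
	copy_done_received = false;

	if (simulate_copy_done)
	{
		copy_done_sent = true;
		copy_done_received = true;
	}
}
```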

I modified pg_recvlogical so that when in verbose mode it prints a
message when it exits by client request. Then modified the tests to
look for that message.

An updated patch series is attached. Please re-test and review my
changes. At that point I'll mark it ready to go unless someone else
wants to take a look. I'm pretty much out of time for this anyway.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 0eda484d8f314fd103306b9335262633dba41d23 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 10 May 2016 10:34:10 +0800
Subject: [PATCH 1/4] Respect client-initiated CopyDone in walsender

The walsender never looked for CopyDone sent by the client unless it
had already decided it was done sending data and dispatched its own
CopyDone message.

Check for client-initiated CopyDone when in COPY BOTH mode, returning to
command mode. In logical decoding this will allow the client to end a logical
decoding session between transactions without just unilaterally closing its
connection.

This change does not allow a client to end a COPY BOTH session in the middle of
processing a logical decoding block.

TODO effect on physical walsender?
---
 src/backend/replication/walsender.c | 32 +++++++++++++++++++++++++++-----
 1 file changed, 27 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c7743da..2e3697b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -759,6 +759,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
+	/*
+	 * If the client sent CopyDone while we were waiting,
+	 * bail out so we can wind up the decoding session.
+	 */
+	if (streamingDoneSending)
+		return -1;
+
 	/* more than one block available */
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;
@@ -1220,8 +1227,11 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * It's important to do this check after the recomputation of
 		 * RecentFlushPtr, so we can send all remaining data before shutting
 		 * down.
+		 *
+		 * We'll also exit here if the client sent CopyDone because it wants
+		 * to return to command mode.
 		 */
-		if (walsender_ready_to_stop)
+		if (walsender_ready_to_stop || streamingDoneReceiving)
 			break;
 
 		/*
@@ -1787,7 +1797,14 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Main loop of walsender process that streams the WAL over Copy messages.
+ *
+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.
+ */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -1853,10 +1870,15 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
+		 *
+		 * If we're trying to finish sending and exit we shouldn't enqueue more
+		 * data to libpq. We need to finish writing out whatever we already
+		 * have in libpq's send buffer to maintain protocol sync so we still
+		 * need to loop until it's flushed.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneSending)
 			send_data();
-		else
+		else if (!streamingDoneSending)
 			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
@@ -2912,7 +2934,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	if (waiting_for_ping_response || streamingDoneSending)
 		return;
 
 	/*
-- 
2.5.5

From f98f2388c57d938ebbe07ccd2dbe02138312858f Mon Sep 17 00:00:00 2001
From: Vladimir Gordiychuk <fol...@gmail.com>
Date: Wed, 7 Sep 2016 00:39:18 +0300
Subject: [PATCH 2/4] Client-initiated CopyDone during transaction decoding in
 walsender

The prior patch caused the walsender to react to a client-initiated
CopyDone while it's in the WalSndLoop. That's not enough to let clients
end logical decoding mid-transaction because we loop in ReorderBufferCommit
during decoding of a transaction without ever returning to WalSndLoop.

Allow breaking out of ReorderBufferCommit by detecting that client
input has requested an end to COPY BOTH mode, so clients can abort
decoding mid-transaction.
---
 src/backend/replication/logical/logical.c       | 15 +++++---
 src/backend/replication/logical/logicalfuncs.c  |  3 +-
 src/backend/replication/logical/reorderbuffer.c | 11 ++++--
 src/backend/replication/slotfuncs.c             |  2 +-
 src/backend/replication/walsender.c             | 48 +++++++++++++++----------
 src/include/replication/logical.h               |  6 ++--
 src/include/replication/reorderbuffer.h         | 12 +++++++
 7 files changed, 66 insertions(+), 31 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1512be5..4065899 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -116,7 +116,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   ContinueDecodingCB continue_decoding_cb)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -180,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->continue_decoding_cb = continue_decoding_cb;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -212,7 +214,8 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  ContinueDecodingCB continue_decoding_cb)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -288,7 +291,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotSave();
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write, continue_decoding_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -328,7 +331,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  ContinueDecodingCB continue_decoding_cb)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -378,7 +382,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 continue_decoding_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 318726e..520aa90 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -249,7 +249,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite,
+									NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9b430b9..7405e2c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1417,7 +1417,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
+			   (rb->continue_decoding_cb == NULL ||
+				rb->continue_decoding_cb()))
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
@@ -1643,8 +1645,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
+		{
+			/* call commit callback */
+			rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f908761..976bc0c 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -127,7 +127,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(
 									NameStr(*plugin), NIL,
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2e3697b..e765126 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -218,6 +218,23 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
+/*
+ * Return true until either the client or the server side has requested that
+ * we wind up COPY BOTH mode by sending a CopyDone.
+ *
+ * If we receive a CopyDone from the client we should avoid sending any further
+ * CopyData messages and return to command mode as promptly as possible.
+ *
+ * While in the middle of sending data to a client we notice a client-initiated
+ * CopyDone when WalSndWriteData() calls ProcessRepliesIfAny() and it
+ * sets streamingDoneSending.
+ */
+static bool
+IsStreamingActive(void)
+{
+	return !streamingDoneReceiving && !streamingDoneSending;
+}
+
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -823,7 +840,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										IsStreamingActive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1002,7 +1020,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_decoding_ctx = CreateDecodingContext(
 											   cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData, IsStreamingActive);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1093,14 +1111,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
-
-	if (!pq_is_send_pending())
-		return;
-
 	for (;;)
 	{
 		int			wakeEvents;
@@ -1235,7 +1245,14 @@ WalSndWaitForWal(XLogRecPtr loc)
 			break;
 
 		/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client, or sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive())
+			break;
+
+		/*
+		 * We only send regular messages to the client for full decoded
 		 * transactions, but a synchronous replication and walsender shutdown
 		 * possibly are waiting for a later location. So we send pings
 		 * containing the flush location every now and then.
@@ -1797,14 +1814,7 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/*
- * Main loop of walsender process that streams the WAL over Copy messages.
- *
- * The send_data callback must enqueue complete CopyData messages to libpq
- * using pq_putmessage_noblock or similar, since the walsender loop may send
- * CopyDone then exit and return to command mode in response to a client
- * CopyDone between calls to send_data.
- */
+/* Main loop of walsender process that streams the WAL over Copy messages. */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 947000e..9fc4098 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -81,13 +81,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  ContinueDecodingCB continue_decoding_cb);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  ContinueDecodingCB continue_decoding_cb);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 9e209ae..c2e0eea 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -292,6 +292,13 @@ typedef void (*ReorderBufferMessageCB) (
 												 const char *prefix, Size sz,
 													const char *message);
 
+/*
+ * Callback that allows logical decoding to be interrupted mid-transaction.
+ * It returns true if decoding should continue; if it returns false, logical
+ * decoding stops as soon as possible.
+ */
+typedef bool (*ContinueDecodingCB) (void);
+
 struct ReorderBuffer
 {
 	/*
@@ -321,6 +328,11 @@ struct ReorderBuffer
 	ReorderBufferMessageCB message;
 
 	/*
+	 * Callback that reports whether decoding should continue; returns false to stop.
+	 */
+	ContinueDecodingCB continue_decoding_cb;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
-- 
2.5.5

From d961eca9c320cc81675268db58996f7c17c7f800 Mon Sep 17 00:00:00 2001
From: Vladimir Gordiychuk <fol...@gmail.com>
Date: Mon, 19 Sep 2016 01:50:05 +0300
Subject: [PATCH 3/4] Add ability for pg_recvlogical to stop replication from
 client side

Now pg_recvlogical will respond to the first SIGINT (Ctrl+C) it receives by
sending a CopyDone message to PostgreSQL and waiting for a CopyDone reply.
This change is mainly to demonstrate that ending COPY BOTH mode and returning
to command mode works and allow it to be tested, as it is not currently
necessary to warn the server that streaming will end or "cleanly" shut down
streaming. It also lets us make sure the server and client are in agreement
about how far streaming proceeded at time of shutdown, so the server's confirmed
flush position will match that of the client.

A second SIGINT will forcibly interrupt replication without waiting for a
complete CopyDone exchange so pg_recvlogical can still be terminated if the
peer is unreachable or unresponsive. This is the same behaviour prior versions
had on their first SIGINT.
---
 src/bin/pg_basebackup/pg_recvlogical.c | 462 +++++++++++++++++++++------------
 1 file changed, 289 insertions(+), 173 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 4c6cf70..c3d615b 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -50,8 +50,12 @@ static const char *plugin = "test_decoding";
 /* Global State */
 static int	outfd = -1;
 static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t force_time_to_abort = false;
 static volatile sig_atomic_t output_reopen = false;
+static bool copyDoneSent;
+static bool copyDoneReceived;
 static bool output_isfile;
+static int64 last_status_time;
 static int64 output_last_fsync = -1;
 static bool output_needs_fsync = false;
 static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
@@ -195,6 +199,222 @@ OutputFsync(int64 now)
 	return true;
 }
 
+static bool
+ProcessKeepalive(PGconn *conn, char *msgBuf, int msgLength)
+{
+	int			pos;
+	bool		replyRequested;
+	XLogRecPtr	walEnd;
+
+	/*
+	 * Parse the keepalive message, enclosed in the CopyData message.
+	 * We just check if the server requested a reply, and ignore the
+	 * rest.
+	 */
+	pos = 1;			/* skip msgtype 'k' */
+
+	/* read walEnd */
+	walEnd = fe_recvint64(&msgBuf[pos]);
+	output_written_lsn = Max(walEnd, output_written_lsn);
+	pos += 8;			/* skip walEnd */
+	pos += 8;			/* skip sendTime */
+
+	if (msgLength < pos + 1)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, msgLength);
+		return false;
+	}
+	replyRequested = msgBuf[pos];
+
+	/* If the server requested an immediate reply, send one. */
+	if (replyRequested)
+	{
+		int64 now = feGetCurrentTimestamp();
+
+		/* fsync data, so we send a recent flush pointer */
+		if (!OutputFsync(now))
+		{
+			return false;
+		}
+
+		if (!sendFeedback(conn, now, true, false))
+		{
+			return false;
+		}
+		last_status_time = now;
+	}
+
+	return true;
+}
+
+static bool
+ProcessXLogData(PGconn *conn, char *msgBuf, int msgLength)
+{
+	int bytes_left;
+	int bytes_written;
+
+	/*
+	 * Read the header of the XLogData message, enclosed in the CopyData
+	 * message. We only need the WAL location field (dataStart), the rest
+	 * of the header is ignored.
+	 */
+	int hdr_len = 1;			/* msgtype 'w' */
+	hdr_len += 8;			/* dataStart */
+	hdr_len += 8;			/* walEnd */
+	hdr_len += 8;			/* sendTime */
+	if (msgLength < hdr_len + 1)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, msgLength);
+		return false;
+	}
+
+	if (time_to_abort && copyDoneSent)
+	{
+		/*
+		 * We've sent feedback and sent CopyDone, so we are now discarding
+		 * xlog data input to find the server's reply CopyDone. That way when
+		 * another client connects to the slot later they start replay exactly
+		 * where we left off - or at least at the last commit we flushed to
+		 * disk. This is not an error condition.
+		 */
+		return true;
+	}
+
+	/* Extract WAL location for this block */
+	{
+		XLogRecPtr	temp = fe_recvint64(&msgBuf[1]);
+
+		output_written_lsn = Max(temp, output_written_lsn);
+	}
+
+	bytes_left = msgLength - hdr_len;
+	bytes_written = 0;
+
+	/* signal that a fsync is needed */
+	output_needs_fsync = true;
+
+	while (bytes_left)
+	{
+		int			ret;
+
+		ret = write(outfd,
+					msgBuf + hdr_len + bytes_written,
+					bytes_left);
+
+		if (ret < 0)
+		{
+			fprintf(stderr,
+			  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+					progname, bytes_left, outfile,
+					strerror(errno));
+			return false;
+		}
+
+		/* Write was successful, advance our position */
+		bytes_written += ret;
+		bytes_left -= ret;
+	}
+
+	if (write(outfd, "\n", 1) != 1)
+	{
+		fprintf(stderr,
+			  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+				progname, 1, outfile,
+				strerror(errno));
+		return false;
+	}
+
+	return true;
+}
+
+static bool
+ProcessReceiveMsg(PGconn *conn, unsigned char type, char *msgBuf, int msgLength)
+{
+	bool success = false;
+	switch (type)
+	{
+		case 'k':
+			success = ProcessKeepalive(conn, msgBuf, msgLength);
+			break;
+		case 'w':
+			success = ProcessXLogData(conn, msgBuf, msgLength);
+			break;
+		default:
+			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+					progname, type);
+	}
+
+	return success;
+}
+
+/*
+ * Wait for activity on the socket, waking early for the fsync or keepalive timeout.
+ * Returns the number of ready descriptors, or -1 for errors.
+ */
+static int
+WaitSocketActivity(PGconn *conn, int64 now)
+{
+	/*
+	 * In async mode, and no data available. We block on reading but
+	 * not more than the specified timeout, so that we can send a
+	 * response back to the client.
+	 */
+	fd_set		input_mask;
+	int64		message_target = 0;
+	int64		fsync_target = 0;
+	struct timeval timeout;
+	struct timeval *timeoutptr = NULL;
+
+	if (PQsocket(conn) < 0)
+	{
+		fprintf(stderr,
+				_("%s: invalid socket: %s"),
+				progname, PQerrorMessage(conn));
+		return -1;
+	}
+
+	FD_ZERO(&input_mask);
+	FD_SET(PQsocket(conn), &input_mask);
+
+	/* Compute when we need to wakeup to send a keepalive message. */
+	if (standby_message_timeout)
+		message_target = last_status_time + (standby_message_timeout - 1) *
+			((int64) 1000);
+
+	/* Compute when we need to wakeup to fsync the output file. */
+	if (fsync_interval > 0 && output_needs_fsync)
+		fsync_target = output_last_fsync + (fsync_interval - 1) *
+			((int64) 1000);
+
+	/* Now compute when to wakeup. */
+	if (message_target > 0 || fsync_target > 0)
+	{
+		int64		targettime;
+		long		secs;
+		int			usecs;
+
+		targettime = message_target;
+
+		if (fsync_target > 0 && fsync_target < targettime)
+			targettime = fsync_target;
+
+		feTimestampDifference(now,
+							  targettime,
+							  &secs,
+							  &usecs);
+		if (secs <= 0)
+			timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+		else
+			timeout.tv_sec = secs;
+		timeout.tv_usec = usecs;
+		timeoutptr = &timeout;
+	}
+
+	return select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+}
+
 /*
  * Start the log streaming
  */
@@ -203,12 +423,14 @@ StreamLogicalLog(void)
 {
 	PGresult   *res;
 	char	   *copybuf = NULL;
-	int64		last_status = -1;
 	int			i;
 	PQExpBuffer query;
 
 	output_written_lsn = InvalidXLogRecPtr;
 	output_fsync_lsn = InvalidXLogRecPtr;
+	last_status_time = -1;
+	copyDoneReceived = false;
+	copyDoneSent = false;
 
 	query = createPQExpBuffer();
 
@@ -271,13 +493,10 @@ StreamLogicalLog(void)
 				_("%s: streaming initiated\n"),
 				progname);
 
-	while (!time_to_abort)
+	while (!force_time_to_abort)
 	{
 		int			r;
-		int			bytes_left;
-		int			bytes_written;
 		int64		now;
-		int			hdr_len;
 
 		if (copybuf != NULL)
 		{
@@ -298,15 +517,50 @@ StreamLogicalLog(void)
 				goto error;
 		}
 
-		if (standby_message_timeout > 0 &&
-			feTimestampDifferenceExceeds(last_status, now,
+		if (standby_message_timeout > 0 && !time_to_abort &&
+			feTimestampDifferenceExceeds(last_status_time, now,
 										 standby_message_timeout))
 		{
 			/* Time to send feedback! */
 			if (!sendFeedback(conn, now, true, false))
 				goto error;
 
-			last_status = now;
+			last_status_time = now;
+		}
+
+		if (time_to_abort && !copyDoneSent)
+		{
+			if (verbose)
+			{
+				fprintf(stderr,
+						_("%s: stopping write up to %X/%X, flush to %X/%X (slot %s)\n"),
+						progname,
+						(uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
+						(uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
+						replication_slot);
+			}
+
+			/*
+			 * Force fsync and send feedback just before we send CopyDone to
+			 * make sure the server knows exactly what we replayed up to. We'll
+			 * discard data received after we request the end of COPY BOTH mode
+			 * so we know we've written everything we're going to.
+			 */
+			if (!OutputFsync(now))
+				goto error;
+
+			if (!sendFeedback(conn, now, true, false))
+				goto error;
+
+			last_status_time = now;
+
+			if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+			{
+				fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+						progname, PQerrorMessage(conn));
+				goto error;
+			}
+			copyDoneSent = true;
 		}
 
 		/* got SIGHUP, close output file */
@@ -349,64 +603,9 @@ StreamLogicalLog(void)
 		r = PQgetCopyData(conn, &copybuf, 1);
 		if (r == 0)
 		{
-			/*
-			 * In async mode, and no data available. We block on reading but
-			 * not more than the specified timeout, so that we can send a
-			 * response back to the client.
-			 */
-			fd_set		input_mask;
-			int64		message_target = 0;
-			int64		fsync_target = 0;
-			struct timeval timeout;
-			struct timeval *timeoutptr = NULL;
+			int readyMsg = WaitSocketActivity(conn, now);
 
-			if (PQsocket(conn) < 0)
-			{
-				fprintf(stderr,
-						_("%s: invalid socket: %s"),
-						progname, PQerrorMessage(conn));
-				goto error;
-			}
-
-			FD_ZERO(&input_mask);
-			FD_SET(PQsocket(conn), &input_mask);
-
-			/* Compute when we need to wakeup to send a keepalive message. */
-			if (standby_message_timeout)
-				message_target = last_status + (standby_message_timeout - 1) *
-					((int64) 1000);
-
-			/* Compute when we need to wakeup to fsync the output file. */
-			if (fsync_interval > 0 && output_needs_fsync)
-				fsync_target = output_last_fsync + (fsync_interval - 1) *
-					((int64) 1000);
-
-			/* Now compute when to wakeup. */
-			if (message_target > 0 || fsync_target > 0)
-			{
-				int64		targettime;
-				long		secs;
-				int			usecs;
-
-				targettime = message_target;
-
-				if (fsync_target > 0 && fsync_target < targettime)
-					targettime = fsync_target;
-
-				feTimestampDifference(now,
-									  targettime,
-									  &secs,
-									  &usecs);
-				if (secs <= 0)
-					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
-				else
-					timeout.tv_sec = secs;
-				timeout.tv_usec = usecs;
-				timeoutptr = &timeout;
-			}
-
-			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-			if (r == 0 || (r < 0 && errno == EINTR))
+			if (readyMsg == 0 || (readyMsg < 0 && errno == EINTR))
 			{
 				/*
 				 * Got a timeout or signal. Continue the loop and either
@@ -415,7 +614,7 @@ StreamLogicalLog(void)
 				 */
 				continue;
 			}
-			else if (r < 0)
+			else if (readyMsg < 0)
 			{
 				fprintf(stderr, _("%s: select() failed: %s\n"),
 						progname, strerror(errno));
@@ -430,12 +629,26 @@ StreamLogicalLog(void)
 						progname, PQerrorMessage(conn));
 				goto error;
 			}
+
 			continue;
 		}
 
-		/* End of copy stream */
+		/*
+		 * End of copy stream (server sent CopyDone)
+		 *
+		 * This is where we exit on normal time_to_abort because our own
+		 * CopyDone caused the server to shut down streaming on its end.
+		 */
 		if (r == -1)
+		{
+			copyDoneReceived = true;
+			if (verbose && time_to_abort && copyDoneSent)
+			{
+				fprintf(stderr,
+						_("%s: streaming ended by user request\n"), progname);
+			}
 			break;
+		}
 
 		/* Failure while reading the copy stream */
 		if (r == -2)
@@ -445,113 +658,8 @@ StreamLogicalLog(void)
 			goto error;
 		}
 
-		/* Check the message type. */
-		if (copybuf[0] == 'k')
+		if (!ProcessReceiveMsg(conn, copybuf[0], copybuf, r))
 		{
-			int			pos;
-			bool		replyRequested;
-			XLogRecPtr	walEnd;
-
-			/*
-			 * Parse the keepalive message, enclosed in the CopyData message.
-			 * We just check if the server requested a reply, and ignore the
-			 * rest.
-			 */
-			pos = 1;			/* skip msgtype 'k' */
-			walEnd = fe_recvint64(&copybuf[pos]);
-			output_written_lsn = Max(walEnd, output_written_lsn);
-
-			pos += 8;			/* read walEnd */
-
-			pos += 8;			/* skip sendTime */
-
-			if (r < pos + 1)
-			{
-				fprintf(stderr, _("%s: streaming header too small: %d\n"),
-						progname, r);
-				goto error;
-			}
-			replyRequested = copybuf[pos];
-
-			/* If the server requested an immediate reply, send one. */
-			if (replyRequested)
-			{
-				/* fsync data, so we send a recent flush pointer */
-				if (!OutputFsync(now))
-					goto error;
-
-				now = feGetCurrentTimestamp();
-				if (!sendFeedback(conn, now, true, false))
-					goto error;
-				last_status = now;
-			}
-			continue;
-		}
-		else if (copybuf[0] != 'w')
-		{
-			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
-					progname, copybuf[0]);
-			goto error;
-		}
-
-
-		/*
-		 * Read the header of the XLogData message, enclosed in the CopyData
-		 * message. We only need the WAL location field (dataStart), the rest
-		 * of the header is ignored.
-		 */
-		hdr_len = 1;			/* msgtype 'w' */
-		hdr_len += 8;			/* dataStart */
-		hdr_len += 8;			/* walEnd */
-		hdr_len += 8;			/* sendTime */
-		if (r < hdr_len + 1)
-		{
-			fprintf(stderr, _("%s: streaming header too small: %d\n"),
-					progname, r);
-			goto error;
-		}
-
-		/* Extract WAL location for this block */
-		{
-			XLogRecPtr	temp = fe_recvint64(&copybuf[1]);
-
-			output_written_lsn = Max(temp, output_written_lsn);
-		}
-
-		bytes_left = r - hdr_len;
-		bytes_written = 0;
-
-		/* signal that a fsync is needed */
-		output_needs_fsync = true;
-
-		while (bytes_left)
-		{
-			int			ret;
-
-			ret = write(outfd,
-						copybuf + hdr_len + bytes_written,
-						bytes_left);
-
-			if (ret < 0)
-			{
-				fprintf(stderr,
-				  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
-						progname, bytes_left, outfile,
-						strerror(errno));
-				goto error;
-			}
-
-			/* Write was successful, advance our position */
-			bytes_written += ret;
-			bytes_left -= ret;
-		}
-
-		if (write(outfd, "\n", 1) != 1)
-		{
-			fprintf(stderr,
-				  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
-					progname, 1, outfile,
-					strerror(errno));
 			goto error;
 		}
 	}
@@ -601,6 +709,14 @@ error:
 static void
 sigint_handler(int signum)
 {
+	/*
+	 * For backward compatibility, a second SIGINT forces logical replication
+	 * to stop immediately, without waiting for CopyDone from the server.
+	 */
+	if (time_to_abort)
+	{
+		force_time_to_abort = true;
+	}
 	time_to_abort = true;
 }
 
-- 
2.5.5

From 37c9d81fb04944b9fc8346918730b9e4349f84e7 Mon Sep 17 00:00:00 2001
From: Vladimir Gordiychuk <fol...@gmail.com>
Date: Mon, 19 Sep 2016 01:56:15 +0300
Subject: [PATCH 4/4] Add tests for stop replication protocol

---
 src/test/recovery/t/008_pg_recvlogical.pl | 226 ++++++++++++++++++++++++++++++
 1 file changed, 226 insertions(+)
 create mode 100644 src/test/recovery/t/008_pg_recvlogical.pl

diff --git a/src/test/recovery/t/008_pg_recvlogical.pl b/src/test/recovery/t/008_pg_recvlogical.pl
new file mode 100644
index 0000000..1dd41df
--- /dev/null
+++ b/src/test/recovery/t/008_pg_recvlogical.pl
@@ -0,0 +1,226 @@
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+use IPC::Run qw(start timeout);
+
+
+my $verbose = $ENV{PG_TAP_VERBOSE};
+
+# Launch pg_recvlogical as a background process and return its IPC::Run handle,
+# plus (in list context) refs to its stdout and stderr buffers and the timeout
+sub start_pg_recvlogical
+{
+    my ($node, $slotname, %params) = @_;
+    my $stdout = my $stderr = '';
+    my $timeout           = undef;
+    my $timeout_exception = 'pg_recvlogical timed out';
+
+    $timeout =
+        IPC::Run::timeout($params{timeout}, exception => $timeout_exception)
+        if (defined($params{timeout}));
+
+    my @cmd = ("pg_recvlogical", "--verbose", "-S", "$slotname", "--no-loop", "--dbname", $node->connstr, "--start", "-f", "-");
+
+    push @cmd, @{ $params{option} }
+        if defined $params{option};
+
+    diag "Running '@cmd'" if $verbose;
+
+    my $proc = start \@cmd, '<', \undef, '2>', \$stderr, '>', \$stdout, $timeout;
+
+    die $! unless defined($proc);
+
+    sleep 5;
+
+    if ($stdout ne "")
+    {
+        diag "#### Begin standard out\n" if $verbose;
+        diag $stdout if $verbose;
+        diag "\n#### End standard out\n" if $verbose;
+    }
+
+    if ($stderr ne "")
+    {
+        diag "#### Begin standard error\n" if $verbose;
+        diag $stderr if $verbose;
+        diag "\n#### End standard error\n" if $verbose;
+    }
+
+    if (wantarray)
+    {
+        return ($proc, \$stdout, \$stderr, $timeout);
+    }
+    else
+    {
+        return $proc;
+    }
+}
+
+sub wait_for_start_streaming
+{
+    my ($node, $slotname) = @_;
+
+    diag "waiting for " . $node->name . " to start streaming on slot " . $slotname if $verbose;
+    $node->poll_query_until('postgres', "select active from pg_replication_slots where slot_name = '$slotname';");
+}
+
+sub wait_for_stop_streaming
+{
+    my ($node, $slotname) = @_;
+
+    diag "waiting for streaming on slot " . $slotname . " of " . $node->name . " to stop" if $verbose;
+    $node->poll_query_until('postgres', "select not(active) from pg_replication_slots where slot_name = '$slotname';");
+}
+
+sub create_logical_replication_slot
+{
+    my ($node, $slotname, $outplugin) = @_;
+
+    $node->safe_psql(
+        "postgres",
+        "select pg_drop_replication_slot('$slotname') where exists (select 1 from pg_replication_slots where slot_name = '$slotname');");
+
+    $node->safe_psql('postgres',
+        "SELECT pg_create_logical_replication_slot('$slotname', '$outplugin');"
+    );
+}
+
+my ($proc);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 12\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 12\n");
+$node_master->append_conf('postgresql.conf', "max_connections = 20\n");
+$node_master->dump_info;
+$node_master->start;
+
+
+# Test case 1: the client stops logical replication while the database has no new changes (idle state)
+{
+    my $slotname = 'calm_state_slot';
+
+    create_logical_replication_slot($node_master, $slotname, "test_decoding");
+
+    my ($stdout, $stderr, $timeout);
+    ($proc, $stdout, $stderr, $timeout) = start_pg_recvlogical(
+        $node_master,
+        $slotname,
+        timeout => 60,
+        option => ['-o', 'include-xids=false', '-o', 'skip-empty-xacts=true']
+    );
+
+    wait_for_start_streaming($node_master, $slotname);
+
+    my $cancelTime = time();
+    $proc->signal("SIGINT");
+
+    $proc->pump while $proc->pumpable;
+
+    wait_for_stop_streaming($node_master, $slotname);
+
+    my $spendTime = time() - $cancelTime;
+
+    my $timed_out = 0;
+    eval {
+        $proc->finish;
+    };
+    if ($@)
+    {
+        my $x = $@;
+        if ($timeout->is_expired)
+        {
+            diag "whoops, pg_recvlogical timed out" if $verbose;
+            $timed_out = 1;
+        }
+        else
+        {
+            die $x;
+        }
+    }
+
+    if ($verbose)
+    {
+        diag "#--- pg_recvlogical stderr ---";
+        diag $$stderr;
+        diag "#--- end stderr ---";
+    }
+
+    ok(!$timed_out, "pg_recvlogical exited before timeout when idle");
+    like($$stderr, qr/stopping write up to/, 'pg_recvlogical responded to sigint when idle');
+    like($$stderr, qr/streaming ended by user request/, 'idle wait ended due to client copydone');
+    diag "decoding when idle stopped after ${spendTime}s";
+    ok((time() - $cancelTime) <= 3, # allow extra time for slow machines
+        "pg_recvlogical exited promptly on sigint when idle"
+    );
+}
+
+
+# Test case 2: the client stops logical replication while a huge transaction (200000 inserted rows) is being decoded
+{
+    my $slotname = 'huge_tx_state_slot';
+
+    create_logical_replication_slot($node_master, $slotname, "test_decoding");
+
+    $node_master->safe_psql('postgres',
+        "create table test_logic_table(pk serial primary key, name varchar(100));");
+
+    diag 'Inserting a large amount of data into table test_logic_table' if $verbose;
+    $node_master->safe_psql('postgres',
+        "insert into test_logic_table select id, md5(random()::text) as name from generate_series(1, 200000) as id;");
+
+    my ($stdout, $stderr, $timeout);
+    ($proc, $stdout, $stderr, $timeout) = start_pg_recvlogical(
+        $node_master,
+        $slotname,
+        timeout => 60,
+        option => ['-o', 'include-xids=false', '-o', 'skip-empty-xacts=true']
+    );
+
+    wait_for_start_streaming($node_master, $slotname);
+
+    my $cancelTime = time();
+    $proc->signal("SIGINT");
+
+    $proc->pump while $proc->pumpable;
+
+    wait_for_stop_streaming($node_master, $slotname);
+
+    my $spendTime = time() - $cancelTime;
+
+    my $timed_out = 0;
+    eval {
+        $proc->finish;
+    };
+    if ($@)
+    {
+        my $x = $@;
+        if ($timeout->is_expired)
+        {
+            diag "whoops, pg_recvlogical timed out" if $verbose;
+            $timed_out = 1;
+        }
+        else
+        {
+            die $x;
+        }
+    }
+
+    if ($verbose)
+    {
+        diag "#--- pg_recvlogical stderr ---";
+        diag $$stderr;
+        diag "#--- end stderr ---";
+    }
+
+    ok(!$timed_out, "pg_recvlogical exited before timeout when streaming");
+    like($$stderr, qr/stopping write up to/, 'pg_recvlogical responded to sigint when streaming');
+    like($$stderr, qr/streaming ended by user request/, 'streaming ended due to client copydone');
+    diag "decoding of big xact stopped after ${spendTime}s";
+    ok($spendTime <= 5, # allow extra time for slow machines
+        "pg_recvlogical exited promptly on signal when decoding");
+}
-- 
2.5.5

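To make the shutdown sequence in patch 3 easier to follow, here is a minimal, libpq-free sketch of the two-stage SIGINT handling. The first SIGINT makes the client send its own CopyDone and keep reading until the server answers with CopyDone. A second SIGINT abandons that wait. The receive loop is simulated: simulate_stream(), install_sigint_handler(), StopKind and the iteration-based server delay are hypothetical stand-ins for StreamLogicalLog() and a real connection. The handler follows the logic of sigint_handler() in the patch, using sig_atomic_t flags and installed with sigaction() so it stays in place after the first delivery.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <string.h>

static volatile sig_atomic_t time_to_abort = 0;
static volatile sig_atomic_t force_time_to_abort = 0;

static void
sigint_handler(int signum)
{
	(void) signum;
	/* Second SIGINT: stop waiting for the server's CopyDone. */
	if (time_to_abort)
		force_time_to_abort = 1;
	time_to_abort = 1;
}

/* Without SA_RESETHAND the handler remains installed after it fires. */
static void
install_sigint_handler(void)
{
	struct sigaction sa;

	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = sigint_handler;
	sigemptyset(&sa.sa_mask);
	sigaction(SIGINT, &sa, NULL);
}

typedef enum
{
	STOP_NONE,					/* loop ran out without stopping */
	STOP_GRACEFUL,				/* our CopyDone was answered by the server */
	STOP_FORCED					/* second SIGINT, gave up waiting */
} StopKind;

/*
 * Hypothetical simulation of the receive loop. sigint_at[] lists the
 * iterations at which SIGINT is raised; the simulated server answers our
 * CopyDone server_delay iterations after it was sent.
 */
static StopKind
simulate_stream(const int *sigint_at, int nsigint, int server_delay)
{
	bool		copy_done_sent = false;
	int			sent_iter = 0;
	int			iter;
	int			i;

	time_to_abort = 0;
	force_time_to_abort = 0;

	for (iter = 0; iter < 1000; iter++)
	{
		for (i = 0; i < nsigint; i++)
			if (sigint_at[i] == iter)
				raise(SIGINT);	/* handler runs before raise() returns */

		if (force_time_to_abort)
			return STOP_FORCED;

		if (time_to_abort && !copy_done_sent)
		{
			/* Real client: OutputFsync(), sendFeedback(), PQputCopyEnd(). */
			copy_done_sent = true;
			sent_iter = iter;
		}

		/* Real client: PQgetCopyData() returns -1 on the server's CopyDone. */
		if (copy_done_sent && iter - sent_iter >= server_delay)
			return STOP_GRACEFUL;
	}
	return STOP_NONE;
}
```

In the sketch raise() delivers the signal synchronously, so the flags are already set when the loop checks them. In the real client the signal instead interrupts select() with EINTR, which is why the patch treats EINTR like a timeout and loops back to check time_to_abort.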
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)