On 05/12/17 21:07, Robert Haas wrote:
> On Mon, Dec 4, 2017 at 10:59 PM, Craig Ringer <cr...@2ndquadrant.com> wrote:
>> To me it looks like it's time to get this committed, marking as such.
> 
> This version has noticeably more code rearrangement than before, and
> I'm not sure that is actually buying us anything.  Why not keep the
> changes minimal?
> 

Yeah, we moved things around in the end. The main reason is that it now
works closer to how it was originally designed to work: the slow path is
not so slow when !pq_is_send_pending(), which seems to have been the
reasoning behind the original coding.

It's not strictly necessary for fixing the bug, but why make things
slower than they need to be?
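
To illustrate the point about the reordered loop, here is a toy model (not
the walsender code itself). The names LoopStats and run_slow_path are
made up for this sketch, and the counters merely stand in for the real
calls named in the comments. It assumes each sleep-and-flush round drains
exactly one pending chunk, which is a simplification.

```c
/* Toy counters: how often the loop ran its checks and how often it slept. */
typedef struct
{
	int			checks;
	int			sleeps;
} LoopStats;

/*
 * Hypothetical model of the reordered slow path.  pending_chunks is the
 * number of flush rounds still needed before the send buffer is empty.
 */
static LoopStats
run_slow_path(int pending_chunks)
{
	LoopStats	stats = {0, 0};

	for (;;)
	{
		/* stands in for ProcessRepliesIfAny, WalSndCheckTimeOut, keepalive */
		stats.checks++;

		/* stands in for !pq_is_send_pending(): nothing left, so leave */
		if (pending_chunks == 0)
			break;

		/* stands in for WaitLatchOrSocket followed by pq_flush_if_writable */
		stats.sleeps++;
		pending_chunks--;
	}

	return stats;
}
```

With nothing pending, the model runs the checks once and never sleeps,
which is the "not so slow" case; with two pending chunks it runs the
checks three times and sleeps twice.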

> Also, TBH, this doesn't seem to have been carefully reviewed for style:
> 
> -    if (!pq_is_send_pending())
> -        return;
> +    /* Try taking fast path unless we get too close to walsender timeout. */
> +    if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
> +                                          wal_sender_timeout / 2))
> +    {
> +        if (!pq_is_send_pending())
> +            return;
> +    }
> 
> Generally we write if (a && b) { ... } not if (a) { if (b) .. }
> 

It's rather ugly with && because one of the conditions spans two lines,
but okay, here you go. I am keeping the braces even though we normally
don't use them for one-liners, because it's completely unreadable without
them IMHO.
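
For reference, the combined condition can be sketched in isolation as
below. This is only an illustration, not the patch: timestamp_plus_ms and
can_take_fast_path are hypothetical names, TimestampTz is redeclared here
as a plain 64-bit microsecond count, and the "no pending send" half is
assumed to keep the meaning of the nested version quoted above.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's TimestampTz: microseconds in a 64-bit integer. */
typedef int64_t TimestampTz;

/* Hypothetical helper: add a millisecond interval to a microsecond timestamp. */
static TimestampTz
timestamp_plus_ms(TimestampTz ts, int ms)
{
	return ts + (int64_t) ms * 1000;
}

/*
 * The fast path is allowed only when the last reply is less than half the
 * timeout old AND nothing is left in the send buffer.
 */
static bool
can_take_fast_path(TimestampTz now, TimestampTz last_reply,
				   int timeout_ms, bool send_pending)
{
	return now < timestamp_plus_ms(last_reply, timeout_ms / 2) &&
		!send_pending;
}
```

With a 60 second timeout and a reply at time zero, a call at 10 seconds
with an empty send buffer may take the fast path, while a call at 40
seconds, or any call with data still pending, falls through to the slow
path.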

> -    }
> +    };
> 

Oops.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 61a23cad58a0016876e3d6c829f6353ee491b4c7 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Tue, 12 Sep 2017 17:31:28 +0900
Subject: [PATCH] Fix walsender timeouts when decoding large transaction

The logical slots have a fast code path for sending data so as not to
impose too high a per-message overhead. The fast path skips checks for
interrupts and timeouts. However, the existing coding failed to consider
that a transaction with a large number of changes may take a very long
time to be processed and sent to the client. This causes the walsender to
ignore interrupts for a potentially long time and, more importantly, it
causes the walsender to be killed due to timeout at the end of such a
transaction.

This commit changes the fast path to also check for interrupts and only
allows taking the fast path when the last keepalive check happened less
than half of the walsender timeout ago; otherwise the slower code path
will be taken.
---
 src/backend/replication/walsender.c | 66 +++++++++++++++++++++----------------
 1 file changed, 37 insertions(+), 29 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fa1db748b5..e015870a4e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1151,6 +1151,8 @@ static void
 WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 				bool last_write)
 {
+	TimestampTz	now;
+
 	/* output previously gathered data in a CopyData packet */
 	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
 
@@ -1160,23 +1162,54 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	 * several releases by streaming physical replication.
 	 */
 	resetStringInfo(&tmpbuf);
-	pq_sendint64(&tmpbuf, GetCurrentTimestamp());
+	now = GetCurrentTimestamp();
+	pq_sendint64(&tmpbuf, now);
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
+	CHECK_FOR_INTERRUPTS();
+
 	/* Try to flush pending output to the client */
 	if (pq_flush_if_writable() != 0)
 		WalSndShutdown();
 
-	if (!pq_is_send_pending())
+	/* Try taking fast path unless we get too close to walsender timeout. */
+	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
+										  wal_sender_timeout / 2) &&
+		!pq_is_send_pending())
+	{
 		return;
+	}
 
+	/* If we have pending write here or are close to timeout, go to slow path */
 	for (;;)
 	{
 		int			wakeEvents;
 		long		sleeptime;
-		TimestampTz now;
+
+		/* Check for input from the client */
+		ProcessRepliesIfAny();
+
+		now = GetCurrentTimestamp();
+
+		/* die if timeout was reached */
+		WalSndCheckTimeOut(now);
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary(now);
+
+		if (!pq_is_send_pending())
+			break;
+
+		sleeptime = WalSndComputeSleeptime(now);
+
+		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
+
+		/* Sleep until something happens or we time out */
+		WaitLatchOrSocket(MyLatch, wakeEvents,
+						  MyProcPort->sock, sleeptime,
+						  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -1198,34 +1231,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 			SyncRepInitConfig();
 		}
 
-		/* Check for input from the client */
-		ProcessRepliesIfAny();
-
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
-
-		/* If we finished clearing the buffered data, we're done here. */
-		if (!pq_is_send_pending())
-			break;
-
-		now = GetCurrentTimestamp();
-
-		/* die if timeout was reached */
-		WalSndCheckTimeOut(now);
-
-		/* Send keepalive if the time has come */
-		WalSndKeepaliveIfNecessary(now);
-
-		sleeptime = WalSndComputeSleeptime(now);
-
-		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
-			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
-
-		/* Sleep until something happens or we time out */
-		WaitLatchOrSocket(MyLatch, wakeEvents,
-						  MyProcPort->sock, sleeptime,
-						  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
-- 
2.14.1
