>From b2ca54dbacea94fe03e506462c26674651a3134d Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Thu, 25 May 2017 17:35:24 +0200
Subject: [PATCH] Fix walsender timeouts when decoding large transaction

The logical slots have fast code path for sending data in order to not
impose too high per message overhead. The fast path skips checks for
interrupts and timeouts. However the fast path failed to consider the
fact that transaction with large number of changes may take very long to
be processed and sent to the client. This causes walsender to ignore
interrupts for potentially long time but more importantly it will cause
walsender being killed due to timeout at the end of such transaction.

This commit changes the fast path to also check for interrupts and only
allows calling the fast path when last keeplaive check happened less
than half of walsender timeout ago, otherwise the slower code path will
be taken.
---
 src/backend/replication/walsender.c | 34 +++++++++++++++++++++-------------
 1 file changed, 21 insertions(+), 13 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 49cce38..34b2f78 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1162,6 +1162,8 @@ static void
 WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 				bool last_write)
 {
+	TimestampTz	now = GetCurrentTimestamp();
+
 	/* output previously gathered data in a CopyData packet */
 	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
 
@@ -1175,19 +1177,24 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
+	/* Try taking fast path unless we get too close to walsender timeout. */
+	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
+										  wal_sender_timeout / 2))
+	{
+		CHECK_FOR_INTERRUPTS();
 
-	if (!pq_is_send_pending())
-		return;
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+
+		if (!pq_is_send_pending())
+			return;
+	}
 
 	for (;;)
 	{
 		int			wakeEvents;
 		long		sleeptime;
-		TimestampTz now;
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -1216,18 +1223,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
 
-		/* If we finished clearing the buffered data, we're done here. */
-		if (!pq_is_send_pending())
-			break;
-
-		now = GetCurrentTimestamp();
-
 		/* die if timeout was reached */
 		WalSndCheckTimeOut(now);
 
 		/* Send keepalive if the time has come */
 		WalSndKeepaliveIfNecessary(now);
 
+		/* If we finished clearing the buffered data, we're done here. */
+		if (!pq_is_send_pending())
+			break;
+
 		sleeptime = WalSndComputeSleeptime(now);
 
 		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
@@ -1237,6 +1242,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 		WaitLatchOrSocket(MyLatch, wakeEvents,
 						  MyProcPort->sock, sleeptime,
 						  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
+
+		/* Update our idea of current time for the next cycle. */
+		now = GetCurrentTimestamp();
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
-- 
2.7.4

