At Wed, 9 Jun 2021 11:21:55 +0900, Kyotaro Horiguchi <horikyota....@gmail.com> 
wrote in 
> The issue (if it actually is one) is that we send a keepalive packet
> just before a quite short sleep.
> 
> We really want to send it only if the sleep gets long, but we cannot
> predict that before entering the sleep.
> 
> Let me think a little more about this.

After some investigation, I found that the keepalives are almost always
sent after XLogSendLogical requests the *next* record. In most cases
that record has not been inserted yet at request time, but it is
inserted very soon afterwards (within single-digit milliseconds). I
would not have expected this to happen at such a high frequency while
XLogSendLogical is keeping up with the bleeding edge of WAL records.

It is completely unpredictable when the next record will arrive, so we
cannot decide at that point whether or not to send a keepalive.

What we want is to send a keepalive when we have had nothing to send
for a while, which is a bit different from sending keepalives at regular
intervals while the loop is busy.

As a possible solution, the attached patch splits the sleep into two
pieces. If the first sleep reaches its timeout, we send a keepalive and
then sleep for the remaining time. The first timeout (KEEPALIVE_TIMEOUT,
250 ms) is quite arbitrary, but a maximum keepalive rate of 4 Hz doesn't
look so bad to me.

Is this acceptable?

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 109c723f4e..49b3c0d4e2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -105,6 +105,9 @@
  */
 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
 
+/* Minimum idle time for sending an idle-time keepalive in milliseconds */
+#define KEEPALIVE_TIMEOUT 250
+
 /* Array of WalSnds in shared memory */
 WalSndCtlData *WalSndCtl = NULL;
 
@@ -244,7 +247,7 @@ static void WalSndKeepalive(bool requestReply);
 static void WalSndKeepaliveIfNecessary(void);
 static void WalSndCheckTimeOut(void);
 static long WalSndComputeSleeptime(TimestampTz now);
-static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
+static int WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
@@ -1428,19 +1431,6 @@ WalSndWaitForWal(XLogRecPtr loc)
 		if (got_STOPPING)
 			break;
 
-		/*
-		 * We only send regular messages to the client for full decoded
-		 * transactions, but a synchronous replication and walsender shutdown
-		 * possibly are waiting for a later location. So, before sleeping, we
-		 * send a ping containing the flush location. If the receiver is
-		 * otherwise idle, this keepalive will trigger a reply. Processing the
-		 * reply will update these MyWalSnd locations.
-		 */
-		if (MyWalSnd->flush < sentPtr &&
-			MyWalSnd->write < sentPtr &&
-			!waiting_for_ping_response)
-			WalSndKeepalive(false);
-
 		/* check whether we're done */
 		if (loc <= RecentFlushPtr)
 			break;
@@ -1483,6 +1473,39 @@ WalSndWaitForWal(XLogRecPtr loc)
 		if (pq_is_send_pending())
 			wakeEvents |= WL_SOCKET_WRITEABLE;
 
+		/*
+		 * We only send regular messages to the client for full decoded
+		 * transactions, but a synchronous replication and walsender shutdown
+		 * possibly are waiting for a later location. So, before sleeping, we
+		 * send a ping containing the flush location. If the receiver is
+		 * otherwise idle, this keepalive will trigger a reply. Processing the
+		 * reply will update these MyWalSnd locations. If the sleep is longer
+		 * than KEEPALIVE_TIMEOUT ms, we avoid too-frequent keepalives by first
+		 * waiting that long and sending one only if that wait times out.
+		 */
+		if (MyWalSnd->flush < sentPtr &&
+			MyWalSnd->write < sentPtr &&
+			!waiting_for_ping_response)
+		{
+			if (sleeptime > KEEPALIVE_TIMEOUT)
+			{
+				int r;
+
+				r = WalSndWait(wakeEvents, KEEPALIVE_TIMEOUT,
+							   WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+
+				if (r != 0)
+					continue;
+
+				sleeptime -= KEEPALIVE_TIMEOUT;
+			}
+
+			WalSndKeepalive(false);
+
+			if (pq_flush_if_writable() != 0)
+				WalSndShutdown();
+		}
+
 		WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
 	}
 
@@ -3136,15 +3159,18 @@ WalSndWakeup(void)
  * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags.  Exit
  * on postmaster death.
  */
-static void
+static int
 WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
 {
 	WaitEvent	event;
+	int			ret;
 
 	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
-	if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
-		(event.events & WL_POSTMASTER_DEATH))
+	ret = WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event);
+	if (ret == 1 && (event.events & WL_POSTMASTER_DEATH))
 		proc_exit(1);
+
+	return ret;
 }
 
 /*