On Sat, Mar 5, 2022 at 1:26 AM Nathan Bossart <nathandboss...@gmail.com> wrote:
>
> My point is that there are existing tools for alerting processes when an
> LSN is synchronously replicated and for waking up WAL senders.  What I am
> proposing wouldn't involve spinning in XLogSendPhysical() waiting for
> synchronous replication.  Like SyncRepWaitForLSN(), we'd register our LSN
> in the queue (SyncRepQueueInsert()), but we wouldn't sit in a separate loop
> waiting to be woken.  Instead, SyncRepWakeQueue() would eventually wake up
> the WAL sender and trigger another iteration of WalSndLoop().

While we continue to discuss the other better design at [1], FWIW, I
would like to share a simpler patch that lets wal senders serving
async standbys wait until sync standbys report the flush lsn.
Obviously this is not an elegant way to solve the problem reported in
this thread, but since I have had this patch ready for a long time, I
wanted to share it here.

Nathan, of course, this is not something you wanted.

[1] 
https://www.postgresql.org/message-id/CALj2ACWCj60g6TzYMbEO07ZhnBGbdCveCrD413udqbRM0O59RA%40mail.gmail.com

Regards,
Bharath Rupireddy.
From 8a74dbd8c3fe3631b6a2ecee3d8c7a1c98429341 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Thu, 17 Mar 2022 17:01:53 +0000
Subject: [PATCH v2] Allow async standbys to wait for sync replication

---
 doc/src/sgml/config.sgml                      |  22 +++
 src/backend/replication/syncrep.c             |  11 +-
 src/backend/replication/walsender.c           | 157 ++++++++++++++++++
 src/backend/utils/misc/guc.c                  |   9 +
 src/backend/utils/misc/postgresql.conf.sample |   3 +
 src/include/replication/syncrep.h             |   7 +
 src/include/replication/walsender.h           |   1 +
 7 files changed, 202 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 7a48973b3c..026e12f5ef 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4247,6 +4247,28 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-async-standbys-wait-for-sync-replication" xreflabel="async_standbys_wait_for_sync_replication">
+      <term><varname>async_standbys_wait_for_sync_replication</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>async_standbys_wait_for_sync_replication</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        When set, asynchronous standbys will wait until synchronous standbys
+        (whether quorum-based or priority-based) receive and flush WAL and
+        acknowledge the flush LSN to the primary. This behavior is particularly
+        important in the event of failover, as it avoids manual steps required
+        on asynchronous standbys that might otherwise get ahead of the
+        synchronous standbys.
+       </para>
+       <para>
+        This parameter can only be set in the <filename>postgresql.conf</filename>
+        file or on the server command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ce163b99e9..398c11579c 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -97,8 +97,6 @@ static bool announce_next_takeover = true;
 SyncRepConfigData *SyncRepConfig = NULL;
 static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
 
-static void SyncRepQueueInsert(int mode);
-static void SyncRepCancelWait(void);
 static int	SyncRepWakeQueue(bool all, int mode);
 
 static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
@@ -120,9 +118,6 @@ static int	SyncRepGetStandbyPriority(void);
 static int	standby_priority_comparator(const void *a, const void *b);
 static int	cmp_lsn(const void *a, const void *b);
 
-#ifdef USE_ASSERT_CHECKING
-static bool SyncRepQueueIsOrderedByLSN(int mode);
-#endif
 
 /*
  * ===========================================================
@@ -335,7 +330,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
  * Usually we will go at tail of queue, though it's possible that we arrive
  * here out of order, so start at tail and work back to insertion point.
  */
-static void
+void
 SyncRepQueueInsert(int mode)
 {
 	PGPROC	   *proc;
@@ -368,7 +363,7 @@ SyncRepQueueInsert(int mode)
 /*
  * Acquire SyncRepLock and cancel any wait currently in progress.
  */
-static void
+void
 SyncRepCancelWait(void)
 {
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
@@ -979,7 +974,7 @@ SyncRepUpdateSyncStandbysDefined(void)
 }
 
 #ifdef USE_ASSERT_CHECKING
-static bool
+bool
 SyncRepQueueIsOrderedByLSN(int mode)
 {
 	PGPROC	   *proc = NULL;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2d0292a092..97d32a6294 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -125,6 +125,8 @@ int			wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
 											 * data message */
 bool		log_replication_commands = false;
 
+bool		async_standbys_wait_for_sync_replication = true;
+
 /*
  * State for WalSndWakeupRequest
  */
@@ -258,6 +260,8 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
 							  TimeLineID *tli_p);
 
+/* called by wal sender serving asynchronous standby */
+static void AsynWalSndWaitForSyncRepLSN(XLogRecPtr lsn);
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -2771,6 +2775,8 @@ XLogSendPhysical(void)
 		SendRqstPtr = GetFlushRecPtr(NULL);
 	}
 
+	AsynWalSndWaitForSyncRepLSN(SendRqstPtr);
+
 	/*
 	 * Record the current system time as an approximation of the time at which
 	 * this WAL location was written for the purposes of lag tracking.
@@ -2995,6 +3001,22 @@ XLogSendLogical(void)
 
 	if (record != NULL)
 	{
+		/*
+		 * At this point, we do not know whether the current LSN (ReadRecPtr)
+		 * is required by any of the logical decoding output plugins which is
+		 * only known at the plugin level. If we were to decide whether to wait
+		 * or not for the synchronous standbys flush LSN at the plugin level,
+		 * we might have to pass extra information to it which doesn't sound an
+		 * elegant way.
+		 *
+		 * Another way the output plugins can wait there before sending the WAL
+		 * is by reading the flush LSN from the logical replication slots.
+		 *
+		 * Waiting here i.e. before even the logical decoding kicks in, makes
+		 * the code clean.
+		 */
+		AsynWalSndWaitForSyncRepLSN(logical_decoding_ctx->reader->ReadRecPtr);
+
 		/*
 		 * Note the lack of any call to LagTrackerWrite() which is handled by
 		 * WalSndUpdateProgress which is called by output plugin through
@@ -3789,3 +3811,138 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
 	Assert(time != 0);
 	return now - time;
 }
+
+/*
+ * Make a walsender serving an asynchronous standby wait until 'lsn' has
+ * been confirmed as flushed for synchronous replication.
+ *
+ * This is modeled on SyncRepWaitForLSN() in syncrep.c: we join the
+ * SYNC_REP_WAIT_FLUSH queue and loop until our syncRepState becomes
+ * SYNC_REP_WAIT_COMPLETE.  The differences are that we sleep in
+ * WalSndWait() and that we stop waiting when got_STOPPING or got_SIGUSR2
+ * is set, or when a config reload makes the wait unnecessary.
+ */
+static void
+AsynWalSndWaitForSyncRepLSN(XLogRecPtr lsn)
+{
+	int			mode;
+
+	/* Fast exit in case we are told to not wait. */
+	if (!async_standbys_wait_for_sync_replication)
+		return;
+
+	/*
+	 * Fast exit in case the wal sender is serving synchronous standby at the
+	 * moment as it has no business here.
+	 */
+	if (MyWalSnd->sync_standby_priority > 0)
+		return;
+
+	/*
+	 * Fast exit if user has not requested sync replication, or there are no
+	 * sync replication standby names defined.
+	 *
+	 * This is reached whenever the walsender is about to send WAL, so test
+	 * WalSndCtl->sync_standbys_defined without the lock first and exit
+	 * immediately if it's false.  If it's true, it is checked again below
+	 * while holding the lock, so that the flag test and the queue insertion
+	 * happen atomically.  See SyncRepUpdateSyncStandbysDefined().
+	 */
+	if (!SyncRepRequested() ||
+		!((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+		return;
+
+	mode = SYNC_REP_WAIT_FLUSH;
+
+	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+	Assert(WalSndCtl != NULL);
+
+	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+	Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
+
+	/*
+	 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
+	 * set.  See SyncRepUpdateSyncStandbysDefined.
+	 *
+	 * Also check that the standby hasn't already replied. Unlikely race
+	 * condition but we'll be fetching that cache line anyway so it's likely
+	 * to be a low cost check.
+	 */
+	if (!WalSndCtl->sync_standbys_defined ||
+		lsn <= WalSndCtl->lsn[mode])
+	{
+		LWLockRelease(SyncRepLock);
+		return;
+	}
+
+	/*
+	 * Set our waitLSN so WALSender will know when to wake us, and add
+	 * ourselves to the queue.
+	 */
+	MyProc->waitLSN = lsn;
+	MyProc->syncRepState = SYNC_REP_WAITING;
+	SyncRepQueueInsert(mode);
+	Assert(SyncRepQueueIsOrderedByLSN(mode));
+	LWLockRelease(SyncRepLock);
+
+	/*
+	 * Wait for the LSN to be confirmed, using a normal latch check/wait loop.
+	 */
+	for (;;)
+	{
+		/* Must reset the latch before testing state. */
+		ResetLatch(MyLatch);
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Process any requests or signals received recently */
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+			SyncRepInitConfig();
+		}
+
+		/*
+		 * Stop waiting if waiting has been turned off or we now serve a
+		 * synchronous standby.  We may still be on the queue, so cancel the
+		 * wait rather than just leaving the loop.
+		 */
+		if (!async_standbys_wait_for_sync_replication ||
+			MyWalSnd->sync_standby_priority > 0)
+		{
+			SyncRepCancelWait();
+			break;
+		}
+
+		/*
+		 * Acquiring the lock is not needed, the latch ensures proper
+		 * barriers. If it looks like we're done, we must really be done,
+		 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
+		 * it will never update it again, so we can't be seeing a stale value
+		 * in that case.
+		 */
+		if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
+			break;
+
+		/* Sleep until something happens; -1 is passed as the timeout */
+		WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, -1,
+				   WAIT_EVENT_SYNC_REP);
+
+		if (got_STOPPING || got_SIGUSR2)
+		{
+			SyncRepCancelWait();
+			break;
+		}
+	}
+
+	/*
+	 * We should no longer be on the queue here: either a walsender removed
+	 * us, or SyncRepCancelWait() was called.  We need a read barrier to make
+	 * sure we see the changes to the queue link before asserting that.
+	 */
+	pg_read_barrier();
+	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
+	MyProc->waitLSN = 0;
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e7f0a380e6..cd5f8b5a42 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1388,6 +1388,15 @@ static struct config_bool ConfigureNamesBool[] =
 		false,
 		NULL, NULL, NULL
 	},
+	{
+		{"async_standbys_wait_for_sync_replication", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets whether asynchronous standbys should wait until synchronous standbys receive and flush WAL."),
+			NULL
+		},
+		&async_standbys_wait_for_sync_replication,
+		true,
+		NULL, NULL, NULL
+	},
 	{
 		{"debug_assertions", PGC_INTERNAL, PRESET_OPTIONS,
 			gettext_noop("Shows whether the running server has assertion checks enabled."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 4cf5b26a36..9e6dc5082b 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -308,6 +308,9 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#async_standbys_wait_for_sync_replication = on # Specifies whether asynchronous
+				# standbys should wait until synchronous standbys receive and
+				# flush WAL
 
 # - Primary Server -
 
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 27be230d77..b433528174 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -90,6 +90,13 @@ extern void SyncRepCleanupAtProcExit(void);
 /* called by wal sender */
 extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);
+extern void SyncRepQueueInsert(int mode);
+extern void SyncRepCancelWait(void);
+
+#ifdef USE_ASSERT_CHECKING
+extern bool SyncRepQueueIsOrderedByLSN(int mode);
+#endif
+
 
 /* called by wal sender and user backend */
 extern int	SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index b1892e9e4b..61b113ae48 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -34,6 +34,7 @@ extern bool wake_wal_senders;
 extern int	max_wal_senders;
 extern int	wal_sender_timeout;
 extern bool log_replication_commands;
+extern bool async_standbys_wait_for_sync_replication;
 
 extern void InitWalSender(void);
 extern bool exec_replication_command(const char *query_string);
-- 
2.25.1

Reply via email to