On 14/09/10 05:02, Fujii Masao wrote:
+       /*
+        * Walreceiver sets this latch every time new WAL has been received and
+        * fsync'd to disk, allowing startup process to wait for new WAL to
+        * arrive.
+        */
+       Latch           receivedLatch;

I think that this latch should be available for other than walreceiver -
startup process communication. For example, backend - startup process
communication, which can be used for requesting a failover via SQL function
by users in the future. What about putting the latch in XLogCtl instead of
WalRcv and calling OwnLatch at the beginning of the startup process instead
of RequestXLogStreaming?

Yes, good point. I updated the patch along those lines, attached.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ddf7d79..b94aa24 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -46,6 +46,7 @@
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
+#include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
@@ -393,6 +394,13 @@ typedef struct XLogCtlData
 	bool		SharedRecoveryInProgress;
 
 	/*
+	 * recoveryWakeupLatch is used to wake up the startup process to
+	 * continue WAL replay, if it is waiting for WAL to arrive or failover
+	 * trigger file to appear.
+	 */
+	Latch		recoveryWakeupLatch;
+
+	/*
 	 * During recovery, we keep a copy of the latest checkpoint record here.
 	 * Used by the background writer when it wants to create a restartpoint.
 	 *
@@ -4840,6 +4848,7 @@ XLOGShmemInit(void)
 	XLogCtl->SharedRecoveryInProgress = true;
 	XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
 	SpinLockInit(&XLogCtl->info_lck);
+	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
 
 	/*
 	 * If we are not in bootstrap mode, pg_control should already exist. Read
@@ -5814,6 +5823,13 @@ StartupXLOG(void)
 					(errmsg("starting archive recovery")));
 	}
 
+	/*
+	 * Take ownership of the wakup latch if we're going to sleep during
+	 * recovery.
+	 */
+	if (StandbyMode)
+		OwnLatch(&XLogCtl->recoveryWakeupLatch);
+
 	if (read_backup_label(&checkPointLoc))
 	{
 		/*
@@ -9139,6 +9155,13 @@ startupproc_quickdie(SIGNAL_ARGS)
 }
 
 
+/* SIGUSR1: let latch facility handle the signal */
+static void
+StartupProcSigUsr1Handler(SIGNAL_ARGS)
+{
+	latch_sigusr1_handler();
+}
+
 /* SIGHUP: set flag to re-read config file at next convenient time */
 static void
 StartupProcSigHupHandler(SIGNAL_ARGS)
@@ -9213,7 +9236,7 @@ StartupProcessMain(void)
 	else
 		pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGUSR1, SIG_IGN);
+	pqsignal(SIGUSR1, StartupProcSigUsr1Handler);
 	pqsignal(SIGUSR2, SIG_IGN);
 
 	/*
@@ -9397,16 +9420,17 @@ retry:
 					}
 
 					/*
-					 * Data not here yet, so check for trigger then sleep.
+					 * Data not here yet, so check for trigger then sleep for
+					 * five seconds like in the WAL file polling case below.
 					 */
 					if (CheckForStandbyTrigger())
 						goto triggered;
 
 					/*
-					 * When streaming is active, we want to react quickly when
-					 * the next WAL record arrives, so sleep only a bit.
+					 * Wait for more WAL to arrive, or timeout to be reached
 					 */
-					pg_usleep(100000L); /* 100ms */
+					WaitLatch(&XLogCtl->recoveryWakeupLatch, 5000000L);
+					ResetLatch(&XLogCtl->recoveryWakeupLatch);
 				}
 				else
 				{
@@ -9681,3 +9705,13 @@ CheckForStandbyTrigger(void)
 	}
 	return false;
 }
+
+/*
+ * Wake up startup process to replay newly arrived WAL, or to notice that
+ * failover has been requested.
+ */
+void
+WakeupRecovery(void)
+{
+	SetLatch(&XLogCtl->recoveryWakeupLatch);
+}
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
index da06202..e39bf1c 100644
--- a/src/backend/port/win32_latch.c
+++ b/src/backend/port/win32_latch.c
@@ -230,6 +230,8 @@ NumSharedLatches(void)
 
 	/* Each walsender needs one latch */
 	numLatches += max_wal_senders;
+	/* One latch for startup process - walreceiver communication */
+	numLatches += 1;
 
 	return numLatches;
 }
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b868707..a20119b 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -529,6 +529,9 @@ XLogWalRcvFlush(void)
 		walrcv->receivedUpto = LogstreamResult.Flush;
 		SpinLockRelease(&walrcv->mutex);
 
+		/* Signal the startup process that new WAL has arrived */
+		WakeupRecovery();
+
 		/* Report XLOG streaming progress in PS display */
 		if (update_process_title)
 		{
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index b206885..308d54e 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -187,6 +187,9 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
 	if (recptr.xrecoff % XLogSegSize != 0)
 		recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
 
+	/*
+	 * Update shared memory status with information needed by walreceiver
+	 */
 	SpinLockAcquire(&walrcv->mutex);
 
 	/* It better be stopped before we try to restart it */
@@ -204,6 +207,7 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
 
 	SpinLockRelease(&walrcv->mutex);
 
+	/* Request postmaster to start the walreceiver process */
 	SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
 }
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 7d667d5..bad36f3 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -303,5 +303,6 @@ extern TimeLineID GetRecoveryTargetTLI(void);
 
 extern void HandleStartupProcInterrupts(void);
 extern void StartupProcessMain(void);
+extern void WakeupRecovery(void);
 
 #endif   /* XLOG_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to