At Tue, 9 Feb 2021 12:27:21 +0530, Bharath Rupireddy 
<bharath.rupireddyforpostg...@gmail.com> wrote in 
> What I meant was that if we were to add waiting logic inside
> pg_wal_replay_pause, we should also have a timeout with some default
> value, to avoid pg_wal_replay_pause waiting forever in the waiting
> loop. Within that timeout, if the recovery isn't paused,
> pg_wal_replay_pause will return probably a warning and a false(this
> requires us to change the return value of the existing
> pg_wal_replay_pause)?

I had assumed that rm_redo finishes quickly unless something goes
wrong. But on second thought, I had forgotten the case of a
recovery conflict. So, as you pointed out, pg_wal_replay_pause() needs
a 'wait' flag to wait until the pause is established. And that flag
can be turned into a "timeout".

# And the previous version had another silly bug.

> To avoid changing the existing API and return type, a new function
> pg_get_wal_replay_pause_state is introduced.

I was talking about IN parameters, not OUT parameters. IN parameters
can be optional, so existing usage keeps working. pg_wal_replay_pause()
is changed that way in the attached patch.

If all of you still disagree with my proposal, I'll withdraw it.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 1ab31a9056..7eb93f74dd 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -25320,14 +25320,19 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
         <indexterm>
          <primary>pg_wal_replay_pause</primary>
         </indexterm>
-        <function>pg_wal_replay_pause</function> ()
+        <function>pg_wal_replay_pause</function> (
+        <optional> <parameter>timeout</parameter> <type>integer</type>
+        </optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
         Pauses recovery.  While recovery is paused, no further database
         changes are applied.  If hot standby is active, all new queries will
         see the same consistent snapshot of the database, and no further query
-        conflicts will be generated until recovery is resumed.
+        conflicts will be generated until recovery is resumed.  If
+        <parameter>timeout</parameter> is zero or positive, the function
+        raises an error if recovery cannot be paused within that many
+        milliseconds; the default, -1, waits forever.
        </para>
        <para>
         This function is restricted to superusers by default, but other users
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8e3b5df7dc..8fd614cded 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6072,10 +6072,72 @@ RecoveryIsPaused(void)
        return recoveryPause;
 }
 
+/*
+ * Requests (recoveryPause = true) or cancels (false) a recovery pause.
+ *
+ * When a pause is requested outside the startup process, it is guaranteed
+ * that no WAL replay happens after this function returns: we first wait for
+ * any concurrently running rm_redo() to finish.  If timeout is zero or
+ * positive, emits ERROR when that many milliseconds elapse before the
+ * pause can be established; a negative timeout waits forever.  Callers in
+ * the startup process must pass a negative timeout.
+ */
 void
-SetRecoveryPause(bool recoveryPause)
+SetRecoveryPause(bool recoveryPause, int timeout)
 {
+       TimestampTz finish_time = 0;
+
+       /* No need of timeout in the startup process */
+       Assert(!InRecovery || timeout < 0);
+
+       /* Fix the deadline once, before taking the spinlock. */
+       if (timeout >= 0)
+               finish_time = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+                                                                                                 timeout);
+
        SpinLockAcquire(&XLogCtl->info_lck);
+
+       /*
+        * Wait for a concurrent rm_redo() to finish, so that no records will be
+        * applied after this function returns.  No need to wait while resuming.
+        * Since we are requesting a recovery pause anyway, we don't mind a
+        * possible slowdown of recovery caused by polling info_lck here.  We
+        * don't need to wait in the startup process since no concurrent
+        * rm_redo() runs there.
+        *
+        * NOTE(review): the pause flag is only set once no record is in flight,
+        * so under continuous replay this poll may rarely observe such a
+        * moment; confirm the wait cannot be starved indefinitely.
+        */
+       while (!InRecovery &&
+                  recoveryPause && !XLogCtl->recoveryPause &&
+                  XLogCtl->replayEndRecPtr != XLogCtl->lastReplayedEndRecPtr)
+       {
+               long            sleep_us = 10000L;      /* 10 ms */
+
+               SpinLockRelease(&XLogCtl->info_lck);
+
+               if (timeout >= 0)
+               {
+                       TimestampTz now = GetCurrentTimestamp();
+
+                       if (now >= finish_time)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SQL_STATEMENT_NOT_YET_COMPLETE),
+                                                errmsg("could not pause recovery: timed out")));
+
+                       /*
+                        * Never sleep past the deadline.  pg_usleep() takes
+                        * microseconds; the timestamp difference is in ms.
+                        */
+                       sleep_us = Min(sleep_us,
+                                                  TimestampDifferenceMilliseconds(now, finish_time) * 1000L);
+               }
+
+               CHECK_FOR_INTERRUPTS();
+               pg_usleep(sleep_us);
+               SpinLockAcquire(&XLogCtl->info_lck);
+       }
        XLogCtl->recoveryPause = recoveryPause;
        SpinLockRelease(&XLogCtl->info_lck);
 }
@@ -6270,7 +6321,7 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue
                                                           currValue,
                                                           minValue)));
 
-                       SetRecoveryPause(true);
+                       SetRecoveryPause(true, -1);
 
                        ereport(LOG,
                                        (errmsg("recovery has paused"),
@@ -7262,6 +7313,7 @@ StartupXLOG(void)
                        do
                        {
                                bool            switchedTLI = false;
+                               bool            pause_requested = false;
 
 #ifdef WAL_DEBUG
                                if (XLOG_DEBUG ||
@@ -7292,11 +7344,9 @@ StartupXLOG(void)
                                 * Note that we intentionally don't take the 
info_lck spinlock
                                 * here.  We might therefore read a slightly 
stale value of
                                 * the recoveryPause flag, but it can't be very 
stale (no
-                                * worse than the last spinlock we did 
acquire).  Since a
-                                * pause request is a pretty asynchronous thing 
anyway,
-                                * possibly responding to it one WAL record 
later than we
-                                * otherwise would is a minor issue, so it 
doesn't seem worth
-                                * adding another spinlock cycle to prevent 
that.
+                                * worse than the last spinlock we did 
acquire). We eventually
+                                * make sure catching the pause request if any 
just before
+                                * applying this record.
                                 */
                                if (((volatile XLogCtlData *) 
XLogCtl)->recoveryPause)
                                        recoveryPausesHere(false);
@@ -7385,12 +7435,19 @@ StartupXLOG(void)
                                /*
                                 * Update shared replayEndRecPtr before 
replaying this record,
                                 * so that XLogFlush will update 
minRecoveryPoint correctly.
+                                * Also we check for the correct value of the 
recoveryPause
+                                * flag here not to have redo overrun during a 
pause. See
+                                * SetRecoveryPuase() for details.
                                 */
                                SpinLockAcquire(&XLogCtl->info_lck);
                                XLogCtl->replayEndRecPtr = EndRecPtr;
                                XLogCtl->replayEndTLI = ThisTimeLineID;
+                               pause_requested = XLogCtl->recoveryPause;
                                SpinLockRelease(&XLogCtl->info_lck);
 
+                               if (pause_requested)
+                                       recoveryPausesHere(false);
+
                                /*
                                 * If we are attempting to enter Hot Standby 
mode, process
                                 * XIDs we see
@@ -7497,7 +7554,7 @@ StartupXLOG(void)
                                                proc_exit(3);
 
                                        case RECOVERY_TARGET_ACTION_PAUSE:
-                                               SetRecoveryPause(true);
+                                               SetRecoveryPause(true, -1);
                                                recoveryPausesHere(true);
 
                                                /* drop into promote */
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index 5e1aab319d..4c8c41e0bc 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -538,7 +538,7 @@ pg_wal_replay_pause(PG_FUNCTION_ARGS)
                                 errhint("%s cannot be executed after promotion 
is triggered.",
                                                 "pg_wal_replay_pause()")));
 
-       SetRecoveryPause(true);
+       SetRecoveryPause(true, PG_GETARG_INT32(0));
 
        PG_RETURN_VOID();
 }
@@ -565,7 +565,7 @@ pg_wal_replay_resume(PG_FUNCTION_ARGS)
                                 errhint("%s cannot be executed after promotion 
is triggered.",
                                                 "pg_wal_replay_resume()")));
 
-       SetRecoveryPause(false);
+       SetRecoveryPause(false, -1);
 
        PG_RETURN_VOID();
 }
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fa58afd9d7..e03f22f350 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1264,6 +1264,13 @@ CREATE OR REPLACE FUNCTION
   RETURNS boolean STRICT VOLATILE LANGUAGE INTERNAL AS 'pg_promote'
   PARALLEL SAFE;
 
+-- timeout is in milliseconds; the default of -1 waits forever.  STRICT so
+-- that a NULL timeout returns NULL instead of reaching the C function.
+CREATE OR REPLACE FUNCTION
+  pg_wal_replay_pause(timeout int4 DEFAULT -1)
+  RETURNS void STRICT VOLATILE LANGUAGE INTERNAL AS 'pg_wal_replay_pause'
+  PARALLEL SAFE;
+
 -- legacy definition for compatibility with 9.3
 CREATE OR REPLACE FUNCTION
   json_populate_record(base anyelement, from_json json, use_json_as_text 
boolean DEFAULT false)
@@ -1473,7 +1478,7 @@ REVOKE EXECUTE ON FUNCTION pg_stop_backup() FROM public;
 REVOKE EXECUTE ON FUNCTION pg_stop_backup(boolean, boolean) FROM public;
 REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;
 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;
+REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause(int) FROM public;
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;
 REVOKE EXECUTE ON FUNCTION pg_rotate_logfile() FROM public;
 REVOKE EXECUTE ON FUNCTION pg_reload_conf() FROM public;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 75ec1073bd..397e206433 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -311,7 +311,7 @@ extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
 extern XLogRecPtr GetXLogInsertRecPtr(void);
 extern XLogRecPtr GetXLogWriteRecPtr(void);
 extern bool RecoveryIsPaused(void);
-extern void SetRecoveryPause(bool recoveryPause);
+extern void SetRecoveryPause(bool recoveryPause, int timeout);
 extern TimestampTz GetLatestXTime(void);
 extern TimestampTz GetCurrentChunkReplayStartTime(void);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4e0c9be58c..a646721c3c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6222,7 +6222,7 @@
 
 { oid => '3071', descr => 'pause wal replay',
   proname => 'pg_wal_replay_pause', provolatile => 'v', prorettype => 'void',
-  proargtypes => '', prosrc => 'pg_wal_replay_pause' },
+  proargtypes => 'int4', prosrc => 'pg_wal_replay_pause' },
 { oid => '3072', descr => 'resume wal replay, if it was paused',
   proname => 'pg_wal_replay_resume', provolatile => 'v', prorettype => 'void',
   proargtypes => '', prosrc => 'pg_wal_replay_resume' },

Reply via email to