Hi,
On 3/8/23 11:25 AM, Drouvot, Bertrand wrote:
Hi,
On 3/3/23 5:26 PM, Drouvot, Bertrand wrote:
Hi,
On 3/3/23 8:58 AM, Jeff Davis wrote:
On Thu, 2023-03-02 at 11:45 -0800, Jeff Davis wrote:
In this case it looks easier to add the right API than to be sure about
whether it's needed or not.
I attached a sketch of one approach.
Oh, that's very cool, thanks a lot!
I'm not very confident that it's
the right API or even that it works as I intended it, but if others
like the approach I can work on it some more.
I'll look at it early next week.
So, I took your patch and, as an example, tried a quick integration in 0004
(see 0004_new_API.txt attached) to put it in the logical decoding on standby
context.
Based on this, I have 3 comments:
- Maybe ConditionVariableEventSleep() should take care of the “WaitEventSetWait
returns 1 and cvEvent.events == WL_POSTMASTER_DEATH” case?
- Maybe ConditionVariableEventSleep() could accept and deal with the CV being
NULL?
I used it in the attached POC to handle the logical decoding on the primary
server case.
Another option would be to create a dedicated CV for that case, though.
- In the attached POC I had to add this extra condition “(cv &&
!RecoveryInProgress())” to avoid waiting for the timeout when there is a promotion.
That makes me think that we may want to add 2 extra parameters (as 2 functions
returning a bool?) to ConditionVariableEventSleep()
to check, in each loop iteration, whether or not we still want to test the
socket or the CV wakeup.
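To make the last comment more concrete, here is a minimal standalone sketch (plain C, not PostgreSQL code) of the per-iteration "are we done?" test, assuming the two extra parameters are optional callbacks returning bool, as in the attached 0004 where RecoveryInProgress is passed as the CV callback. sleep_is_done(), ResumeWaitingFn, sketch_recovery_in_progress() and the boolean flags are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical callback type: "do we still want to wait on this source?" */
typedef bool (*ResumeWaitingFn)(void);

/*
 * Hypothetical model of the per-iteration exit test. have_cv and
 * have_waitset mirror "cv != NULL" and "waitset != NULL"; cv_signaled means
 * we were removed from the CV's wakeup list. A NULL callback means "keep
 * waiting on that source".
 */
static bool
sleep_is_done(bool have_cv, bool cv_signaled,
              ResumeWaitingFn cv_resume_waiting,
              bool have_waitset, ResumeWaitingFn waitset_resume_waiting)
{
    /* Taken off the wakeup list by a signal or broadcast. */
    if (have_cv && cv_signaled)
        return true;

    /* No CV to wait on, or the caller no longer wants the CV wakeup. */
    if (!have_cv || (cv_resume_waiting != NULL && !cv_resume_waiting()))
        return true;

    /* The caller no longer wants to watch the waitset's socket. */
    if (have_waitset && waitset_resume_waiting != NULL &&
        !waitset_resume_waiting())
        return true;

    return false;
}

/* Stand-in for RecoveryInProgress(); flip the flag to model a promotion. */
static bool recovery_in_progress = true;

static bool
sketch_recovery_in_progress(void)
{
    return recovery_in_progress;
}
```

Setting recovery_in_progress to false models a promotion: the next iteration ends the sleep instead of waiting for the timeout, which is what the extra “(cv && !RecoveryInProgress())” condition achieves in the POC.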
Also 3 additional remarks:
1) About InitializeConditionVariableWaitSet() and ConditionVariableWaitSetCreate(): I'm
not sure about the naming as there is no CV yet (they "just" deal with
WaitEventSet).
So, what about renaming
+static WaitEventSet *ConditionVariableWaitSet = NULL;
to, say, "LocalWaitSet", and then renaming ConditionVariableWaitSetLatchPos,
InitializeConditionVariableWaitSet() and ConditionVariableWaitSetCreate() accordingly?
But it might not be needed (see 3) below).
2)
/*
* Prepare to wait on a given condition variable.
*
@@ -97,7 +162,8 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
void
ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
{
- (void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ ,
+ (void) ConditionVariableEventSleep(cv, ConditionVariableWaitSet,
+ -1 /* no timeout */ ,
wait_event_info);
}
@@ -111,11 +177,27 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
bool
ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
uint32 wait_event_info)
+{
+ return ConditionVariableEventSleep(cv, ConditionVariableWaitSet, timeout,
+ wait_event_info);
+}
+
I like the idea of making use of the new ConditionVariableEventSleep() here,
but on the other hand...
3)
I wonder whether there are race conditions here: ConditionVariableWaitSet is
initialized with PGINVALID_SOCKET as WL_LATCH_SET and might also (if
IsUnderPostmaster) be initialized with PGINVALID_SOCKET as WL_EXIT_ON_PM_DEATH.
So IIUC, the patch introduces 2 new possible sources of wakeup.
Then, what about the following?
- not creating ConditionVariableWaitSet, ConditionVariableWaitSetLatchPos,
InitializeConditionVariableWaitSet() and ConditionVariableWaitSetCreate() at
all?
- calling ConditionVariableEventSleep() with a NULL WaitEventSet parameter in
ConditionVariableSleep() and ConditionVariableTimedSleep()?
- handling the case where the WaitEventSet parameter is NULL in
ConditionVariableEventSleep()? (That would also make sense if we handle the
case of the CV being NULL, as proposed above.)
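Whatever form the NULL handling takes, the loop still has to block somewhere on each iteration; otherwise ConditionVariableSleep(), which would then pass a NULL waitset, could spin. A minimal sketch, assuming the NULL case falls back to waiting on the latch alone, the way the current ConditionVariableTimedSleep() calls WaitLatch(MyLatch, wait_events, cur_timeout, wait_event_info). SketchWaitSet, sketch_wait_latch(), sketch_waitset_wait() and sketch_wait_once() are hypothetical stand-ins, not the latch.h API.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a caller-supplied WaitEventSet. */
typedef struct SketchWaitSet
{
    int         nsockets;
} SketchWaitSet;

/* Counters so the example can show which primitive actually blocked. */
static int  latch_waits = 0;
static int  waitset_waits = 0;

/* Stand-in for WaitLatch(MyLatch, ...): pretend the latch was set. */
static int
sketch_wait_latch(long timeout_ms)
{
    (void) timeout_ms;
    latch_waits++;
    return 0;                   /* no socket event reported */
}

/* Stand-in for WaitEventSetWait(): pretend one event fired. */
static int
sketch_waitset_wait(SketchWaitSet *ws, long timeout_ms)
{
    (void) ws;
    (void) timeout_ms;
    waitset_waits++;
    return 1;                   /* one event reported */
}

/*
 * One blocking step of the sleep loop. With no waitset we still block, on
 * the latch alone, instead of falling through, which would make the
 * enclosing retry loop spin.
 */
static int
sketch_wait_once(SketchWaitSet *ws, long timeout_ms)
{
    if (ws == NULL)
        return sketch_wait_latch(timeout_ms);
    return sketch_waitset_wait(ws, timeout_ms);
}
```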
I gave it a try, so please find attached
v2-0001-Introduce-ConditionVariableEventSleep.txt (implementing the comments
above) and 0004_new_API.txt to put the new API in the logical decoding on
standby context.
There is no change in v2-0001-Introduce-ConditionVariableEventSleep.txt
regarding the up-thread comment related to WL_POSTMASTER_DEATH.
What do you think?
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 9a820140b7356ab94479499a80fc4742403f3ca5 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Fri, 10 Mar 2023 10:58:23 +0000
Subject: [PATCH v99 5/7] Fixing Walsender corner case with logical decoding on
standby.
The problem is that WalSndWaitForWal() waits for the *replay* LSN to
increase, but gets woken up by walreceiver when new WAL has been
flushed. This means that walsenders typically get woken up at the
same time as the startup process, so by the time the logical walsender
checks GetXLogReplayRecPtr() it's unlikely that the startup process has
already replayed the record and updated
XLogRecoveryCtl->lastReplayedEndRecPtr.
Introduce a new condition variable, and make use of the new
ConditionVariableEventSleep() API, to fix this corner case.
---
doc/src/sgml/monitoring.sgml | 4 ++++
src/backend/access/transam/xlogrecovery.c | 28 +++++++++++++++++++++++
src/backend/replication/walsender.c | 18 ++++++++++++++-
src/backend/utils/activity/wait_event.c | 3 +++
src/include/access/xlogrecovery.h | 3 +++
src/include/replication/walsender.h | 1 +
src/include/utils/wait_event.h | 1 +
7 files changed, 57 insertions(+), 1 deletion(-)
7.8% doc/src/sgml/
52.1% src/backend/access/transam/
27.1% src/backend/replication/
4.5% src/backend/utils/activity/
4.5% src/include/access/
3.7% src/include/
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index cdf7c09b4b..9af8d58da2 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1857,6 +1857,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting for startup process to send initial data for streaming
replication.</entry>
</row>
+ <row>
+ <entry><literal>WalSenderWaitReplay</literal></entry>
+ <entry>Waiting for startup process to replay write-ahead log.</entry>
+ </row>
<row>
<entry><literal>XactGroupUpdate</literal></entry>
<entry>Waiting for the group leader to update transaction status at
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..8a9505a52d 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData
RecoveryPauseState recoveryPauseState;
ConditionVariable recoveryNotPausedCV;
+ /* Replay state (see check_for_replay() for more explanation) */
+ ConditionVariable replayedCV;
+
slock_t info_lck; /* locks shared variables shown above */
} XLogRecoveryCtlData;
@@ -468,6 +471,7 @@ XLogRecoveryShmemInit(void)
SpinLockInit(&XLogRecoveryCtl->info_lck);
InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
+ ConditionVariableInit(&XLogRecoveryCtl->replayedCV);
}
/*
@@ -1935,6 +1939,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ /*
+ * wake up walsender(s) used by logical decoding on standby.
+ */
+ ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV);
+
/*
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
* receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -4942,3 +4951,22 @@ assign_recovery_target_xid(const char *newval, void *extra)
else
recoveryTarget = RECOVERY_TARGET_UNSET;
}
+
+/*
+ * Return the ConditionVariable indicating that a replay has been done.
+ *
+ * This is needed for logical decoding on standby. Indeed the "problem" is that
+ * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up
+ * by walreceiver when new WAL has been flushed. This means that walsenders
+ * typically get woken up at the same time as the startup process, so by the
+ * time the logical walsender checks GetXLogReplayRecPtr() it's unlikely that
+ * the startup process has already replayed the record and updated
+ * XLogRecoveryCtl->lastReplayedEndRecPtr.
+ *
+ * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case.
+ */
+ConditionVariable *
+check_for_replay(void)
+{
+ return &XLogRecoveryCtl->replayedCV;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3042e5bd64..8ef22616bb 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1551,7 +1551,9 @@ static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
int wakeEvents;
+ uint32 wait_event;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+ ConditionVariable *cv = NULL;
/*
* Fast path to avoid acquiring the spinlock in case we already know we
@@ -1564,9 +1566,20 @@ WalSndWaitForWal(XLogRecPtr loc)
/* Get a more recent flush pointer. */
if (!RecoveryInProgress())
+ {
RecentFlushPtr = GetFlushRecPtr(NULL);
+ wait_event = WAIT_EVENT_WAL_SENDER_WAIT_WAL;
+ }
else
+ {
RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+ wait_event = WAIT_EVENT_WAL_SENDER_WAIT_REPLAY;
+ cv = check_for_replay();
+ }
+
+ /* Prepare the cv to sleep */
+ if (cv)
+ ConditionVariablePrepareToSleep(cv);
for (;;)
{
@@ -1667,9 +1680,12 @@ WalSndWaitForWal(XLogRecPtr loc)
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
- WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, wakeEvents, NULL);
+ ConditionVariableEventSleep(cv, RecoveryInProgress, FeBeWaitSet, NULL,
+ sleeptime, wait_event);
}
+ ConditionVariableCancelSleep();
/* reactivate latch so WalSndLoop knows to continue */
SetLatch(MyLatch);
return RecentFlushPtr;
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index cb99cc6339..a10dcd4e61 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -466,6 +466,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
event_name = "WalReceiverWaitStart";
break;
+ case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY:
+ event_name = "WalSenderWaitReplay";
+ break;
case WAIT_EVENT_XACT_GROUP_UPDATE:
event_name = "XactGroupUpdate";
break;
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 47c29350f5..2bfeaaa00f 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -15,6 +15,7 @@
#include "catalog/pg_control.h"
#include "lib/stringinfo.h"
#include "utils/timestamp.h"
+#include "storage/condition_variable.h"
/*
* Recovery target type.
@@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
+extern ConditionVariable *check_for_replay(void);
+
#endif /* XLOGRECOVERY_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae..2fd745fe72 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -13,6 +13,7 @@
#define _WALSENDER_H
#include <signal.h>
+#include "storage/condition_variable.h"
/*
* What to do with a snapshot in create replication slot command.
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 9ab23e1c4a..548ef41dca 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -131,6 +131,7 @@ typedef enum
WAIT_EVENT_SYNC_REP,
WAIT_EVENT_WAL_RECEIVER_EXIT,
WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+ WAIT_EVENT_WAL_SENDER_WAIT_REPLAY,
WAIT_EVENT_XACT_GROUP_UPDATE
} WaitEventIPC;
--
2.34.1
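The wakeup pattern this patch relies on (the startup process advances the replay position and then broadcasts replayedCV; the walsender re-checks the replay LSN after every wakeup) can be modelled with POSIX threads. This is a sketch under stated assumptions: a pthread mutex and condition variable stand in for info_lck and ConditionVariable, and SketchLSN, apply_record(), wait_for_replay() and startup_process() are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t SketchLSN;     /* hypothetical stand-in for XLogRecPtr */

/* Stand-ins for info_lck, replayedCV and lastReplayedEndRecPtr. */
static pthread_mutex_t replay_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t replayed_cv = PTHREAD_COND_INITIALIZER;
static SketchLSN replay_lsn = 0;

/* "Startup process": publish the new replay position, then wake waiters. */
static void
apply_record(SketchLSN end_lsn)
{
    pthread_mutex_lock(&replay_lock);
    replay_lsn = end_lsn;
    pthread_cond_broadcast(&replayed_cv);
    pthread_mutex_unlock(&replay_lock);
}

/* "Walsender": block until replay reaches target, re-checking after each wakeup. */
static SketchLSN
wait_for_replay(SketchLSN target)
{
    SketchLSN   seen;

    pthread_mutex_lock(&replay_lock);
    while (replay_lsn < target)     /* wakeups may be spurious */
        pthread_cond_wait(&replayed_cv, &replay_lock);
    seen = replay_lsn;
    pthread_mutex_unlock(&replay_lock);
    return seen;
}

/* Thread body replaying five records ending at LSNs 100, 200, ..., 500. */
static void *
startup_process(void *arg)
{
    (void) arg;
    for (SketchLSN lsn = 100; lsn <= 500; lsn += 100)
        apply_record(lsn);
    return NULL;
}
```

A pthread condition variable needs the predicate checked under the mutex to avoid lost wakeups; with PostgreSQL's ConditionVariable, calling ConditionVariablePrepareToSleep() before the first check (as WalSndWaitForWal() does in this patch) plays that role, since the process is already on the wakeup list when the broadcast arrives.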
From 0044078a540fcb2b5f5c728dcb7e4911b000d6d5 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Fri, 10 Mar 2023 10:57:22 +0000
Subject: [PATCH v99 4/7] Introduce-ConditionVariableEventSleep
---
src/backend/storage/lmgr/condition_variable.c | 65 ++++++++++++++-----
src/include/storage/condition_variable.h | 7 ++
2 files changed, 57 insertions(+), 15 deletions(-)
89.0% src/backend/storage/lmgr/
10.9% src/include/storage/
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 7e2bbf46d9..af241e7317 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -97,7 +97,8 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
void
ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
{
- (void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ ,
+ (void) ConditionVariableEventSleep(cv, NULL, NULL, NULL,
+ -1 /* no timeout */ ,
wait_event_info);
}
@@ -111,11 +112,28 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
bool
ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
uint32 wait_event_info)
+{
+ return ConditionVariableEventSleep(cv, NULL, NULL, NULL, timeout,
+ wait_event_info);
+}
+
+/*
+ * Wait for a condition variable to be signaled, a timeout to be reached, or a
+ * socket event in the given waitset.
+ *
+ * Returns true on timeout or on a socket event in the waitset, otherwise false.
+ *
+ * See ConditionVariableSleep() for general usage.
+ */
+bool
+ConditionVariableEventSleep(ConditionVariable *cv, bool (*cv_resume_waiting)(void),
+ WaitEventSet *waitset,
+ bool (*waitset_resume_waiting)(void),
+ long timeout, uint32 wait_event_info)
{
long cur_timeout = -1;
instr_time start_time;
instr_time cur_time;
- int wait_events;
/*
* If the caller didn't prepare to sleep explicitly, then do so now and
@@ -132,7 +150,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
* If we are currently prepared to sleep on some other CV, we just cancel
* that and prepare this one; see ConditionVariablePrepareToSleep.
*/
- if (cv_sleep_target != cv)
+ if (cv && cv_sleep_target != cv)
{
ConditionVariablePrepareToSleep(cv);
return false;
@@ -147,24 +165,29 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
INSTR_TIME_SET_CURRENT(start_time);
Assert(timeout >= 0 && timeout <= INT_MAX);
cur_timeout = timeout;
- wait_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
}
- else
- wait_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
while (true)
{
bool done = false;
+ WaitEvent cvEvent;
+ int nevents = 0;
/*
- * Wait for latch to be set. (If we're awakened for some other
- * reason, the code below will cope anyway.)
+ * Wait for latch to be set, or other events which will be handled
+ * below.
*/
- (void) WaitLatch(MyLatch, wait_events, cur_timeout, wait_event_info);
+ if (waitset)
+ nevents = WaitEventSetWait(waitset, cur_timeout, &cvEvent,
+ 1, wait_event_info);
/* Reset latch before examining the state of the wait list. */
ResetLatch(MyLatch);
+ /* If a socket event occurred, no need to check wait list. */
+ if (nevents == 1 && (cvEvent.events & WL_SOCKET_MASK) != 0)
+ return true;
+
/*
* If this process has been taken out of the wait list, then we know
* that it has been signaled by ConditionVariableSignal (or
@@ -180,13 +203,25 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
* by something other than ConditionVariableSignal; though we don't
* guarantee not to return spuriously, we'll avoid this obvious case.
*/
- SpinLockAcquire(&cv->mutex);
- if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+
+ if (cv)
{
- done = true;
- proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+ {
+ done = true;
+ proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+ }
+ SpinLockRelease(&cv->mutex);
}
- SpinLockRelease(&cv->mutex);
+
+ /* If we are not waiting on a CV or don't want to wait anymore */
+ if (!cv || (cv && cv_resume_waiting && !cv_resume_waiting()))
+ done = true;
+
+ /* If we don't want to wait on the waitset anymore */
+ if (waitset && waitset_resume_waiting && !waitset_resume_waiting())
+ done = true;
/*
* Check for interrupts, and return spuriously if that caused the
@@ -194,7 +229,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
* waited for a different condition variable).
*/
CHECK_FOR_INTERRUPTS();
- if (cv != cv_sleep_target)
+ if (cv && cv != cv_sleep_target)
done = true;
/* We were signaled, so return */
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 589bdd323c..b9510caa17 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -22,6 +22,7 @@
#ifndef CONDITION_VARIABLE_H
#define CONDITION_VARIABLE_H
+#include "storage/latch.h"
#include "storage/proclist_types.h"
#include "storage/spin.h"
@@ -56,6 +57,12 @@ extern void ConditionVariableInit(ConditionVariable *cv);
extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info);
extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
uint32 wait_event_info);
+extern bool ConditionVariableEventSleep(ConditionVariable *cv,
+ bool (*cv_resume_waiting)(void),
+ WaitEventSet *waitset,
+ bool (*waitset_resume_waiting)(void),
+ long timeout,
+ uint32 wait_event_info);
extern void ConditionVariableCancelSleep(void);
/*
--
2.34.1
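Because the waitset can now wake the loop for reasons other than a CV signal, the timed variant has to keep shrinking its remaining timeout across iterations, as the existing ConditionVariableTimedSleep() does with the INSTR_TIME_* macros. A minimal sketch of that bookkeeping, assuming the same semantics (a negative timeout means "no timeout"; the remaining budget is recomputed from the start time after each wakeup). elapsed_ms() and remaining_timeout_ms() are hypothetical helpers built on struct timespec, not PostgreSQL functions.

```c
#include <assert.h>
#include <time.h>

/* Milliseconds elapsed between two CLOCK_MONOTONIC-style readings. */
static long
elapsed_ms(const struct timespec *start, const struct timespec *now)
{
    return (long) ((now->tv_sec - start->tv_sec) * 1000L +
                   (now->tv_nsec - start->tv_nsec) / 1000000L);
}

/*
 * Budget for the next wait: -1 passes "no timeout" through unchanged, 0 means
 * the caller's timeout has expired (the sleep should return true).
 */
static long
remaining_timeout_ms(long timeout, const struct timespec *start,
                     const struct timespec *now)
{
    long        left;

    if (timeout < 0)
        return -1;
    left = timeout - elapsed_ms(start, now);
    return left > 0 ? left : 0;
}
```

In a real loop, now would be refreshed (for example with clock_gettime(CLOCK_MONOTONIC, &now)) after every wakeup that does not end the sleep, so a burst of latch or socket wakeups cannot extend the total wait beyond the caller's timeout.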