On 2020/04/17 3:00, Tom Lane wrote:
Fujii Masao <masao.fu...@oss.nttdata.com> writes:
On 2020/04/14 22:52, Tom Lane wrote:
*Yes it does*. The existing code can deliver entirely broken results
if some walsender exits between where we examine the priorities and
where we fetch the WAL pointers.
IMO the broken results can be delivered when a walsender marked
as sync exits *and* a new walsender comes in at that moment. If this new
walsender uses the WalSnd slot that the exited walsender used,
SyncRepGetOldestSyncRecPtr() wrongly calculates the oldest LSN based
on this new walsender (i.e., a different walsender from the one marked as sync).
Right, exactly, sorry that I was not more specific.
BTW, since the patch changes the API of SyncRepGetSyncStandbys(),
it should not be back-patched, to avoid an ABI break. Right?
Anything that is using that is just as broken as the core code is, for the
same reasons, so I don't have a problem with changing its API. Maybe we
should rename it while we're at it, just to make it clear that we are
breaking any external callers. (If there are any, which seems somewhat
unlikely.)
I agree with changing the API if that's the only way to fix the bug. But ISTM that
we can fix the bug without changing the API, as the attached patch does.
Your patch changes the logic to pick up sync standbys, e.g., use qsort(),
in addition to the bug fix. This might be an improvement and I agree that
it's worth considering that idea for the master branch or v14. But I'm not
a fan of adding such changes to the back branches if they are not
necessary for the bug fix. I'd like to basically keep the current logic as it is,
at least for the back branches, as the attached patch does.
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/src/backend/replication/syncrep.c
b/src/backend/replication/syncrep.c
index ffd5b31eb2..6add59f18c 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -108,14 +108,20 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr,
- List
*sync_standbys);
+ List
*sync_standbys,
+
SyncRepStandbyData *sdata);
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr,
-
List *sync_standbys, uint8 nth);
+
List *sync_standbys,
+
SyncRepStandbyData *sdata, uint8 nth);
static int SyncRepGetStandbyPriority(void);
-static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
-static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static List *SyncRepGetSyncStandbysInternal(bool *am_sync,
+
SyncRepStandbyData *sdata);
+static List *SyncRepGetSyncStandbysPriority(bool *am_sync,
+
SyncRepStandbyData *sdata);
+static List *SyncRepGetSyncStandbysQuorum(bool *am_sync,
+
SyncRepStandbyData *sdata);
static int cmp_lsn(const void *a, const void *b);
#ifdef USE_ASSERT_CHECKING
@@ -537,6 +543,7 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr
*flushPtr,
XLogRecPtr *applyPtr, bool *am_sync)
{
List *sync_standbys;
+ static SyncRepStandbyData *sdata = NULL;
Assert(LWLockHeldByMe(SyncRepLock));
@@ -545,8 +552,12 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr
*flushPtr,
*applyPtr = InvalidXLogRecPtr;
*am_sync = false;
+ if (sdata == NULL)
+ sdata = (SyncRepStandbyData *)
+ palloc(max_wal_senders * sizeof(SyncRepStandbyData));
+
/* Get standbys that are considered as synchronous at this moment */
- sync_standbys = SyncRepGetSyncStandbys(am_sync);
+ sync_standbys = SyncRepGetSyncStandbysInternal(am_sync, sdata);
/*
* Quick exit if we are not managing a sync standby or there are not
@@ -576,12 +587,13 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr
*flushPtr,
if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
{
SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-
sync_standbys);
+
sync_standbys, sdata);
}
else
{
SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-
sync_standbys, SyncRepConfig->num_sync);
+
sync_standbys, sdata,
+
SyncRepConfig->num_sync);
}
list_free(sync_standbys);
@@ -593,7 +605,8 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr
*flushPtr,
*/
static void
SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr, List
*sync_standbys)
+ XLogRecPtr *applyPtr, List
*sync_standbys,
+ SyncRepStandbyData *sdata)
{
ListCell *cell;
@@ -603,23 +616,14 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *flushPtr,
*/
foreach(cell, sync_standbys)
{
- WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
- XLogRecPtr write;
- XLogRecPtr flush;
- XLogRecPtr apply;
+ SyncRepStandbyData *sby = &(sdata[lfirst_int(cell)]);
- SpinLockAcquire(&walsnd->mutex);
- write = walsnd->write;
- flush = walsnd->flush;
- apply = walsnd->apply;
- SpinLockRelease(&walsnd->mutex);
-
- if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
- *writePtr = write;
- if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
- *flushPtr = flush;
- if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
- *applyPtr = apply;
+ if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > sby->write)
+ *writePtr = sby->write;
+ if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > sby->flush)
+ *flushPtr = sby->flush;
+ if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > sby->apply)
+ *applyPtr = sby->apply;
}
}
@@ -629,7 +633,8 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr
*flushPtr,
*/
static void
SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr,
List *sync_standbys, uint8 nth)
+ XLogRecPtr *applyPtr,
List *sync_standbys,
+ SyncRepStandbyData
*sdata, uint8 nth)
{
ListCell *cell;
XLogRecPtr *write_array;
@@ -645,13 +650,11 @@ SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *flushPtr,
foreach(cell, sync_standbys)
{
- WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+ SyncRepStandbyData *sby = &(sdata[lfirst_int(cell)]);
- SpinLockAcquire(&walsnd->mutex);
- write_array[i] = walsnd->write;
- flush_array[i] = walsnd->flush;
- apply_array[i] = walsnd->apply;
- SpinLockRelease(&walsnd->mutex);
+ write_array[i] = sby->write;
+ flush_array[i] = sby->flush;
+ apply_array[i] = sby->apply;
i++;
}
@@ -688,16 +691,22 @@ cmp_lsn(const void *a, const void *b)
return 1;
}
-/*
- * Return the list of sync standbys, or NIL if no sync standby is connected.
- *
- * The caller must hold SyncRepLock.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
List *
SyncRepGetSyncStandbys(bool *am_sync)
+{
+ return SyncRepGetSyncStandbysInternal(am_sync, NULL);
+}
+
+/*
+ * Return the list of sync standbys, or NIL if no sync standby is connected.
+ *
+ * The caller must hold SyncRepLock.
+ *
+ * On return, *am_sync is set to true if this walsender is connecting to
+ * sync standby. Otherwise it's set to false.
+ */
+static List *
+SyncRepGetSyncStandbysInternal(bool *am_sync, SyncRepStandbyData *sdata)
{
Assert(LWLockHeldByMe(SyncRepLock));
@@ -710,8 +719,8 @@ SyncRepGetSyncStandbys(bool *am_sync)
return NIL;
return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
- SyncRepGetSyncStandbysPriority(am_sync) :
- SyncRepGetSyncStandbysQuorum(am_sync);
+ SyncRepGetSyncStandbysPriority(am_sync, sdata) :
+ SyncRepGetSyncStandbysQuorum(am_sync, sdata);
}
/*
@@ -725,7 +734,7 @@ SyncRepGetSyncStandbys(bool *am_sync)
* sync standby. Otherwise it's set to false.
*/
static List *
-SyncRepGetSyncStandbysQuorum(bool *am_sync)
+SyncRepGetSyncStandbysQuorum(bool *am_sync, SyncRepStandbyData *sdata)
{
List *result = NIL;
int i;
@@ -736,7 +745,9 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
for (i = 0; i < max_wal_senders; i++)
{
+ XLogRecPtr write;
XLogRecPtr flush;
+ XLogRecPtr apply;
WalSndState state;
int pid;
@@ -744,7 +755,9 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
SpinLockAcquire(&walsnd->mutex);
pid = walsnd->pid;
+ write = walsnd->write;
flush = walsnd->flush;
+ apply = walsnd->apply;
state = walsnd->state;
SpinLockRelease(&walsnd->mutex);
@@ -772,6 +785,12 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
result = lappend_int(result, i);
if (am_sync != NULL && walsnd == MyWalSnd)
*am_sync = true;
+ if (sdata != NULL)
+ {
+ sdata[i].write = write;
+ sdata[i].flush = flush;
+ sdata[i].apply = apply;
+ }
}
return result;
@@ -791,12 +810,12 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
* sync standby. Otherwise it's set to false.
*/
static List *
-SyncRepGetSyncStandbysPriority(bool *am_sync)
+SyncRepGetSyncStandbysPriority(bool *am_sync, SyncRepStandbyData *sdata)
{
List *result = NIL;
List *pending = NIL;
- int lowest_priority;
- int next_highest_priority;
+ int lowest_priority = -1;
+ int next_highest_priority = -1;
int this_priority;
int priority;
int i;
@@ -806,9 +825,6 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
- lowest_priority = SyncRepConfig->nmembers;
- next_highest_priority = lowest_priority + 1;
-
/*
* Find the sync standbys which have the highest priority (i.e, 1). Also
* store all the other potential sync standbys into the pending list, in
@@ -816,7 +832,9 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
*/
for (i = 0; i < max_wal_senders; i++)
{
+ XLogRecPtr write;
XLogRecPtr flush;
+ XLogRecPtr apply;
WalSndState state;
int pid;
@@ -824,7 +842,9 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
SpinLockAcquire(&walsnd->mutex);
pid = walsnd->pid;
+ write = walsnd->write;
flush = walsnd->flush;
+ apply = walsnd->apply;
state = walsnd->state;
SpinLockRelease(&walsnd->mutex);
@@ -856,6 +876,12 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
result = lappend_int(result, i);
if (am_sync != NULL && walsnd == MyWalSnd)
*am_sync = true;
+ if (sdata != NULL)
+ {
+ sdata[i].write = write;
+ sdata[i].flush = flush;
+ sdata[i].apply = apply;
+ }
if (list_length(result) == SyncRepConfig->num_sync)
{
list_free(pending);
@@ -867,16 +893,27 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
pending = lappend_int(pending, i);
if (am_sync != NULL && walsnd == MyWalSnd)
am_in_pending = true;
+ if (sdata != NULL)
+ {
+ sdata[i].write = write;
+ sdata[i].flush = flush;
+ sdata[i].apply = apply;
+ }
/*
- * Track the highest priority among the standbys in the
pending
- * list, in order to use it as the starting priority
for later
- * scan of the list. This is useful to find quickly the
sync
- * standbys from the pending list later because we can
skip
- * unnecessary scans for the unused priorities.
+ * Track the highest and lowest priorities among the
standbys
+ * in the pending list, in order to use them as the
starting and
+ * ending priorities for later scan of the list. This
is useful to
+ * find quickly the sync standbys from the pending list
later
+ * because we can skip unnecessary scans for the unused
+ * priorities.
*/
- if (this_priority < next_highest_priority)
+ if (next_highest_priority == -1 ||
+ this_priority < next_highest_priority)
next_highest_priority = this_priority;
+ if (lowest_priority == -1 ||
+ lowest_priority < this_priority)
+ lowest_priority = this_priority;
}
}
diff --git a/src/include/replication/syncrep.h
b/src/include/replication/syncrep.h
index c5f0e91aad..fd97eee206 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -36,6 +36,18 @@
#define SYNC_REP_PRIORITY 0
#define SYNC_REP_QUORUM 1
+/*
+ * SyncRepGetSyncStandbysInternal fills in an array of these structs,
+ * indexed by walsender slot number (max_wal_senders entries).
+ */
+typedef struct SyncRepStandbyData
+{
+ /* Copies of relevant fields from WalSnd shared-memory struct */
+ XLogRecPtr write;
+ XLogRecPtr flush;
+ XLogRecPtr apply;
+} SyncRepStandbyData;
+
/*
* Struct for the configuration of synchronous replication.
*