On 03.09.2012 16:26, Heikki Linnakangas wrote:
On 03.09.2012 16:25, Fujii Masao wrote:
On Tue, Sep 4, 2012 at 7:07 AM, Heikki Linnakangas<hlinn...@iki.fi>
wrote:
Hmm, I was thinking that when walsender gets the position it can send
the
WAL up to, in GetStandbyFlushRecPtr(), it could atomically check the
current
recovery timeline. If it has changed, refuse to send the new WAL and
terminate. That would be a fairly small change, it would just close the
window between requesting walsenders to terminate and them actually
terminating.
Yeah, sounds good. Could you implement the patch? If you don't have time,
I will....
I'll give it a shot..
So, this is what I came up with, please review.
- Heikki
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a7762ea..c00b277 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -407,7 +407,6 @@ typedef struct XLogCtlData
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
int XLogCacheBlck; /* highest allocated xlog buffer index */
TimeLineID ThisTimeLineID;
- TimeLineID RecoveryTargetTLI;
/*
* archiveCleanupCommand is read from recovery.conf but needs to be in
@@ -456,6 +455,8 @@ typedef struct XLogCtlData
XLogRecPtr recoveryLastRecPtr;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
+ /* current effective recovery target timeline. */
+ TimeLineID RecoveryTargetTLI;
/*
* timestamp of when we started replaying the current chunk of WAL data,
@@ -4470,12 +4471,17 @@ rescanLatestTimeLine(void)
ThisTimeLineID)));
else
{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
/* Switch target */
recoveryTargetTLI = newtarget;
list_free(expectedTLIs);
expectedTLIs = newExpectedTLIs;
- XLogCtl->RecoveryTargetTLI = recoveryTargetTLI;
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->RecoveryTargetTLI = recoveryTargetTLI;
+ SpinLockRelease(&xlogctl->info_lck);
ereport(LOG,
(errmsg("new target timeline is %u",
@@ -7518,8 +7524,15 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
TimeLineID
GetRecoveryTargetTLI(void)
{
- /* RecoveryTargetTLI doesn't change so we need no lock to copy it */
- return XLogCtl->RecoveryTargetTLI;
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ TimeLineID result;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ result = xlogctl->RecoveryTargetTLI;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return result;
}
/*
@@ -8309,7 +8322,7 @@ CreateRestartPoint(int flags)
XLogRecPtr endptr;
/* Get the current (or recent) end of xlog */
- endptr = GetStandbyFlushRecPtr();
+ endptr = GetStandbyFlushRecPtr(NULL);
KeepLogSeg(endptr, &_logSegNo);
_logSegNo--;
@@ -9819,13 +9832,14 @@ do_pg_abort_backup(void)
* Get latest redo apply position.
*
* Optionally, returns the end byte position of the last restored
- * WAL segment. Callers not interested in that value may pass
- * NULL for restoreLastRecPtr.
+ * WAL segment, and the current recovery target timeline. Callers not
+ * interested in those values may pass NULL for restoreLastRecPtr and
+ * targetTli.
*
* Exported to allow WALReceiver to read the pointer directly.
*/
XLogRecPtr
-GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
+GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr, TimeLineID *targetTli)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
@@ -9835,6 +9849,8 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
recptr = xlogctl->recoveryLastRecPtr;
if (restoreLastRecPtr)
*restoreLastRecPtr = xlogctl->restoreLastRecPtr;
+ if (targetTli)
+ *targetTli = xlogctl->RecoveryTargetTLI;
SpinLockRelease(&xlogctl->info_lck);
return recptr;
@@ -9843,16 +9859,18 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
/*
* Get current standby flush position, ie, the last WAL position
* known to be fsync'd to disk in standby.
+ *
+ * If 'targetTLI' is not NULL, it's set to the current recovery target timeline.
*/
XLogRecPtr
-GetStandbyFlushRecPtr(void)
+GetStandbyFlushRecPtr(TimeLineID *targetTLI)
{
XLogRecPtr receivePtr;
XLogRecPtr replayPtr;
XLogRecPtr restorePtr;
receivePtr = GetWalRcvWriteRecPtr(NULL);
- replayPtr = GetXLogReplayRecPtr(&restorePtr);
+ replayPtr = GetXLogReplayRecPtr(&restorePtr, targetTLI);
if (XLByteLT(receivePtr, replayPtr))
return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr;
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index d345761..b7c864e 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -247,7 +247,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
XLogRecPtr recptr;
char location[MAXFNAMELEN];
- recptr = GetXLogReplayRecPtr(NULL);
+ recptr = GetXLogReplayRecPtr(NULL, NULL);
if (recptr == 0)
PG_RETURN_NULL();
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 8694570..3ce9eb7 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -278,7 +278,8 @@ WalReceiverMain(void)
DisableWalRcvImmediateExit();
/* Initialize LogstreamResult, reply_message and feedback_message */
- LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
+ LogstreamResult.Write = LogstreamResult.Flush =
+ GetXLogReplayRecPtr(NULL, NULL);
MemSet(&reply_message, 0, sizeof(reply_message));
MemSet(&feedback_message, 0, sizeof(feedback_message));
@@ -654,7 +655,7 @@ XLogWalRcvSendReply(void)
/* Construct a new message */
reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush;
- reply_message.apply = GetXLogReplayRecPtr(NULL);
+ reply_message.apply = GetXLogReplayRecPtr(NULL, NULL);
reply_message.sendTime = now;
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 9eba180..68d0d6f 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -258,7 +258,7 @@ GetReplicationApplyDelay(void)
receivePtr = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
- replayPtr = GetXLogReplayRecPtr(NULL);
+ replayPtr = GetXLogReplayRecPtr(NULL, NULL);
if (XLByteEQ(receivePtr, replayPtr))
return 0;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 38f7a3f..ae508da 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -303,7 +303,7 @@ IdentifySystem(void)
GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
- logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
+ logptr = am_cascading_walsender ? GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr();
snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
@@ -1137,7 +1137,31 @@ XLogSend(char *msgbuf, bool *caughtup)
* subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master.
*/
- SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
+ if (am_cascading_walsender)
+ {
+ TimeLineID currentTargetTLI;
+ SendRqstPtr = GetStandbyFlushRecPtr(&currentTargetTLI);
+
+ /*
+ * If the recovery target timeline changed, bail out. It's a bit
+ * unfortunate that we have to just disconnect, but there is no way
+ * to tell the client that the timeline changed. We also don't know
+ * exactly where the switch happened, so we cannot safely try to send
+ * up to the switchover point before disconnecting.
+ */
+ if (currentTargetTLI != ThisTimeLineID)
+ {
+ if (!walsender_ready_to_stop)
+ ereport(LOG,
+ (errmsg("terminating walsender process to force cascaded standby "
+ "to update timeline and reconnect")));
+ walsender_ready_to_stop = true;
+ *caughtup = true;
+ return;
+ }
+ }
+ else
+ SendRqstPtr = GetFlushRecPtr();
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index ec79870..771fac9 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -285,8 +285,9 @@ extern bool RecoveryInProgress(void);
extern bool HotStandbyActive(void);
extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
-extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
-extern XLogRecPtr GetStandbyFlushRecPtr(void);
+extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr,
+ TimeLineID *targetTli);
+extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI);
extern XLogRecPtr GetXLogInsertRecPtr(void);
extern XLogRecPtr GetXLogWriteRecPtr(void);
extern bool RecoveryIsPaused(void);
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers