At Thu, 2 Feb 2017 15:34:33 +0900, Michael Paquier <michael.paqu...@gmail.com> 
wrote in <cab7npqq05g15joormeongpkw0osot77yafauf9_6q8g+v+2...@mail.gmail.com>
> On Thu, Feb 2, 2017 at 1:26 AM, Fujii Masao <masao.fu...@gmail.com> wrote:
> > I'm afraid that many WAL segments would start with a continuation record
> > when there are the workload of short transactions (e.g., by pgbench), and
> > which would make restart_lsn go behind very much. No?
> 
> I don't quite understand this argument. Even if there are many small
> transactions, that would cause restart_lsn to just be late by one
> segment, all the time.
> 
> > The discussion on this thread just makes me think that restart_lsn should
> > indicate the replay location instead of flush location. This seems safer.
> 
> That would penalize WAL retention on the primary with standbys using
> recovery_min_apply_delay and a slot for example...
> 
> We can attempt to address this problem two ways. The patch proposed
> (ugly btw and there are two typos!) is doing it in the WAL sender by
> not making restart_lsn jump to the next segment if a continuation
> record is found.

Sorry for the ug..:p Anyway, the previous version was not the
latest. The attached one is the revised version. (Sorry, I
haven't found the typos by myself..)

>  Or we could have the standby request for the next
> segment instead if the record it wants to replay from is at a boundary
> and that it locally has the beginning of the record, and it has it
> because it already confirmed to the primary that it flushed to the
> next segment. Not sure which fix is better though.

We could do it as I said, with some refactoring of ReadRecord involving
the reader plugin mechanism..

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From d835bf248e6869f7b843d339c9213a082e332297 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 1 Feb 2017 16:07:22 +0900
Subject: [PATCH] Fix a bug of physical replication slot.

A physical-replication standby can stop just at the boundary of WAL
segments. restart_lsn of the slot on the master can be assumed to be
the same location. The last segment on the master will be removed
after some checkpoints for the case. If the first record of the next
segment is a continuation record, it is only on the master and its
beginning is only on the standby so the standby cannot restart because
the record to read is scattered to two sources.

This patch detains restart_lsn in the last segment when the first page
of the next segment starts with a continuation record.
---
 src/backend/replication/walsender.c | 104 +++++++++++++++++++++++++++++++++---
 1 file changed, 97 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 76f09fb..0ec7ba9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -188,6 +188,13 @@ static volatile sig_atomic_t replication_active = false;
 static LogicalDecodingContext *logical_decoding_ctx = NULL;
 static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
+/*
+ * This variable corresponds to restart_lsn in pg_replication_slots for a
+ * physical slot. This has a valid value only when it differs from the current
+ * flush pointer.
+ */
+static XLogRecPtr	   restartLSN = InvalidXLogRecPtr;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
@@ -220,7 +227,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool noutfoundok);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -504,6 +511,9 @@ StartReplication(StartReplicationCmd *cmd)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 (errmsg("cannot use a logical replication slot for physical replication"))));
+
+		/* Restore restartLSN from replication slot */
+		restartLSN = MyReplicationSlot->data.restart_lsn;
 	}
 
 	/*
@@ -519,6 +529,10 @@ StartReplication(StartReplicationCmd *cmd)
 	else
 		FlushPtr = GetFlushRecPtr();
 
+	/* Set InvalidXLogRecPtr if catching up */
+	if (restartLSN == FlushPtr)
+		restartLSN = InvalidXLogRecPtr;
+
 	if (cmd->timeline != 0)
 	{
 		XLogRecPtr	switchpoint;
@@ -727,7 +741,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;
 
 	/* now actually read the data, we know it's there */
-	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false);
 
 	return count;
 }
@@ -1486,7 +1500,7 @@ static void
 ProcessStandbyReplyMessage(void)
 {
 	XLogRecPtr	writePtr,
-				flushPtr,
+				flushPtr, oldFlushPtr,
 				applyPtr;
 	bool		replyRequested;
 
@@ -1515,6 +1529,7 @@ ProcessStandbyReplyMessage(void)
 		WalSnd	   *walsnd = MyWalSnd;
 
 		SpinLockAcquire(&walsnd->mutex);
+		oldFlushPtr = walsnd->flush;
 		walsnd->write = writePtr;
 		walsnd->flush = flushPtr;
 		walsnd->apply = applyPtr;
@@ -1532,7 +1547,74 @@ ProcessStandbyReplyMessage(void)
 		if (SlotIsLogical(MyReplicationSlot))
 			LogicalConfirmReceivedLocation(flushPtr);
 		else
-			PhysicalConfirmReceivedLocation(flushPtr);
+		{
+			/*
+			 * Recovery on standby requires that a continuation record must be
+			 * available from single WAL source. For the reason, physical
+			 * replication slot should stay in the first segment for a
+			 * continuation record spanning multiple segments. Since this
+			 * doesn't look into individual record, restartLSN may stay a bit
+			 * too behind.
+			 *
+			 * Since the objective is avoiding to remove required segments,
+			 * checking at the beginning of every segment is enough. But once
+			 * restartLSN goes behind, check every page for quick restoration.
+			 *
+			 * restartLSN has a valid value only when it is behind flushPtr.
+			 */
+			if (oldFlushPtr != InvalidXLogRecPtr &&
+				(restartLSN == InvalidXLogRecPtr ?
+				 oldFlushPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE :
+				 restartLSN / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ))
+			{
+				XLogRecPtr rp;
+
+				if (restartLSN == InvalidXLogRecPtr)
+					restartLSN = oldFlushPtr;
+
+				rp = restartLSN - (restartLSN % XLOG_BLCKSZ);
+
+				/*
+				 * The record at flushPtr may already have been sent, so it
+				 * is worth looking at.
+				 */
+				while (rp <= flushPtr)
+				{
+					XLogPageHeaderData header;
+
+					/*
+					 * If the page header is not available for now, don't move
+					 * restartLSN forward. We can read it by the next chance.
+					 */
+					if(sentPtr - rp >= sizeof(XLogPageHeaderData))
+					{
+						bool found;
+						/*
+						 * Fetch the page header of the next page. Move
+						 * restartLSN forward only if it is not a continuation
+						 * page.
+						 */
+						found = XLogRead((char *)&header, rp,
+											 sizeof(XLogPageHeaderData), true);
+						if (found &&
+							(header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+							restartLSN = rp;
+					}
+					rp += XLOG_BLCKSZ;
+				}
+
+				/*
+				 * If restartLSN is on the same page with flushPtr, it means
+				 * that we are catching up.
+				 */
+				if (restartLSN / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ)
+					restartLSN = InvalidXLogRecPtr;
+			}
+
+			/* restartLSN == InvalidXLogRecPtr means catching up */
+			PhysicalConfirmReceivedLocation(restartLSN != InvalidXLogRecPtr ?
+											restartLSN : flushPtr);
+		}
 	}
 }
 
@@ -1954,6 +2036,7 @@ WalSndKill(int code, Datum arg)
 
 /*
  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
+ * Returns false if the segment file is not found when notfoundok is true.
  *
  * XXX probably this should be improved to suck data directly from the
  * WAL buffers when possible.
@@ -1963,8 +2046,8 @@ WalSndKill(int code, Datum arg)
  * always be one descriptor left open until the process ends, but never
  * more than one.
  */
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -2041,10 +2124,15 @@ retry:
 				 * removed or recycled.
 				 */
 				if (errno == ENOENT)
+				{
+					if (notfoundok)
+						return false;
+
 					ereport(ERROR,
 							(errcode_for_file_access(),
 							 errmsg("requested WAL segment %s has already been removed",
 								XLogFileNameP(curFileTimeLine, sendSegNo))));
+				}
 				else
 					ereport(ERROR,
 							(errcode_for_file_access(),
@@ -2124,6 +2212,8 @@ retry:
 			goto retry;
 		}
 	}
+
+	return true;
 }
 
 /*
@@ -2328,7 +2418,7 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+	XLogRead(&output_message.data[output_message.len], startptr, nbytes, false);
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
-- 
2.9.2

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to