Hello, I'll add the rebased version to the next CF.

At Fri, 20 Jan 2017 11:07:29 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI 
<horiguchi.kyot...@lab.ntt.co.jp> wrote in 
<20170120.110729.107284864.horiguchi.kyot...@lab.ntt.co.jp>
> > > > - Delaying recycling a segment until the last partial record on it
> > > >   completes. This seems doable page-wise (coarse resolution),
> > > >   but would cost additional reading of past xlog files (the page
> > > >   headers of past pages are required).
> > > 
> > > Hm, yes. That looks like the least invasive way to go. At least that
> > > looks more correct than the others.
> > 
> > The attached patch does that. Usually it reads page headers only
> > on segment boundaries, but once a continuation record is found (or
> > it fails to read the next page header, that is, the first record on
> > the first page in the next segment has not been replicated yet), it
> > starts reading them on every page boundary until a non-continuation
> > page comes.
> > 
> > I left debug output (at LOG level) in the attached file, emitted on
> > every state change of the keep pointer. At least for pgbench, the
> > cost seems negligible.
> 
> I revised it. It became neater and less invasive.
> 
>  - Removed the added keep field from struct WalSnd. It is never
>    referenced from other processes; it is a static variable now.
> 
>  - Restore keepPtr from replication slot on starting.

keepPtr is renamed to the more meaningful name restartLSN.

>  - Moved the main part to a more appropriate position.

- Removed the debug print code.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From 47f9f867305934cbc5fdbd9629e61be65353fc9c Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 1 Feb 2017 16:07:22 +0900
Subject: [PATCH] Fix a bug of physical replication slot.

A physical-replication standby can stop exactly at a WAL segment
boundary. In that case, restart_lsn of the slot on the master points to
the same location, so the last segment before that boundary will be
removed from the master after some checkpoints. If the first record of
the next segment is a continuation record, its remainder exists only on
the master while its beginning exists only on the standby. The standby
then cannot restart, because the record it needs is split across two
sources.

This patch holds restart_lsn back in the last segment when the first
page of the next segment begins with a continuation record.
---
 src/backend/replication/walsender.c | 107 +++++++++++++++++++++++++++++++++---
 1 file changed, 100 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5909b7d..cfbe70e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -188,6 +188,12 @@ static volatile sig_atomic_t replication_active = false;
 static LogicalDecodingContext *logical_decoding_ctx = NULL;
 static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
+/*
+ * Segment keep pointer for physical slots. Has a valid value only when it
+ * differs from the current flush pointer.
+ */
+static XLogRecPtr	   keepPtr = InvalidXLogRecPtr;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
@@ -220,7 +226,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool noutfoundok);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -541,6 +547,9 @@ StartReplication(StartReplicationCmd *cmd)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 (errmsg("cannot use a logical replication slot for physical replication"))));
+
+		/* Restore keepPtr from replication slot */
+		keepPtr = MyReplicationSlot->data.restart_lsn;
 	}
 
 	/*
@@ -556,6 +565,10 @@ StartReplication(StartReplicationCmd *cmd)
 	else
 		FlushPtr = GetFlushRecPtr();
 
+	/* Set InvalidXLogRecPtr if catching up */
+	if (keepPtr == FlushPtr)
+		keepPtr = InvalidXLogRecPtr;
+
 	if (cmd->timeline != 0)
 	{
 		XLogRecPtr	switchpoint;
@@ -777,7 +790,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;
 
 	/* now actually read the data, we know it's there */
-	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false);
 
 	return count;
 }
@@ -1563,7 +1576,7 @@ static void
 ProcessStandbyReplyMessage(void)
 {
 	XLogRecPtr	writePtr,
-				flushPtr,
+				flushPtr, oldFlushPtr,
 				applyPtr;
 	bool		replyRequested;
 
@@ -1592,6 +1605,7 @@ ProcessStandbyReplyMessage(void)
 		WalSnd	   *walsnd = MyWalSnd;
 
 		SpinLockAcquire(&walsnd->mutex);
+		oldFlushPtr = walsnd->flush;
 		walsnd->write = writePtr;
 		walsnd->flush = flushPtr;
 		walsnd->apply = applyPtr;
@@ -1609,7 +1623,78 @@ ProcessStandbyReplyMessage(void)
 		if (SlotIsLogical(MyReplicationSlot))
 			LogicalConfirmReceivedLocation(flushPtr);
 		else
-			PhysicalConfirmReceivedLocation(flushPtr);
+		{
+			/*
+			 * On recovery, a continuation reocrd must be available from
+			 * single WAL source. So physical replication slot should stay in
+			 * the first segment for a continuation record spanning multiple
+			 * segments. Since this doesn't look into individual record,
+			 * keepPtr may stay a bit too behind.
+			 *
+			 * Since the objective is avoding to remove required segments,
+			 * checking every segment is enough. But once keepPtr goes behind,
+			 * check every page for quick restoration.
+			 *
+			 * keepPtr has a valid value only when it is behind flushPtr.
+			 */
+			if (oldFlushPtr != InvalidXLogRecPtr &&
+				(keepPtr == InvalidXLogRecPtr ?
+				 oldFlushPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE :
+				 keepPtr / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ))
+			{
+				XLogRecPtr rp;
+				XLogRecPtr oldKeepPtr = keepPtr; /* for debug */
+
+				if (keepPtr == InvalidXLogRecPtr)
+					keepPtr = oldFlushPtr;
+
+				rp = keepPtr - (keepPtr % XLOG_BLCKSZ);
+
+				/*
+				 * We may have let the record at flushPtr sent, so it's worth
+				 * looking
+				 */
+				while (rp <= flushPtr)
+				{
+					XLogPageHeaderData header;
+
+					/*
+					 * If the page header is not available for now, don't move
+					 * keepPtr forward. We can read it by the next chance.
+					 */
+					if(sentPtr - rp >= sizeof(XLogPageHeaderData))
+					{
+						bool found;
+						/*
+						 * Fetch the page header of the next page. Move
+						 * keepPtr forward only if when it is not a
+						 * continuation page.
+						 */
+						found = XLogRead((char *)&header, rp,
+											 sizeof(XLogPageHeaderData), true);
+						if (found &&
+							(header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+							keepPtr = rp;
+					}
+					rp += XLOG_BLCKSZ;
+				}
+
+				/*
+				 * If keepPtr is on the same page with flushPtr, it means that
+				 * we are catching up
+				 */
+				if (keepPtr / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ)
+					keepPtr = InvalidXLogRecPtr;
+
+				if (oldKeepPtr != keepPtr)
+					elog(LOG, "%lX => %lX / %lX",
+						 oldKeepPtr, keepPtr, flushPtr);
+			}
+
+			/* keepPtr == InvalidXLogRecPtr means catching up */
+			PhysicalConfirmReceivedLocation(keepPtr != InvalidXLogRecPtr ?
+											keepPtr : flushPtr);
+		}
 	}
 }
 
@@ -2031,6 +2116,7 @@ WalSndKill(int code, Datum arg)
 
 /*
  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
+ * Returns false if the segment file is not found when notfoundok is true.
  *
  * XXX probably this should be improved to suck data directly from the
  * WAL buffers when possible.
@@ -2040,8 +2126,8 @@ WalSndKill(int code, Datum arg)
  * always be one descriptor left open until the process ends, but never
  * more than one.
  */
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -2118,10 +2204,15 @@ retry:
 				 * removed or recycled.
 				 */
 				if (errno == ENOENT)
+				{
+					if (notfoundok)
+						return false;
+
 					ereport(ERROR,
 							(errcode_for_file_access(),
 							 errmsg("requested WAL segment %s has already been removed",
 								XLogFileNameP(curFileTimeLine, sendSegNo))));
+				}
 				else
 					ereport(ERROR,
 							(errcode_for_file_access(),
@@ -2201,6 +2292,8 @@ retry:
 			goto retry;
 		}
 	}
+
+	return true;
 }
 
 /*
@@ -2405,7 +2498,7 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+	XLogRead(&output_message.data[output_message.len], startptr, nbytes, false);
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
-- 
2.9.2

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to