At Thu, 17 May 2018 13:54:07 +0300, Arseny Sher <a.s...@postgrespro.ru> wrote 
in <87in7md034.fsf@ars-thinkpad>
> 
> Konstantin Knizhnik <k.knizh...@postgrespro.ru> writes:
> 
> > I think that using restart_lsn instead of confirmed_flush is not right 
> > approach.
> > If restart_lsn is not available and confirmed_flush is pointing to page
> > boundary, then in any case we should somehow handle this case and adjust
> > startlsn to point on the valid record position (by just adding page header
> > size?).
> 
> Well, restart_lsn is always available on live slot: it is initially set
> in ReplicationSlotReserveWal during slot creation.

restart_lsn stays at the beginning of a transaction until the
transaction ends, so simply using restart_lsn allows a transaction
to be decoded repeatedly; in short, rewinding occurs. The function
works only on an inactive slot, so the current code works fine on
this point. In addition, restart_lsn can also be on a page
boundary.


We can see the problem easily.

1. Create a logical replication slot at the current LSN.

  select pg_create_logical_replication_slot('s1', 'pgoutput');

2. Advance the LSN by two or three pages by doing anything.

3. Advance the slot to a page boundary.

  e.g. select pg_replication_slot_advance('s1', '0/9624000');

4. Advance the slot further; this crashes.
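
For reference, the LSN used in step 3 can be checked to really sit on a
page boundary. The following is only a minimal standalone sketch, not
PostgreSQL code: parse_lsn and lsn_is_on_page_boundary are hypothetical
helper names, and it assumes the default WAL block size of 8192 bytes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Assumption: the default WAL block size (XLOG_BLCKSZ) of 8192 bytes. */
#define WAL_BLOCK_SIZE 8192u

/*
 * Hypothetical helper: parse the textual "hi/lo" hexadecimal LSN form
 * (e.g. "0/9624000") into a single 64-bit position.
 * Returns false if the text is not exactly of that form.
 */
bool
parse_lsn(const char *text, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;
    char         extra;

    assert(text != NULL && lsn != NULL);

    /* Exactly two conversions must succeed; trailing junk makes it three. */
    if (sscanf(text, "%X/%X%c", &hi, &lo, &extra) != 2)
        return false;

    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}

/* Hypothetical helper: true if the position is the first byte of a page. */
bool
lsn_is_on_page_boundary(uint64_t lsn)
{
    return lsn % WAL_BLOCK_SIZE == 0;
}
```

Under that block-size assumption, '0/9624000' is a multiple of 8192, so
it points at a page header rather than at the start of a record.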

So directly setting ctx->reader->EndRecPtr to startlsn fixes the
problem, but I found another problem here.

The function accepts any LSN, even one that is not at the beginning
of a record. Such values can cause errors, crashes, infinite waits,
or other kinds of trouble. The moved LSN must always be at the "end
of a record" (that is, at the start of the next record). The
attached patch also fixes this.
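
The rule can be modelled roughly as below. This is a toy model, not the
patch itself: clamp_to_record_end is a hypothetical name, and the
record_ends array merely stands in for the end positions a WAL reader
would report while reading forward from the start position.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Toy model only. record_ends[] holds, in ascending order, the end
 * positions of the records that follow "start". Returns the last record
 * end that does not exceed "moveto", or "start" if no record ends at or
 * before "moveto".
 */
uint64_t
clamp_to_record_end(uint64_t start, uint64_t moveto,
                    const uint64_t *record_ends, size_t nrecords)
{
    uint64_t end = start;   /* plays the role of the reader's end pointer */
    uint64_t upto = start;  /* last position known to be a record end */
    size_t   next = 0;

    assert(record_ends != NULL || nrecords == 0);

    while (end <= moveto)
    {
        upto = end;
        if (next >= nrecords)
            break;          /* ran out of records */
        end = record_ends[next++];
    }

    return upto;
}
```

With record ends at 180, 260 and 300 and a start of 100, asking to move
to 200 yields 180, so the result never lands in the middle of a record.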

The documentation doesn't seem to need a fix.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..d3cb777f9f 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -318,6 +318,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 /*
  * Helper function for advancing physical replication slot forward.
+ *
+ * This function accepts an arbitrary LSN, even one that is not at the
+ * beginning of a record. That could lead to misbehavior in general, but
+ * currently the value is only used to determine which WAL segments to keep,
+ * and subsequent implicit advancing fixes the state.
  */
 static XLogRecPtr
 pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
@@ -344,6 +349,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
 	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	upto;
 
 	PG_TRY();
 	{
@@ -354,6 +360,13 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 									logical_read_local_xlog_page,
 									NULL, NULL, NULL);
 
+		/*
+		 * startlsn can be on a page boundary, which XLogReadRecord does not
+		 * accept as an explicit parameter, so set it in the reader context.
+		 */
+		Assert(startlsn != InvalidXLogRecPtr);
+		upto = ctx->reader->EndRecPtr = startlsn;
+	
 		CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
 												   "logical decoding");
 
@@ -361,22 +374,18 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 		InvalidateSystemCaches();
 
 		/* Decode until we run out of records */
-		while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
-			   (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+		while (ctx->reader->EndRecPtr <= moveto)
 		{
 			XLogRecord *record;
 			char	   *errm = NULL;
+ 
+			/* ctx->reader->EndRecPtr cannot go backward here */
+			upto = ctx->reader->EndRecPtr;
 
-			record = XLogReadRecord(ctx->reader, startlsn, &errm);
+			record = XLogReadRecord(ctx->reader, InvalidXLogRecPtr, &errm);
 			if (errm)
 				elog(ERROR, "%s", errm);
 
-			/*
-			 * Now that we've set up the xlog reader state, subsequent calls
-			 * pass InvalidXLogRecPtr to say "continue from last record"
-			 */
-			startlsn = InvalidXLogRecPtr;
-
 			/*
 			 * The {begin_txn,change,commit_txn}_wrapper callbacks above will
 			 * store the description into our tuplestore.
@@ -384,18 +393,14 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 			if (record != NULL)
 				LogicalDecodingProcessRecord(ctx, ctx->reader);
 
-			/* check limits */
-			if (moveto <= ctx->reader->EndRecPtr)
-				break;
-
 			CHECK_FOR_INTERRUPTS();
 		}
 
 		CurrentResourceOwner = old_resowner;
 
-		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+		if (startlsn != upto)
 		{
-			LogicalConfirmReceivedLocation(moveto);
+			LogicalConfirmReceivedLocation(upto);
 
 			/*
 			 * If only the confirmed_flush_lsn has changed the slot won't get
