On Thu, Mar 04, 2021 at 05:10:36PM +0900, Kyotaro Horiguchi wrote:
> read_local_xlog_page is *designed* to maintain ThisTimeLineID.
> Currently it doesn't seem utilized but I think it's sufficiently
> reasonable that the function maintains ThisTimeLineID.

I don't quite follow this line of thought.  ThisTimeLineID is
designed to remain 0 while recovery is running in most processes
(with the notable exception of a WAL sender in a cascading setup,
physical or logical, of course), so why would read_local_xlog_page()
have any business touching this field at all while in recovery to
begin with?

I find it equally confusing that XLogReadDetermineTimeline() relies on a
specific value of ThisTimeLineID in its own logic, while it clearly
states that all its callers have to read the current active TLI
beforehand.  So I think that the existing logic is pretty weak, and
that resetting the field is an incorrect approach.  It seems to me
that we had better not change ThisTimeLineID actively while in
recovery in this code path and just let others do the job, like
RecoveryInProgress() once recovery finishes, or
GetStandbyFlushRecPtr() for a WAL sender.  Instead, we should
store the current TLI used for replay in a separate variable that gets
passed down to XLogReadDetermineTimeline() as an argument.
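
To make the caller-side pattern concrete, here is a minimal standalone
sketch of the lookup as I would expect it to behave.  Everything in it
is a stub: the static globals, simulate_promotion() and
lookup_read_limit() are hypothetical names made up for illustration, and
the three backend functions are reduced to stand-ins that only mimic the
signatures used in this patch.  It is not backend code; it only shows
that the caller obtains its TLI through a local variable and that
ThisTimeLineID is left alone during recovery.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TimeLineID;
typedef uint64_t XLogRecPtr;

/* Hypothetical stand-ins for server state, NOT the real backend globals. */
static TimeLineID ThisTimeLineID = 0;	/* stays 0 while in recovery */
static bool in_recovery = true;
static TimeLineID replay_tli = 2;
static XLogRecPtr replay_ptr = 0x3000000;
static XLogRecPtr flush_ptr = 0x5000000;

/* Stubs that only mimic the shape of the backend functions. */
static bool
RecoveryInProgress(void)
{
	return in_recovery;
}

static XLogRecPtr
GetFlushRecPtr(void)
{
	return flush_ptr;
}

static XLogRecPtr
GetXLogReplayRecPtr(TimeLineID *replayTLI)
{
	if (replayTLI)
		*replayTLI = replay_tli;
	return replay_ptr;
}

/* Hypothetical helper: pretend recovery has ended on a new timeline. */
static void
simulate_promotion(TimeLineID new_tli)
{
	in_recovery = false;
	ThisTimeLineID = new_tli;
}

/*
 * Hypothetical caller-side lookup: return the read limit and store the
 * timeline to read from in *tli, without ever writing to ThisTimeLineID.
 */
static XLogRecPtr
lookup_read_limit(TimeLineID *tli)
{
	if (!RecoveryInProgress())
	{
		*tli = ThisTimeLineID;
		return GetFlushRecPtr();
	}
	return GetXLogReplayRecPtr(tli);
}
```

With these stubbed values, a call made during recovery yields the replay
TLI (2) and the replay position while ThisTimeLineID stays at 0.  After
simulate_promotion(3), the same call yields TLI 3 and the flush
position.  The value stored in *tli is what would then be handed to
XLogReadDetermineTimeline() as its new argument.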

While going through it, I have simplified the proposed TAP tests a bit
(thanks for replacing the sleep() call, Soumyadeep; it would have
made the test slower for nothing on fast machines, and it would have
caused failures on very slow machines).

The attached fixes the original issue for me, keeping all the records
in their correct timeline.  And I have not been able to break
cascading setups.  If it happens that such cases actually break, we
have holes in our existing test coverage that should be improved.  I
cannot see anything fancy missing on this side, though.

Any thoughts?
--
Michael
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 9ac602b674..34d6286d75 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -56,7 +56,9 @@ extern void wal_segment_open(XLogReaderState *state,
 extern void wal_segment_close(XLogReaderState *state);
 
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
-									  XLogRecPtr wantPage, uint32 wantLength);
+									  XLogRecPtr wantPage,
+									  uint32 wantLength,
+									  TimeLineID readtli);
 
 extern void WALReadRaiseError(WALReadError *errinfo);
 
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index d17d660f46..9695653bc2 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -658,6 +658,13 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
  * wantLength to the amount of the page that will be read, up to
  * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
  *
+ * readtli is the current timeline as found by the caller of this routine.
+ * When not in recovery, this should be ThisTimeLineID.  As ThisTimeLineID
+ * remains set to 0 for most processes while in recovery, the caller ought
+ * to provide the timeline number given as a result of GetXLogReplayRecPtr()
+ * instead (for a WAL sender this would actually be ThisTimeLineID as the
+ * field gets updated in a cascading WAL sender).
+ *
  * We switch to an xlog segment from the new timeline eagerly when on a
  * historical timeline, as soon as we reach the start of the xlog segment
  * containing the timeline switch.  The server copied the segment to the new
@@ -679,12 +686,13 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
  *
  * The caller must also make sure it doesn't read past the current replay
  * position (using GetXLogReplayRecPtr) if executing in recovery, so it
- * doesn't fail to notice that the current timeline became historical. The
- * caller must also update ThisTimeLineID with the result of
- * GetXLogReplayRecPtr and must check RecoveryInProgress().
+ * doesn't fail to notice that the current timeline became historical.
  */
 void
-XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+XLogReadDetermineTimeline(XLogReaderState *state,
+						  XLogRecPtr wantPage,
+						  uint32 wantLength,
+						  TimeLineID readtli)
 {
 	const XLogRecPtr lastReadPage = (state->seg.ws_segno *
 									 state->segcxt.ws_segsize + state->segoff);
@@ -712,12 +720,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	 * just carry on. (Seeking backwards requires a check to make sure the
 	 * older page isn't on a prior timeline).
 	 *
-	 * ThisTimeLineID might've become historical since we last looked, but the
+	 * readtli might've become historical since we last looked, but the
 	 * caller is required not to read past the flush limit it saw at the time
 	 * it looked up the timeline. There's nothing we can do about it if
 	 * StartupXLOG() renames it to .partial concurrently.
 	 */
-	if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+	if (state->currTLI == readtli && wantPage >= lastReadPage)
 	{
 		Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
 		return;
@@ -729,7 +737,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	 * the current segment we can just keep reading.
 	 */
 	if (state->currTLIValidUntil != InvalidXLogRecPtr &&
-		state->currTLI != ThisTimeLineID &&
+		state->currTLI != readtli &&
 		state->currTLI != 0 &&
 		((wantPage + wantLength) / state->segcxt.ws_segsize) <
 		(state->currTLIValidUntil / state->segcxt.ws_segsize))
@@ -752,7 +760,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 		 * We need to re-read the timeline history in case it's been changed
 		 * by a promotion or replay from a cascaded replica.
 		 */
-		List	   *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+		List	   *timelineHistory = readTimeLineHistory(readtli);
 		XLogRecPtr	endOfSegment;
 
 		endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) *
@@ -842,16 +850,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		/*
 		 * Determine the limit of xlog we can currently read to, and what the
 		 * most recent timeline is.
-		 *
-		 * RecoveryInProgress() will update ThisTimeLineID when it first
-		 * notices recovery finishes, so we only have to maintain it for the
-		 * local process until recovery ends.
 		 */
 		if (!RecoveryInProgress())
+		{
 			read_upto = GetFlushRecPtr();
+			tli = ThisTimeLineID;
+		}
 		else
-			read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-		tli = ThisTimeLineID;
+			read_upto = GetXLogReplayRecPtr(&tli);
 
 		/*
 		 * Check which timeline to get the record from.
@@ -877,11 +883,10 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		 * standby whose primary gets promoted while we're decoding, so a
 		 * one-off ERROR isn't too bad.
 		 */
-		XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+		XLogReadDetermineTimeline(state, targetPagePtr, reqLen, tli);
 
-		if (state->currTLI == ThisTimeLineID)
+		if (state->currTLI == tli)
 		{
-
 			if (loc <= read_upto)
 				break;
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 23baa4498a..bbff969ab5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -815,7 +815,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	WALReadError errinfo;
 	XLogSegNo	segno;
 
-	XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+	XLogReadDetermineTimeline(state, targetPagePtr, reqLen, ThisTimeLineID);
 	sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
 	sendTimeLine = state->currTLI;
 	sendTimeLineValidUpto = state->currTLIValidUntil;
diff --git a/src/test/recovery/t/022_pitr_prepared_xact.pl b/src/test/recovery/t/022_pitr_prepared_xact.pl
new file mode 100644
index 0000000000..3a7907b2a0
--- /dev/null
+++ b/src/test/recovery/t/022_pitr_prepared_xact.pl
@@ -0,0 +1,86 @@
+# Test for point-in-time-recovery (PITR) with prepared transactions
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+use File::Compare;
+
+# Initialize and start primary node with WAL archiving
+my $node_primary = get_new_node('primary');
+$node_primary->init(has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', qq{
+max_wal_senders = 10
+wal_level = 'replica'
+max_prepared_transactions = 10});
+$node_primary->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Initialize node for PITR targeting a very specific restore point, just
+# after a PREPARE TRANSACTION is issued, so that we finish with a promoted
+# node where this 2PC transaction needs an explicit COMMIT PREPARED.
+my $node_pitr = get_new_node('node_pitr');
+$node_pitr->init_from_backup(
+	$node_primary, $backup_name,
+	standby       => 0,
+	has_restoring => 1);
+$node_pitr->append_conf('postgresql.conf', qq{
+max_prepared_transactions = 10
+recovery_target_name = 'rp'
+recovery_target_action = 'promote'});
+
+# Workload with a prepared transaction and the target restore point.
+$node_primary->psql(
+	'postgres', qq{
+CREATE TABLE foo(i int);
+BEGIN;
+INSERT INTO foo VALUES(1);
+PREPARE TRANSACTION 'fooinsert';
+SELECT pg_create_restore_point('rp');
+INSERT INTO foo VALUES(2);
+});
+
+# Find next WAL segment to be archived
+my $walfile_to_be_archived = $node_primary->safe_psql('postgres',
+	"SELECT pg_walfile_name(pg_current_wal_lsn());");
+
+# Make WAL segment eligible for archival
+$node_primary->safe_psql('postgres', 'SELECT pg_switch_wal()');
+
+# Wait until the WAL segment has been archived.
+my $archive_wait_query =
+  "SELECT '$walfile_to_be_archived' <= last_archived_wal FROM pg_stat_archiver;";
+$node_primary->poll_query_until('postgres', $archive_wait_query)
+  or die "Timed out while waiting for WAL segment to be archived";
+my $last_archived_wal_file = $walfile_to_be_archived;
+
+# Now start the PITR node.
+$node_pitr->start;
+
+# Wait until the PITR node exits recovery.
+$node_pitr->poll_query_until('postgres', "SELECT pg_is_in_recovery() = 'f';")
+  or die "Timed out while waiting for PITR promotion";
+
+# Ensure that we didn't write to the older timeline during PITR promotion by
+# checking that the last archived WAL segment was not overwritten during
+# recovery
+my $archive_dir           = $node_primary->archive_dir;
+my $archive_wal_file_path = "$archive_dir/$last_archived_wal_file";
+my $node_pitr_data        = $node_pitr->data_dir;
+my $local_wal_file_path   = "$node_pitr_data/pg_wal/$last_archived_wal_file";
+is(compare($archive_wal_file_path, $local_wal_file_path),
+	qq{0}, "check that the last archived WAL file was not overwritten");
+
+# Commit the prepared transaction in the latest timeline and check its result.
+# There should only be one row in the table, coming from the prepared
+# transaction.  The row from the INSERT after the restore point should not
+# show up, since our recovery target was older than the second INSERT done.
+$node_pitr->psql(
+	'postgres', qq{
+COMMIT PREPARED 'fooinsert';
+});
+my $result = $node_pitr->safe_psql('postgres', "SELECT * FROM foo;");
+is($result, qq{1}, "check table contents after COMMIT PREPARED");
