On 2017-05-04 17:00:04 -0700, Andres Freund wrote:
> Attached is a prototype patch for that.

Oops.

Andres
>From b6eb46e376e40f3e2e9a55d16b1b37b27904564b Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 4 May 2017 16:40:52 -0700
Subject: [PATCH 1/2] WIP: Fix off-by-one around GetLastImportantRecPtr.

---
 src/backend/postmaster/bgwriter.c | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index dcb4cf249c..d409d977c0 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -325,10 +325,11 @@ BackgroundWriterMain(void)
 
 			/*
 			 * Only log if enough time has passed and interesting records have
-			 * been inserted since the last snapshot.
+			 * been inserted since the last snapshot (it's <= because
+			 * last_snapshot_lsn points at the end+1 of the record).
 			 */
 			if (now >= timeout &&
-				last_snapshot_lsn < GetLastImportantRecPtr())
+				last_snapshot_lsn <= GetLastImportantRecPtr())
 			{
 				last_snapshot_lsn = LogStandbySnapshot();
 				last_snapshot_ts = now;
-- 
2.12.0.264.gd6db3f2165.dirty

>From 7ed2aeb832029f5602566a665b3f4dbe8baedfcd Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 4 May 2017 16:48:00 -0700
Subject: [PATCH 2/2] WIP: Possibly more robust snapbuild approach.

---
 contrib/test_decoding/expected/ondisk_startup.out |  15 +-
 contrib/test_decoding/specs/ondisk_startup.spec   |   8 +-
 src/backend/replication/logical/decode.c          |   3 -
 src/backend/replication/logical/snapbuild.c       | 386 +++++++++++-----------
 src/include/replication/snapbuild.h               |  25 +-
 5 files changed, 215 insertions(+), 222 deletions(-)

diff --git a/contrib/test_decoding/expected/ondisk_startup.out b/contrib/test_decoding/expected/ondisk_startup.out
index 65115c830a..c7b1f45b46 100644
--- a/contrib/test_decoding/expected/ondisk_startup.out
+++ b/contrib/test_decoding/expected/ondisk_startup.out
@@ -1,21 +1,30 @@
 Parsed test spec with 3 sessions
 
-starting permutation: s2txid s1init s3txid s2alter s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start
-step s2txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
+starting permutation: s2b s2txid s1init s3b s3txid s2alter s2c s2b s2txid s3c s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start
+step s2b: BEGIN;
+step s2txid: SELECT txid_current() IS NULL;
 ?column?       
 
 f              
 step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
-step s3txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
+step s3b: BEGIN;
+step s3txid: SELECT txid_current() IS NULL;
 ?column?       
 
 f              
 step s2alter: ALTER TABLE do_write ADD COLUMN addedbys2 int;
 step s2c: COMMIT;
+step s2b: BEGIN;
+step s2txid: SELECT txid_current() IS NULL;
+?column?       
+
+f              
+step s3c: COMMIT;
 step s1init: <... completed>
 ?column?       
 
 init           
+step s2c: COMMIT;
 step s1insert: INSERT INTO do_write DEFAULT VALUES;
 step s1checkpoint: CHECKPOINT;
 step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');
diff --git a/contrib/test_decoding/specs/ondisk_startup.spec b/contrib/test_decoding/specs/ondisk_startup.spec
index 8223705639..12c57a813d 100644
--- a/contrib/test_decoding/specs/ondisk_startup.spec
+++ b/contrib/test_decoding/specs/ondisk_startup.spec
@@ -24,7 +24,8 @@ step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; }
 session "s2"
 setup { SET synchronous_commit=on; }
 
-step "s2txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
+step "s2b" { BEGIN; }
+step "s2txid" { SELECT txid_current() IS NULL; }
 step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; }
 step "s2c" { COMMIT; }
 
@@ -32,7 +33,8 @@ step "s2c" { COMMIT; }
 session "s3"
 setup { SET synchronous_commit=on; }
 
-step "s3txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
+step "s3b" { BEGIN; }
+step "s3txid" { SELECT txid_current() IS NULL; }
 step "s3c" { COMMIT; }
 
 # Force usage of ondisk snapshot by starting and not finishing a
@@ -40,4 +42,4 @@ step "s3c" { COMMIT; }
 # reached. In combination with a checkpoint forcing a snapshot to be
 # written and a new restart point computed that'll lead to the usage
 # of the snapshot.
-permutation "s2txid" "s1init" "s3txid" "s2alter" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start"
+permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2alter" "s2c" "s2b" "s2txid" "s3c" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start"
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5c13d26099..68825ef598 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -622,9 +622,6 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 {
 	int			i;
 
-	SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
-					  parsed->nsubxacts, parsed->subxacts);
-
 	for (i = 0; i < parsed->nsubxacts; i++)
 	{
 		ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 068d214fa1..1176d2059b 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -56,23 +56,34 @@
  *
  *
  * The snapbuild machinery is starting up in several stages, as illustrated
- * by the following graph:
+ * by the following graph describing the SnapBuild->state transitions:
+ *
  *		   +-------------------------+
- *	  +----|SNAPBUILD_START			 |-------------+
+ *	  +----|         START			 |-------------+
  *	  |    +-------------------------+			   |
  *	  |					|						   |
  *	  |					|						   |
- *	  |		running_xacts with running xacts	   |
+ *	  |		   running_xacts #1					   |
  *	  |					|						   |
  *	  |					|						   |
  *	  |					v						   |
  *	  |    +-------------------------+			   v
- *	  |    |SNAPBUILD_FULL_SNAPSHOT  |------------>|
+ *	  |    |   BUILDING_SNAPSHOT     |------------>|
  *	  |    +-------------------------+			   |
+ *	  |					|						   |
+ *	  |					|						   |
+ *	  |	running_xacts #2, xacts from #1 finished   |
+ *	  |					|						   |
+ *	  |					|						   |
+ *	  |					v						   |
+ *	  |    +-------------------------+			   v
+ *	  |    |       FULL_SNAPSHOT     |------------>|
+ *	  |    +-------------------------+			   |
+ *	  |					|						   |
  * running_xacts		|					   saved snapshot
  * with zero xacts		|				  at running_xacts's lsn
  *	  |					|						   |
- *	  |		all running toplevel TXNs finished	   |
+ *	  |	running_xacts with xacts from #2 finished  |
  *	  |					|						   |
  *	  |					v						   |
  *	  |    +-------------------------+			   |
@@ -83,7 +94,7 @@
  * record is read that is sufficiently new (above the safe xmin horizon),
  * there's a state transition. If there were no running xacts when the
  * running_xacts record was generated, we'll directly go into CONSISTENT
- * state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full
+ * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
  * snapshot means that all transactions that start henceforth can be decoded
  * in their entirety, but transactions that started previously can't. In
  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
@@ -184,6 +195,14 @@ struct SnapBuild
 	ReorderBuffer *reorder;
 
 	/*
+	 * When can the next state be reached?
+	 *
+	 * FIXME: More accurate name, possibly split into two?
+	 * FIXME: need to be moved into ->running.xmin or such for ABI compat.
+	 */
+	TransactionId started_collection_at;
+
+	/*
 	 * Information about initially running transactions
 	 *
 	 * When we start building a snapshot there already may be transactions in
@@ -203,7 +222,7 @@ struct SnapBuild
 		size_t		xcnt;		/* number of used xip entries */
 		size_t		xcnt_space; /* allocated size of xip */
 		TransactionId *xip;		/* running xacts array, xidComparator-sorted */
-	}			running;
+	}			running_old;
 
 	/*
 	 * Array of transactions which could have catalog changes that committed
@@ -249,12 +268,6 @@ struct SnapBuild
 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
 static bool ExportInProgress = false;
 
-/* transaction state manipulation functions */
-static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
-
-/* ->running manipulation */
-static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);
-
 /* ->committed manipulation */
 static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
 
@@ -269,6 +282,7 @@ static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr
 
 /* xlog reading helper functions for SnapBuildProcessRecord */
 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
+static void SnapBuildWaitSnapshot(xl_running_xacts *running);
 
 /* serialization functions */
 static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
@@ -700,7 +714,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 	 * we got into the SNAPBUILD_FULL_SNAPSHOT state.
 	 */
 	if (builder->state < SNAPBUILD_CONSISTENT &&
-		SnapBuildTxnIsRunning(builder, xid))
+		TransactionIdPrecedes(xid, builder->started_collection_at))
 		return false;
 
 	/*
@@ -769,38 +783,6 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Check whether `xid` is currently 'running'.
- *
- * Running transactions in our parlance are transactions which we didn't
- * observe from the start so we can't properly decode their contents. They
- * only exist after we freshly started from an < CONSISTENT snapshot.
- */
-static bool
-SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
-{
-	Assert(builder->state < SNAPBUILD_CONSISTENT);
-	Assert(TransactionIdIsNormal(builder->running.xmin));
-	Assert(TransactionIdIsNormal(builder->running.xmax));
-
-	if (builder->running.xcnt &&
-		NormalTransactionIdFollows(xid, builder->running.xmin) &&
-		NormalTransactionIdPrecedes(xid, builder->running.xmax))
-	{
-		TransactionId *search =
-		bsearch(&xid, builder->running.xip, builder->running.xcnt_space,
-				sizeof(TransactionId), xidComparator);
-
-		if (search != NULL)
-		{
-			Assert(*search == xid);
-			return true;
-		}
-	}
-
-	return false;
-}
-
-/*
  * Add a new Snapshot to all transactions we're decoding that currently are
  * in-progress so they can see new catalog contents made by the transaction
  * that just committed. This is necessary because those in-progress
@@ -922,63 +904,6 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
 }
 
 /*
- * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
- * keeping track of the amount of running transactions.
- */
-static void
-SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
-{
-	if (builder->state == SNAPBUILD_CONSISTENT)
-		return;
-
-	/*
-	 * NB: This handles subtransactions correctly even if we started from
-	 * suboverflowed xl_running_xacts because we only keep track of toplevel
-	 * transactions. Since the latter are always allocated before their
-	 * subxids and since they end at the same time it's sufficient to deal
-	 * with them here.
-	 */
-	if (SnapBuildTxnIsRunning(builder, xid))
-	{
-		Assert(builder->running.xcnt > 0);
-
-		if (!--builder->running.xcnt)
-		{
-			/*
-			 * None of the originally running transaction is running anymore,
-			 * so our incrementally built snapshot now is consistent.
-			 */
-			ereport(LOG,
-				  (errmsg("logical decoding found consistent point at %X/%X",
-						  (uint32) (lsn >> 32), (uint32) lsn),
-				   errdetail("Transaction ID %u finished; no more running transactions.",
-							 xid)));
-			builder->state = SNAPBUILD_CONSISTENT;
-		}
-	}
-}
-
-/*
- * Abort a transaction, throw away all state we kept.
- */
-void
-SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
-				  TransactionId xid,
-				  int nsubxacts, TransactionId *subxacts)
-{
-	int			i;
-
-	for (i = 0; i < nsubxacts; i++)
-	{
-		TransactionId subxid = subxacts[i];
-
-		SnapBuildEndTxn(builder, lsn, subxid);
-	}
-
-	SnapBuildEndTxn(builder, lsn, xid);
-}
-
-/*
  * Handle everything that needs to be done when a transaction commits
  */
 void
@@ -1022,11 +947,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		TransactionId subxid = subxacts[nxact];
 
 		/*
-		 * make sure txn is not tracked in running txn's anymore, switch state
-		 */
-		SnapBuildEndTxn(builder, lsn, subxid);
-
-		/*
 		 * If we're forcing timetravel we also need visibility information
 		 * about subtransaction, so keep track of subtransaction's state.
 		 */
@@ -1055,12 +975,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		}
 	}
 
-	/*
-	 * Make sure toplevel txn is not tracked in running txn's anymore, switch
-	 * state to consistent if possible.
-	 */
-	SnapBuildEndTxn(builder, lsn, xid);
-
 	if (forced_timetravel)
 	{
 		elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
@@ -1250,9 +1164,45 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 *
 	 * a) There were no running transactions when the xl_running_xacts record
 	 *	  was inserted, jump to CONSISTENT immediately. We might find such a
-	 *	  state we were waiting for b) or c).
+	 *	  state while waiting for c), d) or e).
 	 *
-	 * b) Wait for all toplevel transactions that were running to end. We
+	 * b) This (in a previous run) or another decoding slot serialized a
+	 *	  snapshot to disk that we can use.  Can't use this method for the
+	 *	  initial snapshot when slot is being created and needs full snapshot
+	 *	  for export or direct use, as that snapshot will only contain catalog
+	 *	  modifying transactions.
+	 *
+	 * c) First incrementally build a snapshot for catalog tuples
+	 *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
+	 *    transactions to finish.  Every transaction starting after that
+	 *    (FULL_SNAPSHOT state), has enough information to be decoded.  But
+	 *    for older running transactions no viable snapshot exists yet, so
+	 *    CONSISTENT will only be reached once all of those have finished.
+	 *
+	 * c) In BUILDING_SNAPSHOT state (see d) ), and this xl_running_xacts'
+	 *    oldestRunningXid is >= the nextXid from when we switched to
+	 *    BUILDING_SNAPSHOT.  Switch to FULL_SNAPSHOT.
+	 *
+	 * d) In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts'
+	 *    oldestRunningXid is >= the nextXid from when we switched to
+	 *    FULL_SNAPSHOT.   Switch to CONSISTENT.
+	 *
+	 * e) In START state, and a xl_running_xacts record with running xacts is
+	 *    encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
+	 *    record xl_running_xacts->nextXid.  Once all running xacts have
+	 *    finished (i.e. they're all >= nextXid), we have a complete snapshot.
+	 *    It might look as if we could use xl_running_xacts' ->xids information
+	 *    to get there quicker, but that is problematic because transactions
+	 *    marked as running, might already have inserted their commit record -
+	 *    it's infeasible to change that with locking.
+
+	 *
+	 * d) In BUILDING_SNAPSHOT state (see c) ), and this xl_running_xacts'
+	 *    oldestRunningXid is newer than the
+	 *
+
+
+ Wait for all toplevel transactions that were running to end. We
 	 *	  simply track the number of in-progress toplevel transactions and
 	 *	  lower it whenever one commits or aborts. When that number
 	 *	  (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
@@ -1264,11 +1214,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 *	  subtransactions - and by extension suboverflowed xl_running_xacts -
 	 *	  at all.
 	 *
-	 * c) This (in a previous run) or another decoding slot serialized a
-	 *	  snapshot to disk that we can use.  Can't use this method for the
-	 *	  initial snapshot when slot is being created and needs full snapshot
-	 *	  for export or direct use, as that snapshot will only contain catalog
-	 *	  modifying transactions.
+	 *    Unfortunately there's a race condition around LogStandbySnapshot(),
+	 *    where transactions might have logged their commit record, before
+	 *    xl_running_xacts itself is logged. In that case the decoding logic
+	 *    would have missed that fact.  Thus
+	 *
+	 * d) xl_running_xacts shows us that transaction(s) assumed to be still
+	 *    running have actually already finished.  Adjust their status.
 	 * ---
 	 */
 
@@ -1291,10 +1243,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	/*
 	 * a) No transaction were running, we can jump to consistent.
 	 *
+	 * This is not affected by races, because we can miss transaction commits,
+	 * but we can't miss transactions starting (XXX: Not true if we relax locking!).
+	 *
 	 * NB: We might have already started to incrementally assemble a snapshot,
 	 * so we need to be careful to deal with that.
 	 */
-	if (running->xcnt == 0)
+	if (running->oldestRunningXid == running->nextXid)
 	{
 		if (builder->start_decoding_at == InvalidXLogRecPtr ||
 			builder->start_decoding_at <= lsn)
@@ -1310,9 +1265,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 		Assert(TransactionIdIsNormal(builder->xmax));
 
 		/* no transactions running now */
-		builder->running.xcnt = 0;
-		builder->running.xmin = InvalidTransactionId;
-		builder->running.xmax = InvalidTransactionId;
+		builder->running_old.xcnt = 0;
+		builder->running_old.xmin = InvalidTransactionId;
+		builder->running_old.xmax = InvalidTransactionId;
 
 		builder->state = SNAPBUILD_CONSISTENT;
 
@@ -1323,30 +1278,29 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 
 		return false;
 	}
-	/* c) valid on disk state and not building full snapshot */
+	/* b) valid on disk state and not building full snapshot */
 	else if (!builder->building_full_snapshot &&
 			 SnapBuildRestore(builder, lsn))
 	{
 		/* there won't be any state to cleanup */
 		return false;
 	}
-
 	/*
-	 * b) first encounter of a useable xl_running_xacts record. If we had
-	 * found one earlier we would either track running transactions (i.e.
-	 * builder->running.xcnt != 0) or be consistent (this function wouldn't
-	 * get called).
+	 * c) transition from START to BUILDING_SNAPSHOT.
+	 *
+	 * In START state, and a xl_running_xacts record with running xacts is
+	 * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
+	 * record xl_running_xacts->nextXid.  Once all running xacts have finished
+	 * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
+	 * might look as if we could use xl_running_xacts' ->xids information to
+	 * get there quicker, but that is problematic because transactions marked
+	 * as running, might already have inserted their commit record - it's
+	 * infeasible to change that with locking.
 	 */
-	else if (!builder->running.xcnt)
+	else if (builder->state == SNAPBUILD_START)
 	{
-		int			off;
-
-		/*
-		 * We only care about toplevel xids as those are the ones we
-		 * definitely see in the wal stream. As snapbuild.c tracks committed
-		 * instead of running transactions we don't need to know anything
-		 * about uncommitted subtransactions.
-		 */
+		builder->started_collection_at = running->nextXid;
+		builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
 
 		/*
 		 * Start with an xmin/xmax that's correct for future, when all the
@@ -1360,59 +1314,59 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 		Assert(TransactionIdIsNormal(builder->xmin));
 		Assert(TransactionIdIsNormal(builder->xmax));
 
-		builder->running.xcnt = running->xcnt;
-		builder->running.xcnt_space = running->xcnt;
-		builder->running.xip =
-			MemoryContextAlloc(builder->context,
-							   builder->running.xcnt * sizeof(TransactionId));
-		memcpy(builder->running.xip, running->xids,
-			   builder->running.xcnt * sizeof(TransactionId));
-
-		/* sort so we can do a binary search */
-		qsort(builder->running.xip, builder->running.xcnt,
-			  sizeof(TransactionId), xidComparator);
-
-		builder->running.xmin = builder->running.xip[0];
-		builder->running.xmax = builder->running.xip[running->xcnt - 1];
-
-		/* makes comparisons cheaper later */
-		TransactionIdRetreat(builder->running.xmin);
-		TransactionIdAdvance(builder->running.xmax);
-
-		builder->state = SNAPBUILD_FULL_SNAPSHOT;
-
 		ereport(LOG,
 			(errmsg("logical decoding found initial starting point at %X/%X",
 					(uint32) (lsn >> 32), (uint32) lsn),
-			 errdetail_plural("%u transaction needs to finish.",
-							  "%u transactions need to finish.",
-							  builder->running.xcnt,
-							  (uint32) builder->running.xcnt)));
+			 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+					   running->xcnt, running->nextXid)));
 
-		/*
-		 * Iterate through all xids, wait for them to finish.
-		 *
-		 * This isn't required for the correctness of decoding, but to allow
-		 * isolationtester to notice that we're currently waiting for
-		 * something.
-		 */
-		for (off = 0; off < builder->running.xcnt; off++)
-		{
-			TransactionId xid = builder->running.xip[off];
+		SnapBuildWaitSnapshot(running);
+	}
+	/*
+	 * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
+	 *
+	 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
+	 * is >= the nextXid from when we switched to BUILDING_SNAPSHOT.  This
+	 * means all transactions starting afterwards have enough information to
+	 * be decoded.  Switch to FULL_SNAPSHOT.
+	 */
+	else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
+			 TransactionIdPrecedesOrEquals(builder->started_collection_at,
+										   running->oldestRunningXid))
+	{
+		builder->state = SNAPBUILD_FULL_SNAPSHOT;
+		builder->started_collection_at = running->nextXid;
 
-			/*
-			 * Upper layers should prevent that we ever need to wait on
-			 * ourselves. Check anyway, since failing to do so would either
-			 * result in an endless wait or an Assert() failure.
-			 */
-			if (TransactionIdIsCurrentTransactionId(xid))
-				elog(ERROR, "waiting for ourselves");
+		SnapBuildWaitSnapshot(running);
 
-			XactLockTableWait(xid, NULL, NULL, XLTW_None);
-		}
-
-		/* nothing could have built up so far, so don't perform cleanup */
-		return false;
+		ereport(LOG,
+				(errmsg("logical decoding found initial consistent point at %X/%X",
+						(uint32) (lsn >> 32), (uint32) lsn),
+				 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+						   running->xcnt, running->nextXid)));
+	}
+	/*
+	 * c) transition from FULL_SNAPSHOT to CONSISTENT.
+	 *
+	 * In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts'
+	 * oldestRunningXid is >= the nextXid from when we switched to
+	 * FULL_SNAPSHOT.  This means all transactions that are currently in
+	 * progress have a catalog snapshot, and all their changes have been
+	 * collected.  Switch to CONSISTENT.
+	 */
+	else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
+			 TransactionIdPrecedesOrEquals(builder->started_collection_at,
+										   running->oldestRunningXid))
+	{
+		builder->state = SNAPBUILD_CONSISTENT;
+		ereport(LOG,
+				(errmsg("logical decoding found consistent point at %X/%X",
+						(uint32) (lsn >> 32), (uint32) lsn),
+				 errdetail("There are no old transactions anymore.")));
+	}
+	else
+	{
+		SnapBuildWaitSnapshot(running);
 	}
 
 	/*
@@ -1421,8 +1375,35 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 * records so incremental cleanup can be performed.
 	 */
 	return true;
+
 }
 
+/*
+ * Iterate through all xids in record, wait for them to finish.
+ *
+ * This isn't required for the correctness of decoding, but to allow
+ * isolationtester to notice that we're currently waiting for something.
+ */
+static void
+SnapBuildWaitSnapshot(xl_running_xacts *running)
+{
+	int			off;
+
+	for (off = 0; off < running->xcnt; off++)
+	{
+		TransactionId xid = running->xids[off];
+
+		/*
+		 * Upper layers should prevent that we ever need to wait on
+		 * ourselves. Check anyway, since failing to do so would either
+		 * result in an endless wait or an Assert() failure.
+		 */
+		if (TransactionIdIsCurrentTransactionId(xid))
+			elog(ERROR, "waiting for ourselves");
+
+		XactLockTableWait(xid, NULL, NULL, XLTW_None);
+	}
+}
 
 /* -----------------------------------
  * Snapshot serialization support
@@ -1572,7 +1553,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 				 errmsg("could not remove file \"%s\": %m", path)));
 
 	needed_length = sizeof(SnapBuildOnDisk) +
-		sizeof(TransactionId) * builder->running.xcnt_space +
+		sizeof(TransactionId) * builder->running_old.xcnt_space +
 		sizeof(TransactionId) * builder->committed.xcnt;
 
 	ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
@@ -1591,7 +1572,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 	ondisk->builder.context = NULL;
 	ondisk->builder.snapshot = NULL;
 	ondisk->builder.reorder = NULL;
-	ondisk->builder.running.xip = NULL;
+	ondisk->builder.running_old.xip = NULL;
 	ondisk->builder.committed.xip = NULL;
 
 	COMP_CRC32C(ondisk->checksum,
@@ -1599,8 +1580,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 				sizeof(SnapBuild));
 
 	/* copy running xacts */
-	sz = sizeof(TransactionId) * builder->running.xcnt_space;
-	memcpy(ondisk_c, builder->running.xip, sz);
+	sz = sizeof(TransactionId) * builder->running_old.xcnt_space;
+	memcpy(ondisk_c, builder->running_old.xip, sz);
 	COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
 	ondisk_c += sz;
 
@@ -1763,10 +1744,10 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
 
 	/* restore running xacts information */
-	sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
-	ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
+	sz = sizeof(TransactionId) * ondisk.builder.running_old.xcnt_space;
+	ondisk.builder.running_old.xip = MemoryContextAllocZero(builder->context, sz);
 	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-	readBytes = read(fd, ondisk.builder.running.xip, sz);
+	readBytes = read(fd, ondisk.builder.running_old.xip, sz);
 	pgstat_report_wait_end();
 	if (readBytes != sz)
 	{
@@ -1776,7 +1757,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 				 errmsg("could not read file \"%s\", read %d of %d: %m",
 						path, readBytes, (int) sz)));
 	}
-	COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
+	COMP_CRC32C(checksum, ondisk.builder.running_old.xip, sz);
 
 	/* restore committed xacts information */
 	sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
@@ -1842,11 +1823,12 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	}
 	ondisk.builder.committed.xip = NULL;
 
-	builder->running.xcnt = ondisk.builder.running.xcnt;
-	if (builder->running.xip)
-		pfree(builder->running.xip);
-	builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
-	builder->running.xip = ondisk.builder.running.xip;
+	/* FIXME: remove */
+	builder->running_old.xcnt = ondisk.builder.running_old.xcnt;
+	if (builder->running_old.xip)
+		pfree(builder->running_old.xip);
+	builder->running_old.xcnt_space = ondisk.builder.running_old.xcnt_space;
+	builder->running_old.xip = ondisk.builder.running_old.xip;
 
 	/* our snapshot is not interesting anymore, build a new one */
 	if (builder->snapshot != NULL)
@@ -1867,8 +1849,8 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	return true;
 
 snapshot_not_interesting:
-	if (ondisk.builder.running.xip != NULL)
-		pfree(ondisk.builder.running.xip);
+	if (ondisk.builder.running_old.xip != NULL)
+		pfree(ondisk.builder.running_old.xip);
 	if (ondisk.builder.committed.xip != NULL)
 		pfree(ondisk.builder.committed.xip);
 	return false;
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 494751d70a..ccb5f831c4 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -20,24 +20,30 @@ typedef enum
 	/*
 	 * Initial state, we can't do much yet.
 	 */
-	SNAPBUILD_START,
+	SNAPBUILD_START = -1,
+
+	/*
+	 * Collecting committed transactions, to build the initial catalog
+	 * snapshot.
+	 */
+	SNAPBUILD_BUILDING_SNAPSHOT = 0,
 
 	/*
 	 * We have collected enough information to decode tuples in transactions
 	 * that started after this.
 	 *
 	 * Once we reached this we start to collect changes. We cannot apply them
-	 * yet because the might be based on transactions that were still running
-	 * when we reached them yet.
+	 * yet, because they might be based on transactions that were still running
+	 * when FULL_SNAPSHOT was reached.
 	 */
-	SNAPBUILD_FULL_SNAPSHOT,
+	SNAPBUILD_FULL_SNAPSHOT = 1,
 
 	/*
-	 * Found a point after hitting built_full_snapshot where all transactions
-	 * that were running at that point finished. Till we reach that we hold
-	 * off calling any commit callbacks.
+	 * Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that
+	 * were running at that point finished. Till we reach that we hold off
+	 * calling any commit callbacks.
 	 */
-	SNAPBUILD_CONSISTENT
+	SNAPBUILD_CONSISTENT = 2
 } SnapBuildState;
 
 /* forward declare so we don't have to expose the struct to the public */
@@ -73,9 +79,6 @@ extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
 				   TransactionId xid, int nsubxacts,
 				   TransactionId *subxacts);
-extern void SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
-				  TransactionId xid, int nsubxacts,
-				  TransactionId *subxacts);
 extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid,
 					   XLogRecPtr lsn);
 extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
-- 
2.12.0.264.gd6db3f2165.dirty

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to