On Mon, Jun 15, 2020 at 5:00 PM Michael Paquier <mich...@paquier.xyz> wrote:
> On Fri, Dec 28, 2018 at 02:21:44PM +1300, Thomas Munro wrote:
> > Just to be clear, although this patch is registered in the commitfest
> > and currently applies and tests pass, it is prototype/WIP code with
> > significant problems that remain to be resolved.  I sort of wish there
> > were a way to indicate that in the CF but since there isn't, I'm
> > saying that here.  What I hope to get from Kevin, Simon or other
> > reviewers is some feedback on the general approach and problems
> > discussed upthread (and other problems and better ideas I might have
> > missed).  So it's not seriously proposed for commit in this CF.
>
> No feedback has actually come, so I have moved it to next CF.

Having been nerd-sniped by SSI again, I spent some time this weekend
rebasing this old patch, making a few improvements, and reformulating
the problems to be solved as I see them.  It's very roughly based on
Kevin Grittner and Dan Ports' description of how you could give
SERIALIZABLE a useful meaning on hot standbys.  The short version of
the theory is that you can make it work like SERIALIZABLE READ ONLY
DEFERRABLE by adding a bit of extra information into the WAL stream.
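To make the theory a bit more concrete, here is a single-threaded C model of the standby-side bookkeeping. It loosely follows NotifyHypotheticalSnapshotSafety() and GetSerializableTransactionSnapshotInt() in the patch below, but it is a simplified sketch, not the patch's code. The sim_* names, the fixed-size reader array and the "waiting" flag, which stands in for sleeping in ProcWaitForSignal(), are all hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for SNAPSHOT_SAFE, SNAPSHOT_UNSAFE and
 * SNAPSHOT_SAFETY_UNKNOWN. */
typedef enum
{
    SIM_SAFE,
    SIM_UNSAFE,
    SIM_UNKNOWN
} sim_safety;

#define SIM_MAX_READERS 8

/* Hypothetical standby-side state (zero-initialized = token 0, SIM_SAFE). */
typedef struct
{
    uint64_t    newest_token;   /* token of most recently replayed COMMIT */
    sim_safety  newest_safety;  /* what we currently know about that token */
    bool        waiting[SIM_MAX_READERS];
    uint64_t    wait_token[SIM_MAX_READERS];
    sim_safety  result[SIM_MAX_READERS];
} sim_standby;

/* Replay of a COMMIT record that carries (token, safety). */
static void
sim_replay_commit(sim_standby *s, uint64_t token, sim_safety safety)
{
    s->newest_token = token;
    s->newest_safety = safety;
}

/*
 * A reader begins SERIALIZABLE READ ONLY DEFERRABLE.  Returns true only if
 * its snapshot is already known to be safe.  If the newest safety is unknown,
 * the reader is registered as a waiter on the newest token (in the patch this
 * happens while holding the lock, so no notification can be missed).  If it
 * is known to be unsafe, the caller must retry with a new snapshot.
 */
static bool
sim_begin_reader(sim_standby *s, int reader)
{
    assert(reader >= 0 && reader < SIM_MAX_READERS);
    if (s->newest_safety != SIM_UNKNOWN)
    {
        s->waiting[reader] = false;
        s->result[reader] = s->newest_safety;
        return s->newest_safety == SIM_SAFE;
    }
    s->waiting[reader] = true;
    s->wait_token[reader] = s->newest_token;
    s->result[reader] = SIM_UNKNOWN;
    return false;
}

/* Replay of a standalone snapshot safety record: release matching waiters. */
static void
sim_replay_safety(sim_standby *s, uint64_t token, sim_safety safety)
{
    for (int i = 0; i < SIM_MAX_READERS; i++)
    {
        if (s->waiting[i] && s->wait_token[i] == token)
        {
            s->waiting[i] = false;
            s->result[i] = safety;
        }
    }
    /* Only remember the answer if no newer COMMIT has been replayed since. */
    if (s->newest_token == token)
        s->newest_safety = safety;
}
```

In this model a reader whose snapshot turns out to be SIM_UNSAFE simply takes a fresh snapshot and tries again, which is roughly what the retry loop in GetSafeSnapshot() does.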

Problems:

1.  As a prerequisite, we'd need to teach primary servers to make
transactions visible in the same order that they log commits.
Otherwise, we permit nonsense like seeing TX1 but not TX2 on the
primary, and TX2 but not TX1 on the replica.  You can probably argue
that our read replicas don't satisfy the lower isolation levels, let
alone serializable.

2.  Similarly, it's probably not OK that
PreCommit_CheckForSerializationFailure() determines
MySerializableXact->snapshotSafetyAfterThisCommit, because that
determination may not happen in exactly the same order in which commits
are logged.  Or maybe there is some argument for why that is OK, based
on what we're doing with prepareSeqNo, or maybe we can use prepareSeqNo
to detect disorder.

3.  The patch doesn't yet attempt to checkpoint the snapshot safety
state.  That's needed to start up in a sane state, without having to
wait for WAL activity.

4.  XactLogSnapshotSafetyRecord() flushes the WAL an extra time after
a commit is flushed, which I put in for testing; that's silly...
somehow it needs to be better integrated so we don't generate two sync
I/Os in a row.

5.  You probably want a way to turn off the extra WAL records and
SERIALIZABLEXACT consumption if you're using SERIALIZABLE on a primary
but not on the standby.  Or maybe there is some way to make it come on
automatically.
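A toy C model of problem 1, offered as a sketch: treat every snapshot as a prefix of some commit order. On the primary that order is the visibility order; on the replica it is the WAL order. If the two orders differ, some primary snapshot and some replica snapshot are incomparable (each contains a transaction the other lacks), which is the TX1/TX2 nonsense described in problem 1. Representing transactions as bits in an unsigned int is purely an assumption of the model.

```c
#include <assert.h>
#include <stdbool.h>

/* Set of transactions (one bit each) visible after the first k commits. */
static unsigned
prefix_set(const int *order, int k)
{
    unsigned    set = 0;

    for (int i = 0; i < k; i++)
    {
        assert(order[i] >= 0 && order[i] < 32);
        set |= 1u << order[i];
    }
    return set;
}

static bool
is_subset(unsigned a, unsigned b)
{
    return (a & ~b) == 0;
}

/*
 * Returns true if some snapshot on the primary (a prefix of visibility
 * order) and some snapshot on a replica (a prefix of WAL commit order) are
 * incomparable, i.e. neither is contained in the other.
 */
static bool
has_order_anomaly(const int *visible_order, const int *wal_order, int n)
{
    for (int i = 0; i <= n; i++)
    {
        for (int j = 0; j <= n; j++)
        {
            unsigned    primary = prefix_set(visible_order, i);
            unsigned    replica = prefix_set(wal_order, j);

            if (!is_subset(primary, replica) && !is_subset(replica, primary))
                return true;
        }
    }
    return false;
}
```

Prefixes of one and the same sequence are always nested, so the check can only fire when the two orders disagree; that is the property the prerequisite fix would have to guarantee.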

I think I have cleared up the matter of xmin tracking for
"hypothetical" SERIALIZABLEXACTs mentioned earlier.  It's not needed,
so it should be set to InvalidTransactionId, and I added a comment to
explain why.

I also wrote a TAP test to exercise this thing.  It is the same
schedule as src/test/isolation/specs/read-only-anomaly-3.spec, except
that transaction 3 runs on a streaming replica.
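For anyone wondering why transaction 3 can't simply read as soon as "s1c" has been replayed, here is a brute-force C check over every serial order of a read-only anomaly schedule.  It uses the textbook numbers from Fekete, O'Neil and O'Neil's read-only anomaly rather than reproducing read-only-anomaly-3.spec verbatim, so treat the amounts as assumptions: X and Y start at 0, T1 deposits 20 into Y, T2 withdraws 10 from X and pays a penalty of 1 if X + Y would go negative, and T3 only reads.  Under plain snapshot isolation T3 can see X = 0, Y = 20 while T2 still ends with X = -11, and the check confirms that no serial order explains both.

```c
#include <assert.h>

/* Hypothetical two-account state. */
typedef struct
{
    int         x;
    int         y;
} accounts;

/* T1: deposit 20 into Y. */
static void
t1(accounts *a)
{
    a->y += 20;
}

/* T2: withdraw 10 from X, plus a penalty of 1 if X + Y - 10 < 0. */
static void
t2(accounts *a)
{
    a->x -= (a->x + a->y < 10) ? 11 : 10;
}

/*
 * Run all six serial orders of T1, T2 and a read-only T3 from X = Y = 0, and
 * count the orders that reproduce both what T3 saw and the final state.
 */
static int
count_consistent_orders(accounts t3_saw, accounts final_state)
{
    static const int perms[6][3] = {
        {1, 2, 3}, {1, 3, 2}, {2, 1, 3}, {2, 3, 1}, {3, 1, 2}, {3, 2, 1}
    };
    int         count = 0;

    for (int p = 0; p < 6; p++)
    {
        accounts    a = {0, 0};
        accounts    seen = {0, 0};

        for (int step = 0; step < 3; step++)
        {
            switch (perms[p][step])
            {
                case 1:
                    t1(&a);
                    break;
                case 2:
                    t2(&a);
                    break;
                case 3:
                    seen = a;   /* T3 only reads */
                    break;
            }
        }
        if (seen.x == t3_saw.x && seen.y == t3_saw.y &&
            a.x == final_state.x && a.y == final_state.y)
            count++;
    }
    return count;
}
```

A result of zero for the (X = 0, Y = 20) observation is the anomaly.  On the primary, a SERIALIZABLE READ ONLY DEFERRABLE T3 would wait until it could no longer see that state, and the point of the patch is to give T3 on the replica the same protection.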

One thing to point out is that this patch only aims to make it so that
streaming replicas can't observe a state that would have caused a
transaction to abort if it had been observed on the primary.  The TAP
test still has to insert its own wait-for-LSN loop to make sure step
"s1c" is replayed before "s3r" runs.  We could use
synchronous_commit=remote_apply, and that'd probably work just as well
for this particular test, but I'm not sure how to square that with
fixing problem #1 above.
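The wait-for-LSN loop in the TAP test is written in Perl.  For illustration only, here is the same idea in C: note a target LSN (for example from pg_current_wal_lsn() on the primary), then poll the standby until its replay position reaches it.  The libpq round trip is replaced by a hypothetical callback, and the scripted source at the end is a made-up stand-in so the loop can be exercised without a server.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Parse pg_lsn text such as "16/B374D848" into a 64-bit WAL position. */
static bool
parse_lsn(const char *text, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;

    if (sscanf(text, "%X/%X", &hi, &lo) != 2)
        return false;
    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}

/* Hypothetical stand-in for running "SELECT pg_last_wal_replay_lsn()". */
typedef const char *(*replay_lsn_fn) (void *arg);

/* Poll until the standby has replayed up to target, or give up. */
static bool
wait_for_replay(uint64_t target, replay_lsn_fn poll_replay_lsn, void *arg,
                int max_polls)
{
    for (int i = 0; i < max_polls; i++)
    {
        const char *text = poll_replay_lsn(arg);
        uint64_t    replayed;

        /* NULL models a SQL NULL result: replay position not known. */
        if (text != NULL && parse_lsn(text, &replayed) && replayed >= target)
            return true;
        /* A real test would sleep briefly here before polling again. */
    }
    return false;
}

/* Hypothetical scripted source replaying canned positions, for demos. */
struct scripted
{
    const char **positions;
    int         n;
    int         next;
};

static const char *
scripted_next(void *arg)
{
    struct scripted *s = arg;
    const char *lsn = s->positions[s->next];

    if (s->next < s->n - 1)
        s->next++;
    return lsn;
}
```

With synchronous_commit=remote_apply the loop would be unnecessary, since the commit on the primary would not return until the standby had applied it.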

The Perl hackery I used to do overlapping transactions in a TAP test
is pretty crufty.  I guess we'd ideally have the isolation tester
support per-session connection strings, and somehow get some perl code
to orchestrate the cluster setup but then run the real isolation
tester.  Or something like that.
From 30b25251964b494e75a8ad2d48cd49583f32ebc5 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sun, 14 Jun 2020 21:46:02 +1200
Subject: [PATCH v5 1/3] SERIALIZABLE READ ONLY DEFERRABLE on streaming
 replicas.

Allow streaming replicas to wait for "safe" snapshots.  Do this by
injecting extra information into the WAL, so that replica servers can
determine when a safe snapshot can be taken.

WORK IN PROGRESS -- see thread for list of problems

Discussion: https://postgr.es/m/CAEepm%3D2b9TV%2BvJ4UeSBixDrW7VUiTjxPwWq8K3QwFSWx0pTXHQ%40mail.gmail.com
Discussion: https://postgr.es/m/4d3735e30200002500039...@gw.wicourts.gov
---
 doc/src/sgml/high-availability.sgml       |   9 -
 doc/src/sgml/mvcc.sgml                    |  21 --
 doc/src/sgml/ref/set_transaction.sgml     |  12 +
 src/backend/access/rmgrdesc/xactdesc.c    |  56 ++++
 src/backend/access/transam/xact.c         |  69 ++++-
 src/backend/access/transam/xlog.c         |   7 +
 src/backend/commands/variable.c           |   8 -
 src/backend/storage/lmgr/predicate.c      | 337 ++++++++++++++++++++--
 src/backend/storage/lmgr/proc.c           |   5 +
 src/include/access/xact.h                 |  15 +-
 src/include/access/xlogdefs.h             |  12 +
 src/include/storage/predicate.h           |   7 +-
 src/include/storage/predicate_internals.h |  12 +
 src/include/storage/proc.h                |   5 +
 14 files changed, 504 insertions(+), 71 deletions(-)

diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 65c3fc62a9..ebd50d940d 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -2463,15 +2463,6 @@ LOG:  database system is ready to accept read only connections
      your setting of <varname>max_prepared_transactions</varname> is 0.
     </para>
    </listitem>
-   <listitem>
-    <para>
-     The Serializable transaction isolation level is not yet available in hot
-     standby.  (See <xref linkend="xact-serializable"/> and
-     <xref linkend="serializable-consistency"/> for details.)
-     An attempt to set a transaction to the serializable isolation level in
-     hot standby mode will generate an error.
-    </para>
-   </listitem>
   </itemizedlist>
 
    </para>
diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml
index dda6f1f2ad..744d000498 100644
--- a/doc/src/sgml/mvcc.sgml
+++ b/doc/src/sgml/mvcc.sgml
@@ -1636,15 +1636,6 @@ SELECT pg_advisory_lock(q.id) FROM
     <para>
      See <xref linkend="xact-serializable"/> for performance suggestions.
     </para>
-
-    <warning>
-     <para>
-      This level of integrity protection using Serializable transactions
-      does not yet extend to hot standby mode (<xref linkend="hot-standby"/>).
-      Because of that, those using hot standby may want to use Repeatable
-      Read and explicit locking on the master.
-     </para>
-    </warning>
    </sect2>
 
    <sect2 id="non-serializable-consistency">
@@ -1738,18 +1729,6 @@ SELECT pg_advisory_lock(q.id) FROM
     table and other tables in the database.
    </para>
 
-   <para>
-    Support for the Serializable transaction isolation level has not yet
-    been added to Hot Standby replication targets (described in
-    <xref linkend="hot-standby"/>).  The strictest isolation level currently
-    supported in hot standby mode is Repeatable Read.  While performing all
-    permanent database writes within Serializable transactions on the
-    master will ensure that all standbys will eventually reach a consistent
-    state, a Repeatable Read transaction run on the standby can sometimes
-    see a transient state that is inconsistent with any serial execution
-    of the transactions on the master.
-   </para>
-
    <para>
     Internal access to the system catalogs is not done using the isolation
     level of the current transaction.  This means that newly created database
diff --git a/doc/src/sgml/ref/set_transaction.sgml b/doc/src/sgml/ref/set_transaction.sgml
index ec436b2d16..6eaadd47ac 100644
--- a/doc/src/sgml/ref/set_transaction.sgml
+++ b/doc/src/sgml/ref/set_transaction.sgml
@@ -155,6 +155,18 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
    is well suited for long-running reports or backups.
   </para>
 
+  <para>
+    Serializable transactions cannot use the <literal>NOT DEFERRABLE</literal>
+    option when run on Hot Standby servers.  <literal>READ ONLY</literal>
+    and <literal>DEFERRABLE</literal> are the default settings for Hot Standby
+    servers, but may also be specified explicitly.  The first statement run in
+    a Serializable transaction on a Hot Standby server may need to wait until a
+    point in the transaction stream where it is not possible to see a transient
+    state that is inconsistent with any serial execution of the transactions on
+    the primary server.  This may introduce a pause, but avoids the need to
+    coordinate predicate locking across multiple servers.
+  </para>
+
   <para>
    The <literal>SET TRANSACTION SNAPSHOT</literal> command allows a new
    transaction to run with the same <firstterm>snapshot</firstterm> as an existing
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 9fce75565f..440241fe00 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -123,6 +123,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xl_snapshot_safety =
+			(xl_xact_snapshot_safety *) data;
+
+		parsed->snapshot_token = xl_snapshot_safety->token;
+		parsed->snapshot_safety = xl_snapshot_safety->safety;
+
+		data += sizeof(xl_xact_snapshot_safety);
+	}
 }
 
 void
@@ -279,6 +290,27 @@ xact_desc_subxacts(StringInfo buf, int nsubxacts, TransactionId *subxacts)
 	}
 }
 
+static const char *
+xact_snapshot_safety_to_string(SnapshotSafety snapshot_safety)
+{
+	const char *string = "<unknown>";
+
+	switch(snapshot_safety)
+	{
+	case SNAPSHOT_SAFE:
+		string = "SNAPSHOT_SAFE";
+		break;
+	case SNAPSHOT_UNSAFE:
+		string = "SNAPSHOT_UNSAFE";
+		break;
+	case SNAPSHOT_SAFETY_UNKNOWN:
+		string = "SNAPSHOT_SAFETY_UNKNOWN";
+		break;
+	}
+
+	return string;
+}
+
 static void
 xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
 {
@@ -310,6 +342,13 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
 						 (uint32) parsed.origin_lsn,
 						 timestamptz_to_str(parsed.origin_timestamp));
 	}
+
+	if (parsed.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		appendStringInfo(buf, "; snapshot safety: %s, token: %lx",
+						 xact_snapshot_safety_to_string(parsed.snapshot_safety),
+						 parsed.snapshot_token);
+	}
 }
 
 static void
@@ -359,6 +398,14 @@ xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec)
 		appendStringInfo(buf, " %u", xlrec->xsub[i]);
 }
 
+static void
+xact_desc_snapshot_safety(StringInfo buf, xl_xact_snapshot_safety *xlrec)
+{
+	appendStringInfo(buf, "snapshot safety: %s, token: %lx",
+					 xact_snapshot_safety_to_string(xlrec->safety),
+					 xlrec->token);
+}
+
 void
 xact_desc(StringInfo buf, XLogReaderState *record)
 {
@@ -396,6 +443,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
 		xact_desc_assignment(buf, xlrec);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec = (xl_xact_snapshot_safety *) rec;
+
+		xact_desc_snapshot_safety(buf, xlrec);
+	}
 }
 
 const char *
@@ -423,6 +476,9 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
+		case XLOG_XACT_SNAPSHOT_SAFETY:
+			id = "SNAPSHOT_SAFETY";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index cd30b62d36..0a1ee18fb8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1946,13 +1946,14 @@ StartTransaction(void)
 	{
 		s->startedInRecovery = true;
 		XactReadOnly = true;
+		XactDeferrable = true;
 	}
 	else
 	{
 		s->startedInRecovery = false;
 		XactReadOnly = DefaultXactReadOnly;
+		XactDeferrable = DefaultXactDeferrable;
 	}
-	XactDeferrable = DefaultXactDeferrable;
 	XactIsoLevel = DefaultXactIsoLevel;
 	forceSyncCommit = false;
 	MyXactFlags = 0;
@@ -5480,6 +5481,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_snapshot_safety xl_snapshot_safety;
 	uint8		info;
 
 	Assert(CritSectionCount > 0);
@@ -5558,6 +5560,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	if (IsolationIsSerializable())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_SNAPSHOT_SAFETY;
+		GetSnapshotSafetyAfterThisCommit(&xl_snapshot_safety.token,
+										 &xl_snapshot_safety.safety);
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
@@ -5606,6 +5615,10 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+		XLogRegisterData((char *) (&xl_snapshot_safety),
+						 sizeof(xl_xact_snapshot_safety));
+
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
@@ -5792,6 +5805,10 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		 */
 		RecordKnownAssignedTransactionIds(max_xid);
 
+		if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+			BeginSnapshotSafetyReplay(parsed->snapshot_token,
+									  parsed->snapshot_safety);
+
 		/*
 		 * Mark the transaction committed in pg_xact. We use async commit
 		 * protocol during recovery to provide information on database
@@ -5803,6 +5820,9 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		 */
 		TransactionIdAsyncCommitTree(xid, parsed->nsubxacts, parsed->subxacts, lsn);
 
+		if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+			CompleteSnapshotSafetyReplay();
+
 		/*
 		 * We must mark clog before we update the ProcArray.
 		 */
@@ -5944,6 +5964,46 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
 	DropRelationFiles(parsed->xnodes, parsed->nrels, true);
 }
 
+XLogRecPtr
+XactLogSnapshotSafetyRecord(SnapshotToken token, SnapshotSafety safety)
+{
+	XLogRecPtr result;
+	xl_xact_snapshot_safety snapshot_safety;
+
+	snapshot_safety.token = token;
+	snapshot_safety.safety = safety;
+
+	START_CRIT_SECTION();
+	XLogBeginInsert();
+	XLogRegisterData((char *) &snapshot_safety, sizeof(snapshot_safety));
+	result = XLogInsert(RM_XACT_ID, XLOG_XACT_SNAPSHOT_SAFETY);
+	END_CRIT_SECTION();
+
+	/*
+	 * TODO: How can we avoid having to flush again (after already flushing
+	 * for the commit)?  If we don't have this flush here, then the standby
+	 * has to wait a while to find out whether its snapshot is safe.
+	 */
+	XLogFlush(result);
+
+	return result;
+}
+
+static void
+xact_redo_snapshot_safety(xl_xact_snapshot_safety *snapshot_safety)
+{
+	/*
+	 * An earlier COMMIT record must have carried a snapshot safety message
+	 * with the same token as this record, and had safety ==
+	 * SNAPSHOT_SAFETY_UNKNOWN.  This new independent snapshot safety message
+	 * reports that the safety is now known.  We will wake any backend that is
+	 * waiting to learn if the snapshot is safe.
+	 */
+	if (standbyState >= STANDBY_INITIALIZED)
+		NotifyHypotheticalSnapshotSafety(snapshot_safety->token,
+										 snapshot_safety->safety);
+}
+
 void
 xact_redo(XLogReaderState *record)
 {
@@ -6017,6 +6077,13 @@ xact_redo(XLogReaderState *record)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec =
+			(xl_xact_snapshot_safety *) XLogRecGetData(record);
+
+		xact_redo_snapshot_safety(xlrec);
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 55cac186dc..7ca629945b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5252,6 +5252,8 @@ BootStrapXLOG(void)
 	checkPoint.newestCommitTsXid = InvalidTransactionId;
 	checkPoint.time = (pg_time_t) time(NULL);
 	checkPoint.oldestActiveXid = InvalidTransactionId;
+//	checkPoint.newestSnapshotToken = 0;
+//	checkPoint.newestSnapshotSafety = SNAPSHOT_SAFE;
 
 	ShmemVariableCache->nextFullXid = checkPoint.nextFullXid;
 	ShmemVariableCache->nextOid = checkPoint.nextOid;
@@ -5261,6 +5263,8 @@ BootStrapXLOG(void)
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
+//	SetNewestSnapshotSafety(checkPoint.newestSnapshotToken,
+//							checkPoint.newestSnapshotSafety);
 
 	/* Set up the XLOG page header */
 	page->xlp_magic = XLOG_PAGE_MAGIC;
@@ -6771,6 +6775,9 @@ StartupXLOG(void)
 	SetCommitTsLimit(checkPoint.oldestCommitTsXid,
 					 checkPoint.newestCommitTsXid);
 	XLogCtl->ckptFullXid = checkPoint.nextFullXid;
+// TODO: figure out correct checkpointing protocol for safe snapshots
+//	SetNewestSnapshotSafety(checkPoint.newestSnapshotToken,
+//							checkPoint.newestSnapshotSafety);
 
 	/*
 	 * Initialize replication slots, before there's a chance to remove
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index 484f7ea2c0..7e5dabbab2 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -541,14 +541,6 @@ check_XactIsoLevel(int *newval, void **extra, GucSource source)
 			GUC_check_errmsg("SET TRANSACTION ISOLATION LEVEL must not be called in a subtransaction");
 			return false;
 		}
-		/* Can't go to serializable mode while recovery is still active */
-		if (newXactIsoLevel == XACT_SERIALIZABLE && RecoveryInProgress())
-		{
-			GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
-			GUC_check_errmsg("cannot use serializable mode in a hot standby");
-			GUC_check_errhint("You can use REPEATABLE READ instead.");
-			return false;
-		}
 	}
 
 	return true;
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index d24919f76b..5e7b8fc43b 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -286,6 +286,7 @@
 #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
 #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
 #define SxactIsPartiallyReleased(sxact) (((sxact)->flags & SXACT_FLAG_PARTIALLY_RELEASED) != 0)
+#define SxactIsHypothetical(sxact) (((sxact)->flags & SXACT_FLAG_HYPOTHETICAL) != 0)
 
 /*
  * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
@@ -449,7 +450,8 @@ static void SummarizeOldestCommittedSxact(void);
 static Snapshot GetSafeSnapshot(Snapshot snapshot);
 static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 													  VirtualTransactionId *sourcevxid,
-													  int sourcepid);
+													  int sourcepid,
+													  SnapshotSafety *snapshot_safety);
 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
 									  PREDICATELOCKTARGETTAG *parent);
@@ -1206,6 +1208,9 @@ InitPredicateLocks(void)
 		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
 		PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
 		PredXact->OldCommittedSxact->pid = 0;
+		SHMQueueInit(&PredXact->snapshotSafetyWaitList);
+		PredXact->NewestSnapshotToken = 0;
+		PredXact->NewestSnapshotSafety = SNAPSHOT_SAFE;
 	}
 	/* This never changes, so let's keep a local copy. */
 	OldCommittedSxact = PredXact->OldCommittedSxact;
@@ -1488,6 +1493,7 @@ static Snapshot
 GetSafeSnapshot(Snapshot origSnapshot)
 {
 	Snapshot	snapshot;
+	SnapshotSafety snapshot_safety;
 
 	Assert(XactReadOnly && XactDeferrable);
 
@@ -1500,10 +1506,33 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * one passed to it, but we avoid assuming that here.
 		 */
 		snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
-														 NULL, InvalidPid);
-
-		if (MySerializableXact == InvalidSerializableXact)
-			return snapshot;	/* no concurrent r/w xacts; it's safe */
+														 NULL, InvalidPid,
+														 &snapshot_safety);
+		if (RecoveryInProgress())
+		{
+			/*
+			 * Check if the most recently replayed COMMIT record was either
+			 * known to be safe because it had no concurrent r/w xacts on the
+			 * primary, or has subsequently been declared safe by a snapshot
+			 * safety record.
+			 */
+			if (snapshot_safety == SNAPSHOT_SAFE)
+				return snapshot;
+			else if (snapshot_safety == SNAPSHOT_UNSAFE)
+			{
+				/*
+				 * TODO:  This can only happen if the master ran out of memory
+				 * while trying to create a hypothetical transaction, right?
+				 * Should we wait or error out?
+				 */
+				continue;
+			}
+		}
+		else
+		{
+			if (MySerializableXact == InvalidSerializableXact)
+				return snapshot;	/* no concurrent r/w xacts; it's safe */
+		}
 
 		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
@@ -1511,20 +1540,48 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * Wait for concurrent transactions to finish. Stop early if one of
 		 * them marked us as conflicted.
 		 */
-		MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
-		while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
-				 SxactIsROUnsafe(MySerializableXact)))
+		if (RecoveryInProgress())
 		{
-			LWLockRelease(SerializableXactHashLock);
-			ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
-			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			/*
+			 * Running on a standby.  Wait for the primary to tell us the
+			 * result of testing a hypothetical transaction whose
+			 * serializability matches the snapshot we have.
+			 */
+			Assert(snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN);
+			Assert(!SHMQueueIsDetached(&MyProc->safetyLinks));
+			while (MyProc->snapshotSafety == SNAPSHOT_SAFETY_UNKNOWN)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			if (MyProc->snapshotSafety == SNAPSHOT_SAFE)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
-		MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
-
-		if (!SxactIsROUnsafe(MySerializableXact))
+		else
 		{
-			LWLockRelease(SerializableXactHashLock);
-			break;				/* success */
+			/*
+			 * Running on primary.  Wait for a signal from one of the backends
+			 * that we possibly conflict with.
+			 */
+			MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
+			while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+					 SxactIsROUnsafe(MySerializableXact)))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
+
+			if (!SxactIsROUnsafe(MySerializableXact))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
 
 		LWLockRelease(SerializableXactHashLock);
@@ -1539,12 +1596,64 @@ GetSafeSnapshot(Snapshot origSnapshot)
 	/*
 	 * Now we have a safe snapshot, so we don't need to do any further checks.
 	 */
-	Assert(SxactIsROSafe(MySerializableXact));
+	Assert(RecoveryInProgress() || SxactIsROSafe(MySerializableXact));
 	ReleasePredicateLocks(false, true);
 
 	return snapshot;
 }
 
+/*
+ * When the primary server has determined the safety of a hypothetical
+ * snapshot which was previously reported as SNAPSHOT_SAFETY_UNKNOWN in a
+ * COMMIT record, it emits a WAL record that causes the recovery process on
+ * standbys to call this function.  Here, we will wake up any backend that is
+ * currently waiting in GetSafeSnapshot to learn about the safety of a
+ * snapshot taken after that point in the transaction stream.
+ */
+void
+NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety)
+{
+	PGPROC	   *proc;
+	PGPROC	   *next;
+
+	Assert(AmStartupProcess());
+	elog(LOG, "NotifyHypotheticalSnapshotSafety token = %ld, safety = %d", token, safety);
+
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+	/*
+	 * Walk the list of processes that are waiting in GetSafeSnapshot on a
+	 * standby, and find any that are waiting to learn the safety of a
+	 * snapshot taken at a point in time when this token appeared on the most
+	 * recently replayed SSI transaction.  If we find any of those, tell them
+	 * the final status and wake them up.
+	 */
+	proc = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+								   &PredXact->snapshotSafetyWaitList,
+								   offsetof(PGPROC, safetyLinks));
+	while (proc != NULL)
+	{
+		next = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+									   &proc->safetyLinks,
+									   offsetof(PGPROC, safetyLinks));
+		if (proc->waitSnapshotToken == token)
+		{
+			SHMQueueDelete(&proc->safetyLinks);
+			proc->snapshotSafety = safety;
+			ProcSendSignal(proc->pid);
+		}
+		proc = next;
+	}
+
+	/*
+	 * If this happens to be the most recently replayed snapshot token then
+	 * remember this safety value.
+	 */
+	if (PredXact->NewestSnapshotToken == token)
+		PredXact->NewestSnapshotSafety = safety;
+	LWLockRelease(SerializableXactHashLock);
+}
+
 /*
  * GetSafeSnapshotBlockingPids
  *		If the specified process is currently blocked in GetSafeSnapshot,
@@ -1613,16 +1722,17 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 
 	/*
 	 * Can't use serializable mode while recovery is still active, as it is,
-	 * for example, on a hot standby.  We could get here despite the check in
-	 * check_XactIsoLevel() if default_transaction_isolation is set to
-	 * serializable, so phrase the hint accordingly.
+	 * for example, on a hot standby, unless DEFERRABLE mode is active.  In
+	 * that case, DEFERRABLE is the default, so this error should only
+	 * be reachable if the user has explicitly asked for NOT DEFERRABLE via
+	 * SET transaction_deferrable or SET/BEGIN TRANSACTION ISOLATION LEVEL.
 	 */
-	if (RecoveryInProgress())
+	if (RecoveryInProgress() && (!XactReadOnly || !XactDeferrable))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("cannot use serializable mode in a hot standby"),
-				 errdetail("\"default_transaction_isolation\" is set to \"serializable\"."),
-				 errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default.")));
+				 errmsg("cannot use serializable not deferrable mode in a hot standby"),
+				 errdetail("Serializable transactions must be DEFERRABLE when run on hot standby servers."),
+				 errhint("You can use \"SET transaction_deferrable = true\", use DEFERRABLE when specifying the transaction isolation level, or avoid explicitly specifying NOT DEFERRABLE.")));
 
 	/*
 	 * A special optimization is available for SERIALIZABLE READ ONLY
@@ -1633,7 +1743,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 		return GetSafeSnapshot(snapshot);
 
 	return GetSerializableTransactionSnapshotInt(snapshot,
-												 NULL, InvalidPid);
+												 NULL, InvalidPid, NULL);
 }
 
 /*
@@ -1676,7 +1786,7 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
 				 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
 
 	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid,
-												 sourcepid);
+												 sourcepid, NULL);
 }
 
 /*
@@ -1687,11 +1797,17 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
  * loaded up.  HOWEVER: to avoid race conditions, we must check that the
  * source xact is still running after we acquire SerializableXactHashLock.
  * We do that by calling ProcArrayInstallImportedXmin.
+ *
+ * If snapshot_safety is non-NULL, then the safety of this snapshot, if used
+ * on a standby server, is written to it.  If it is SNAPSHOT_SAFE, then the
+ * snapshot may be safely used.  If it is SNAPSHOT_SAFETY_UNKNOWN, then the
+ * caller must wait for the safety to be announced in the WAL.
  */
 static Snapshot
 GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 									  VirtualTransactionId *sourcevxid,
-									  int sourcepid)
+									  int sourcepid,
+									  SnapshotSafety *snapshot_safety)
 {
 	PGPROC	   *proc;
 	VirtualTransactionId vxid;
@@ -1701,8 +1817,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	/* We only do this for serializable transactions.  Once. */
 	Assert(MySerializableXact == InvalidSerializableXact);
 
-	Assert(!RecoveryInProgress());
-
 	/*
 	 * Since all parts of a serializable transaction must use the same
 	 * snapshot, it is too late to establish one after a parallel operation
@@ -1743,6 +1857,30 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 		}
 	} while (!sxact);
 
+	/*
+	 * Note the snapshot safety information for standbys.  This can be used to
+	 * know if the returned snapshot is already known to be safe/unsafe, or if
+	 * we must wait for notification of the final safety determination.
+	 */
+	if (snapshot_safety != NULL)
+	{
+		*snapshot_safety = PredXact->NewestSnapshotSafety;
+		if (*snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN)
+		{
+			/*
+			 * We must put this process into the waitlist while we hold
+			 * the lock or there would be a race condition where we might miss
+			 * a notification.  The caller must wait for
+			 * MyProc->snapshotSafety to be set to a final value.
+			 */
+			MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+			MyProc->waitSnapshotToken = PredXact->NewestSnapshotToken;
+			if (SHMQueueIsDetached(&MyProc->safetyLinks))
+				SHMQueueInsertBefore(&PredXact->snapshotSafetyWaitList,
+									 &MyProc->safetyLinks);
+		}
+	}
+
 	/* Get the snapshot, or check that it's safe to use */
 	if (!sourcevxid)
 		snapshot = GetSnapshotData(snapshot);
@@ -3214,6 +3352,7 @@ SetNewSxactGlobalXmin(void)
 	for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
 	{
 		if (!SxactIsRolledBack(sxact)
+			&& !SxactIsHypothetical(sxact)
 			&& !SxactIsCommitted(sxact)
 			&& sxact != OldCommittedSxact)
 		{
@@ -3596,12 +3735,35 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
 
 			/*
 			 * Wake up the process for a waiting DEFERRABLE transaction if we
-			 * now know it's either safe or conflicted.
+			 * now know it's either safe or conflicted.  This releases
+			 * SERIALIZABLE READ ONLY DEFERRABLE transactions on the primary.
 			 */
 			if (SxactIsDeferrableWaiting(roXact) &&
 				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
 				ProcSendSignal(roXact->pid);
 
+			/*
+			 * If a hypothetical transaction is now known to be safe or
+			 * unsafe, we can report that in the WAL for the benefit of
+			 * standbys and recycle it.  This releases SERIALIZABLE READ ONLY
+			 * DEFERRABLE transactions that are waiting for the status of this
+			 * particular hypothetical transaction on any standby that
+			 * replays it.
+			 */
+			if (SxactIsHypothetical(roXact) &&
+				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
+			{
+				SnapshotSafety safety;
+
+				if (SxactIsROSafe(roXact))
+					safety = SNAPSHOT_SAFE;
+				else
+					safety = SNAPSHOT_UNSAFE;
+				XactLogSnapshotSafetyRecord(roXact->SeqNo.lastCommitBeforeSnapshot,
+											safety);
+				ReleasePredXact(roXact);
+			}
+
 			possibleUnsafeConflict = nextConflict;
 		}
 	}
@@ -4838,6 +5000,88 @@ PreCommit_CheckForSerializationFailure(void)
 	MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
 	MySerializableXact->flags |= SXACT_FLAG_PREPARED;
 
+	/*
+	 * For the benefit of hot standby servers that want to take a safe
+	 * SERIALIZABLE READ ONLY DEFERRABLE snapshot, we will check whether a
+	 * hypothetical read-only serializable transaction that starts after this
+	 * transaction commits would be safe.
+	 */
+	if (PredXact->WritableSxactCount == (XactReadOnly ? 0 : 1))
+	{
+		/*
+		 * There are no concurrent writable SERIALIZABLE transactions.  A
+		 * read-only snapshot taken immediately after this one commits is
+		 * safe.
+		 */
+		MySerializableXact->snapshotSafetyAfterThisCommit = SNAPSHOT_SAFE;
+	}
+	else
+	{
+		SERIALIZABLEXACT *sxact;
+		SERIALIZABLEXACT *othersxact;
+
+		/*
+		 * We can't yet determine whether a read-only transaction beginning
+		 * now would be safe.  Create a hypothetical SERIALIZABLEXACT and let
+		 * ReleasePredicateLocks report on its safety once that can be
+		 * determined.
+		 */
+		sxact = CreatePredXact();
+		if (sxact == NULL)
+		{
+			/* Out of space.  Don't allow SERIALIZABLE on standbys. */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_UNSAFE;
+		}
+		else
+		{
+			SetInvalidVirtualTransactionId(sxact->vxid);
+			sxact->SeqNo.lastCommitBeforeSnapshot =
+				MySerializableXact->prepareSeqNo;
+			sxact->prepareSeqNo = InvalidSerCommitSeqNo;
+			sxact->commitSeqNo = InvalidSerCommitSeqNo;
+			SHMQueueInit(&(sxact->outConflicts));
+			SHMQueueInit(&(sxact->inConflicts));
+			SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+			sxact->topXid = InvalidTransactionId;
+			sxact->finishedBefore = InvalidTransactionId;
+			sxact->pid = InvalidPid;
+			SHMQueueInit(&(sxact->predicateLocks));
+			SHMQueueElemInit(&(sxact->finishedLink));
+			sxact->flags = SXACT_FLAG_READ_ONLY | SXACT_FLAG_HYPOTHETICAL;
+
+			/*
+			 * Hypothetical sxacts are not garbage-collected by
+			 * ClearOldPredicateLocks() and are not tracked by
+			 * SetNewSxactGlobalXmin().  Instead they are freed by
+			 * ReleasePredicateLocks() as soon as they are determined to be
+			 * safe or unsafe.  Therefore, they don't need an xmin.
+			 */
+			sxact->xmin = InvalidTransactionId;
+
+			/* Register concurrent r/w transactions as possible conflicts. */
+			for (othersxact = FirstPredXact();
+				 othersxact != NULL;
+				 othersxact = NextPredXact(othersxact))
+			{
+				if (othersxact != MySerializableXact
+					&& !SxactIsCommitted(othersxact)
+					&& !SxactIsDoomed(othersxact)
+					&& !SxactIsReadOnly(othersxact))
+				{
+					SetPossibleUnsafeConflict(sxact, othersxact);
+				}
+			}
+
+			/*
+			 * The status will be reported in a later WAL record once it has
+			 * been determined.
+			 */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_SAFETY_UNKNOWN;
+		}
+	}
+
 	LWLockRelease(SerializableXactHashLock);
 }
 
@@ -5132,3 +5376,36 @@ AttachSerializableXact(SerializableXactHandle handle)
 	if (MySerializableXact != InvalidSerializableXact)
 		CreateLocalPredicateLockHash();
 }
+
+/*
+ * Accessor for the hypothetical snapshot safety information needed for commit
+ * records generated on primary servers.  This is used by XactLogCommitRecord
+ * to receive the safety level computed by
+ * PreCommit_CheckForSerializationFailure in a committing SSI transaction.
+ */
+void
+GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety)
+{
+	*token = MySerializableXact->prepareSeqNo;
+	*safety = MySerializableXact->snapshotSafetyAfterThisCommit;
+}
+
+/*
+ * Used in recovery when replaying commit records.  On a hot standby, these
+ * values must be set atomically with ProcArray updates, mirroring the code in
+ * GetSerializableTransactionSnapshotInt.  CompleteSnapshotSafetyReplay() must
+ * be called after the transaction is marked committed.
+ */
+void
+BeginSnapshotSafetyReplay(SnapshotToken token, SnapshotSafety safety)
+{
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+	PredXact->NewestSnapshotToken = token;
+	PredXact->NewestSnapshotSafety = safety;
+}
+
+void
+CompleteSnapshotSafetyReplay(void)
+{
+	LWLockRelease(SerializableXactHashLock);
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index f5eef6fa4e..78263eb986 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -421,6 +421,11 @@ InitProcess(void)
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	SHMQueueElemInit(&(MyProc->syncRepLinks));
 
+	/* Initialize fields for SERIALIZABLE on standbys */
+	MyProc->waitSnapshotToken = 0;
+	MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+	SHMQueueElemInit(&(MyProc->safetyLinks));
+
 	/* Initialize fields for group XID clearing. */
 	MyProc->procArrayGroupMember = false;
 	MyProc->procArrayGroupMemberXid = InvalidTransactionId;
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 88025b1cc2..52007675ba 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -146,7 +146,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_SNAPSHOT_SAFETY   0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -167,6 +167,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
 #define XACT_XINFO_HAS_GID				(1U << 7)
+#define XACT_XINFO_HAS_SNAPSHOT_SAFETY	(1U << 8)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -262,6 +263,12 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_snapshot_safety
+{
+	SnapshotToken token;
+	SnapshotSafety safety;
+} xl_xact_snapshot_safety;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -274,6 +281,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_snapshot_safety follows if XINFO_HAS_SNAPSHOT_SAFETY */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -339,6 +347,9 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	SnapshotToken snapshot_token;
+	SnapshotSafety snapshot_safety;
 } xl_xact_parsed_commit;
 
 typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
@@ -444,6 +455,8 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 									 int nrels, RelFileNode *rels,
 									 int xactflags, TransactionId twophase_xid,
 									 const char *twophase_gid);
+extern XLogRecPtr XactLogSnapshotSafetyRecord(SnapshotToken token,
+											  SnapshotSafety safety);
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index e1f5812213..a425160f54 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -57,6 +57,18 @@ typedef uint32 TimeLineID;
  */
 typedef uint16 RepOriginId;
 
+/*
+ * Snapshot safety information used to control SERIALIZABLE on standby
+ * servers appears in checkpoints, so we define the types here.
+ */
+typedef uint64 SnapshotToken;
+typedef enum SnapshotSafety
+{
+	SNAPSHOT_SAFE,
+	SNAPSHOT_UNSAFE,
+	SNAPSHOT_SAFETY_UNKNOWN
+} SnapshotSafety;
+
 /*
  *	Because O_DIRECT bypasses the kernel buffers, and because we never
  *	read those buffers except during crash recovery or if wal_level != minimal,
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 86e756d5fb..7015aab167 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -18,7 +18,6 @@
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
 
-
 /*
  * GUC variables
  */
@@ -84,4 +83,10 @@ extern void predicatelock_twophase_recover(TransactionId xid, uint16 info,
 extern SerializableXactHandle ShareSerializableXact(void);
 extern void AttachSerializableXact(SerializableXactHandle handle);
 
+/* hypothetical snapshot safety support, allowing SERIALIZABLE on standbys */
+extern void GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety);
+extern void NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety);
+extern void BeginSnapshotSafetyReplay(SnapshotToken token, SnapshotSafety safety);
+extern void CompleteSnapshotSafetyReplay(void);
+
 #endif							/* PREDICATE_H */
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index cf9694d65e..055c8d0ea8 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -105,6 +105,12 @@ typedef struct SERIALIZABLEXACT
 	 */
 	SHM_QUEUE	possibleUnsafeConflicts;
 
+	/*
+	 * for committing transactions: would a hypothetical read-only snapshot
+	 * taken immediately after this transaction commits be safe?
+	 */
+	SnapshotSafety snapshotSafetyAfterThisCommit;
+
 	TransactionId topXid;		/* top level xid for the transaction, if one
 								 * exists; else invalid */
 	TransactionId finishedBefore;	/* invalid means still running; else the
@@ -137,6 +143,7 @@ typedef struct SERIALIZABLEXACT
  * reference to it.  It'll be recycled by the leader at end-of-transaction.
  */
 #define SXACT_FLAG_PARTIALLY_RELEASED	0x00000800
+#define SXACT_FLAG_HYPOTHETICAL			0x00001000
 
 /*
  * The following types are used to provide an ad hoc list for holding
@@ -186,6 +193,11 @@ typedef struct PredXactListData
 												 * seq no */
 	SERIALIZABLEXACT *OldCommittedSxact;	/* shared copy of dummy sxact */
 
+	/* Tracking of snapshot safety on standby servers. */
+	SHM_QUEUE	snapshotSafetyWaitList;
+	SnapshotToken NewestSnapshotToken;
+	SnapshotSafety NewestSnapshotSafety;
+
 	PredXactListElement element;
 }			PredXactListData;
 
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 1ee9000b2b..eeb36e0657 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -154,6 +154,11 @@ struct PGPROC
 	int			syncRepState;	/* wait state for sync rep */
 	SHM_QUEUE	syncRepLinks;	/* list link if process is in syncrep queue */
 
+	/* Info to allow standbys to wait for a safe SERIALIZABLE snapshot */
+	SnapshotToken waitSnapshotToken;
+	SnapshotSafety snapshotSafety;	/* space for result */
+	SHM_QUEUE	safetyLinks;	/* list link for GetSafeSnapshot */
+
 	/*
 	 * All PROCLOCK objects for locks held or awaited by this backend are
 	 * linked into one of these lists, according to the partition number of
-- 
2.20.1

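For readers following patch 1/3 without the rest of predicate.c at hand, the decision made in PreCommit_CheckForSerializationFailure() and its later resolution in ReleasePredicateLocks() can be modelled roughly as below. This is a simplified, single-threaded sketch, not the patch's code: the HypoXact struct and the safety_after_commit()/writer_finished() helpers are hypothetical, the possibleUnsafeConflicts list is reduced to a counter, and the "unsafe" test assumes SSI's usual read-only rule (a concurrent writer commits with a rw-conflict out to a transaction that committed at or before the snapshot). When writer_finished() returns true is where the patch would call XactLogSnapshotSafetyRecord().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum SnapshotSafety
{
	SNAPSHOT_SAFE,
	SNAPSHOT_UNSAFE,
	SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

/*
 * Hypothetical stand-in for a SERIALIZABLEXACT flagged
 * SXACT_FLAG_HYPOTHETICAL: a read-only snapshot taken right after a commit.
 */
typedef struct HypoXact
{
	uint64_t	lastCommitBeforeSnapshot;	/* the committer's sequence number */
	int			npending;		/* concurrent r/w transactions still running */
	SnapshotSafety safety;
} HypoXact;

/*
 * Called as a r/w SERIALIZABLE transaction commits.  otherWritable counts
 * the *other* writable SERIALIZABLE transactions still in progress.  Returns
 * the value that would be carried in the commit record.
 */
static SnapshotSafety
safety_after_commit(int otherWritable, uint64_t commitSeqNo, HypoXact *hypo)
{
	if (otherWritable == 0)
		return SNAPSHOT_SAFE;	/* no concurrent writer: safe immediately */

	hypo->lastCommitBeforeSnapshot = commitSeqNo;
	hypo->npending = otherWritable;
	hypo->safety = SNAPSHOT_SAFETY_UNKNOWN;
	return SNAPSHOT_SAFETY_UNKNOWN;	/* resolved by a later WAL record */
}

/*
 * Called when one of those concurrent writers commits or aborts.  Returns
 * true when the hypothetical snapshot's safety has just been decided, i.e.
 * when a separate snapshot-safety WAL record would be emitted.
 */
static bool
writer_finished(HypoXact *hypo, bool committed, bool hasConflictOut,
				uint64_t earliestOutConflictCommit)
{
	if (hypo->safety != SNAPSHOT_SAFETY_UNKNOWN)
		return false;			/* already decided and reported */

	if (committed && hasConflictOut &&
		earliestOutConflictCommit <= hypo->lastCommitBeforeSnapshot)
		hypo->safety = SNAPSHOT_UNSAFE;
	else
	{
		assert(hypo->npending > 0);
		if (--hypo->npending == 0)
			hypo->safety = SNAPSHOT_SAFE;	/* every writer finished harmlessly */
	}
	return hypo->safety != SNAPSHOT_SAFETY_UNKNOWN;
}
```

The point of the counter is that the hypothetical snapshot can only become safe once every writer that was concurrent with it has finished, whereas a single bad commit is enough to make it unsafe.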
From 9a2d668d15526daa6f47101c2eafcf4dbb36aabb Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sun, 14 Jun 2020 22:30:59 +1200
Subject: [PATCH v5 2/3] Add perl module for isolation-like testing.

To allow TAP tests to send interleaved statements to one or more
backends, like the isolation tester, define a PsqlSession class.
---
 src/test/perl/PsqlSession.pm | 139 +++++++++++++++++++++++++++++++++++
 1 file changed, 139 insertions(+)
 create mode 100644 src/test/perl/PsqlSession.pm

diff --git a/src/test/perl/PsqlSession.pm b/src/test/perl/PsqlSession.pm
new file mode 100644
index 0000000000..20ff923756
--- /dev/null
+++ b/src/test/perl/PsqlSession.pm
@@ -0,0 +1,139 @@
+=pod
+
+=head1 NAME
+
+PsqlSession - class representing psql connection
+
+=head1 SYNOPSIS
+
+  use PsqlSession;
+
+  my $node = PostgresNode->get_new_node('mynode');
+  my $session = PsqlSession->new($node, "dbname");
+
+  # send simple query and wait for one line response
+  my $result = $session->send("SELECT 42;", 1);
+
+  # close connection
+  $session->close();
+
+=head1 DESCRIPTION
+
+PsqlSession allows for tests of interleaved operations, similar to
+isolation tests.
+
+=cut
+
+package PsqlSession;
+
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use IPC::Run qw(pump finish timer);
+
+our @EXPORT = qw(
+  new
+  send
+  close
+);
+
+=pod
+
+=head1 METHODS
+
+=over
+
+=item PsqlSession::new($class, $node, $dbname)
+
+Create a new PsqlSession instance, connected to a database.
+
+=cut
+
+sub new
+{
+	my ($class, $node, $dbname) = @_;
+	my $timer = timer(5);
+	my $stdin = '';
+	my $stdout = '';
+	my $harness = $node->interactive_psql($dbname, \$stdin, \$stdout, $timer);
+	my $self = {
+		_harness => $harness,
+		_stdin => \$stdin,
+		_stdout => \$stdout,
+		_timer => $timer
+	};
+	bless $self, $class;
+	return $self;
+}
+
+=pod
+
+=item $session->send($input, $lines)
+
+Send the given input to psql, and then wait for the given number of lines
+of output, or a timeout.
+
+=cut
+
+sub count_lines
+{
+	my ($s) = @_;
+	return $s =~ tr/\n//;
+}
+
+sub send
+{
+	my ($self, $statement, $lines) = @_;
+	${$self->{_stdout}} = '';
+	${$self->{_stdin}} .= $statement;
+	$self->{_timer}->start(5);
+	pump $self->{_harness} until count_lines(${$self->{_stdout}}) == $lines || $self->{_timer}->is_expired;
+	die "expected ${lines} lines, but after timeout received only: ${$self->{_stdout}}" if $self->{_timer}->is_expired;
+	my @result = split /\n/, ${$self->{_stdout}};
+	chop(@result);
+	return @result;
+}
+
+=pod
+
+=item $session->check_is_blocked()
+
+Wait for a timeout to expire, and complain if any output is received before that.
+
+=cut
+
+sub check_is_blocked
+{
+	my ($self) = @_;
+	${$self->{_stdout}} = '';
+	$self->{_timer}->start(5);
+	pump $self->{_harness} until (${$self->{_stdout}} ne '') || $self->{_timer}->is_expired;
+	die "expected to be blocked, but received: ${$self->{_stdout}}" if !$self->{_timer}->is_expired;
+}
+
+=pod
+
+=item $session->close()
+
+Close a PsqlSession connection.
+
+=cut
+
+sub close
+{
+	my ($self) = @_;
+	$self->{_timer}->start(5);
+	${$self->{_stdin}} .= "\\q\n";
+	finish $self->{_harness} or die "psql returned $?";
+	$self->{_timer}->reset;
+}
+
+=pod
+
+=back
+
+=cut
+
+1;
-- 
2.20.1

From b203630ec3e8e57d2dece0c76f1578377ecbc897 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sun, 14 Jun 2020 23:26:58 +1200
Subject: [PATCH v5 3/3] Add test for SERIALIZABLE on streaming replicas.

This is a (slightly) distributed version of the isolation test in
src/test/isolation/specs/read-only-anomaly-3.spec.
---
 src/test/recovery/t/021_serializable.pl | 107 ++++++++++++++++++++++++
 1 file changed, 107 insertions(+)
 create mode 100644 src/test/recovery/t/021_serializable.pl

diff --git a/src/test/recovery/t/021_serializable.pl b/src/test/recovery/t/021_serializable.pl
new file mode 100644
index 0000000000..6ab20b8b42
--- /dev/null
+++ b/src/test/recovery/t/021_serializable.pl
@@ -0,0 +1,107 @@
+# Like src/test/isolation/specs/read-only-anomaly-3.spec, except that
+# s3 runs on a streaming replica server.
+
+use strict;
+use warnings;
+
+use PostgresNode;
+use PsqlSession;
+use TestLib;
+use Test::More tests => 13;
+
+my $node_primary = get_new_node('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE bank_account (id TEXT PRIMARY KEY, balance DECIMAL NOT NULL)");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO bank_account (id, balance) VALUES ('X', 0), ('Y', 0)");
+
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_replica->start;
+
+# We need three sessions.  s1 and s2 are on the primary, s3 is on the replica.
+my $s1 = PsqlSession->new($node_primary, "postgres");
+$s1->send("\\set PROMPT1 ''\n", 2);
+$s1->send("\\set PROMPT2 ''\n", 1);
+my $s2 = PsqlSession->new($node_primary, "postgres");
+$s2->send("\\set PROMPT1 ''\n", 2);
+$s2->send("\\set PROMPT2 ''\n", 1);
+my $s3 = PsqlSession->new($node_replica, "postgres");
+$s3->send("\\set PROMPT1 ''\n", 2);
+$s3->send("\\set PROMPT2 ''\n", 1);
+
+my @lines;
+my $result;
+@lines = $s1->send("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;\n", 2);
+shift @lines;
+is(shift @lines, "BEGIN");
+@lines = $s2->send("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;\n", 2);
+shift @lines;
+is(shift @lines, "BEGIN");
+@lines = $s3->send("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;\n", 2);
+shift @lines;
+is(shift @lines, "BEGIN");
+
+# s2rx
+@lines = $s2->send("SELECT balance FROM bank_account WHERE id = 'X';\n", 2);
+shift @lines;
+is(shift @lines, "0");
+
+# s2ry
+@lines = $s2->send("SELECT balance FROM bank_account WHERE id = 'Y';\n", 2);
+shift @lines;
+is(shift @lines, "0");
+
+# s1ry
+@lines = $s1->send("SELECT balance FROM bank_account WHERE id = 'Y';\n", 2);
+shift @lines;
+is(shift @lines, "0");
+
+# s1wy
+@lines = $s1->send("UPDATE bank_account SET balance = 20 WHERE id = 'Y';\n", 2);
+shift @lines;
+is(shift @lines, "UPDATE 1");
+
+# s1c
+@lines = $s1->send("COMMIT;\n", 2);
+shift @lines;
+is(shift @lines, "COMMIT");
+
+# now, we want to wait until the replica has replayed s1c
+my $until_lsn =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_replica->poll_query_until('postgres',
+    "SELECT (pg_last_wal_replay_lsn() - '$until_lsn'::pg_lsn) >= 0")
+  or die "standby never caught up";
+
+# s3r begins...
+@lines = $s3->send("SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id;\n", 1);
+$s3->check_is_blocked();
+
+# s2wx
+@lines = $s2->send("UPDATE bank_account SET balance = -11 WHERE id = 'X';\n", 2);
+shift @lines;
+is(shift @lines, "UPDATE 1");
+
+# s2c
+@lines = $s2->send("COMMIT;\n", 2);
+shift @lines;
+is(shift @lines, "COMMIT");
+
+# ... s3r completes
+@lines = $s3->send("", 2);
+is(shift @lines, "X|-11");
+is(shift @lines, "Y|20");
+
+# s3c
+@lines = $s3->send("COMMIT;\n", 2);
+shift @lines;
+is(shift @lines, "COMMIT");
+
-- 
2.20.1

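On the standby side, the xl_xact_snapshot_safety payload of commit records and the separate XLOG_XACT_SNAPSHOT_SAFETY record combine roughly as follows. Again this is a hedged, single-threaded sketch rather than the patch's code: StandbyState, Waiter and the three functions are hypothetical stand-ins for PredXact->NewestSnapshotToken/NewestSnapshotSafety, the PGPROC waitSnapshotToken/snapshotSafety fields and snapshotSafetyWaitList. Locking (BeginSnapshotSafetyReplay() holding SerializableXactHashLock across the ProcArray update) and the real sleep/wakeup are left out, and updating the newest safety when its own token is resolved is an assumption of this sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t SnapshotToken;

typedef enum SnapshotSafety
{
	SNAPSHOT_SAFE,
	SNAPSHOT_UNSAFE,
	SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

#define MAX_WAITERS 16

typedef struct Waiter			/* hypothetical per-backend state */
{
	SnapshotToken waitSnapshotToken;
	SnapshotSafety snapshotSafety;	/* space for the result */
	bool		waiting;
} Waiter;

typedef struct StandbyState		/* hypothetical shared state */
{
	SnapshotToken newestToken;
	SnapshotSafety newestSafety;
	Waiter	   *waiters[MAX_WAITERS];
	int			nwaiters;
} StandbyState;

/* Redo of a commit record that carries a token and a safety value. */
static void
replay_commit(StandbyState *st, SnapshotToken token, SnapshotSafety safety)
{
	st->newestToken = token;
	st->newestSafety = safety;
}

/* A SERIALIZABLE transaction starts; if safety is unknown, it must wait. */
static SnapshotSafety
begin_serializable(StandbyState *st, Waiter *w)
{
	w->waitSnapshotToken = st->newestToken;
	w->snapshotSafety = st->newestSafety;
	w->waiting = false;
	if (w->snapshotSafety == SNAPSHOT_SAFETY_UNKNOWN)
	{
		assert(st->nwaiters < MAX_WAITERS);
		w->waiting = true;
		st->waiters[st->nwaiters++] = w;
	}
	return w->snapshotSafety;
}

/* Redo of a snapshot-safety record: wake every waiter for this token. */
static int
replay_snapshot_safety(StandbyState *st, SnapshotToken token,
					   SnapshotSafety safety)
{
	int			woken = 0;
	int			kept = 0;

	if (st->newestToken == token)
		st->newestSafety = safety;	/* assumption: later starters see it */

	for (int i = 0; i < st->nwaiters; i++)
	{
		Waiter	   *w = st->waiters[i];

		if (w->waitSnapshotToken == token)
		{
			w->snapshotSafety = safety;
			w->waiting = false;
			woken++;
		}
		else
			st->waiters[kept++] = w;	/* still waiting for another token */
	}
	st->nwaiters = kept;
	return woken;
}
```

A backend that ends up with SNAPSHOT_UNSAFE would have to wait for a later commit record and try again with a new snapshot, much as SERIALIZABLE READ ONLY DEFERRABLE retries on the primary.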