From 19854e30d9122ffd4a948e61d3fd9a5add208d10 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Fri, 21 Sep 2018 01:49:22 +1200
Subject: [PATCH] SERIALIZABLE READ ONLY DEFERRABLE for hot standbys.

Work in progress!

Author: Thomas Munro, based on an idea from Kevin Grittner
Discussion: https://postgr.es/m/CAEepm%3D2b9TV%2BvJ4UeSBixDrW7VUiTjxPwWq8K3QwFSWx0pTXHQ%40mail.gmail.com
---
 doc/src/sgml/high-availability.sgml       |   9 -
 doc/src/sgml/mvcc.sgml                    |  21 --
 doc/src/sgml/ref/set_transaction.sgml     |  36 +++
 src/backend/access/rmgrdesc/xactdesc.c    |  56 ++++
 src/backend/access/transam/xact.c         |  73 ++++-
 src/backend/access/transam/xlog.c         |   7 +
 src/backend/commands/variable.c           |   8 -
 src/backend/storage/lmgr/predicate.c      | 337 ++++++++++++++++++++--
 src/backend/storage/lmgr/proc.c           |   5 +
 src/include/access/xact.h                 |  17 +-
 src/include/access/xlogdefs.h             |  12 +
 src/include/storage/predicate.h           |   8 +-
 src/include/storage/predicate_internals.h |  12 +
 src/include/storage/proc.h                |   5 +
 14 files changed, 535 insertions(+), 71 deletions(-)

diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 8cb77f85ec0..1a26c26a71f 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -2449,15 +2449,6 @@ LOG:  database system is ready to accept read only connections
      your setting of <varname>max_prepared_transactions</varname> is 0.
     </para>
    </listitem>
-   <listitem>
-    <para>
-     The Serializable transaction isolation level is not yet available in hot
-     standby.  (See <xref linkend="xact-serializable"/> and
-     <xref linkend="serializable-consistency"/> for details.)
-     An attempt to set a transaction to the serializable isolation level in
-     hot standby mode will generate an error.
-    </para>
-   </listitem>
   </itemizedlist>
 
    </para>
diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml
index 73934e5cf37..49aa7d51c4a 100644
--- a/doc/src/sgml/mvcc.sgml
+++ b/doc/src/sgml/mvcc.sgml
@@ -1599,15 +1599,6 @@ SELECT pg_advisory_lock(q.id) FROM
     <para>
      See <xref linkend="xact-serializable"/> for performance suggestions.
     </para>
-
-    <warning>
-     <para>
-      This level of integrity protection using Serializable transactions
-      does not yet extend to hot standby mode (<xref linkend="hot-standby"/>).
-      Because of that, those using hot standby may want to use Repeatable
-      Read and explicit locking on the master.
-     </para>
-    </warning>
    </sect2>
 
    <sect2 id="non-serializable-consistency">
@@ -1700,18 +1691,6 @@ SELECT pg_advisory_lock(q.id) FROM
     could cause visible inconsistency between the contents of the target
     table and other tables in the database.
    </para>
-
-   <para>
-    Support for the Serializable transaction isolation level has not yet
-    been added to Hot Standby replication targets (described in
-    <xref linkend="hot-standby"/>).  The strictest isolation level currently
-    supported in hot standby mode is Repeatable Read.  While performing all
-    permanent database writes within Serializable transactions on the
-    master will ensure that all standbys will eventually reach a consistent
-    state, a Repeatable Read transaction run on the standby can sometimes
-    see a transient state that is inconsistent with any serial execution
-    of the transactions on the master.
-   </para>
   </sect1>
 
   <sect1 id="locking-indexes">
diff --git a/doc/src/sgml/ref/set_transaction.sgml b/doc/src/sgml/ref/set_transaction.sgml
index 43b1c6c892b..0c7d8b22710 100644
--- a/doc/src/sgml/ref/set_transaction.sgml
+++ b/doc/src/sgml/ref/set_transaction.sgml
@@ -151,6 +151,42 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
    is well suited for long-running reports or backups.
   </para>
 
+  <para>
+    Serializable transactions cannot use the <literal>NOT DEFERRABLE</literal>
+    option when run on Hot Standby servers.  <literal>READ ONLY</literal>
+    and <literal>DEFERRABLE</literal> are the default settings for Hot Standby
+    servers, but may also be specified explicitly.  The first statement run in
+    a Serializable transaction on a Hot Standby server may need to wait until a
+    point in the transaction stream where it is not possible to see a transient
+    state that is inconsistent with any serial execution of the transactions on
+    the master server.  This may introduce a pause, but avoids the need to
+    coordinate predicate locking across multiple servers.
+  </para>
+
+  <para>
+    Serializable transactions cannot use the <literal>NOT DEFERRABLE</literal>
+    option when run on Hot Standby servers.  <literal>READ ONLY</literal>
+    and <literal>DEFERRABLE</literal> are the default settings for Hot Standby
+    servers, but may also be specified explicitly.  The first statement run in
+    a Serializable transaction on a Hot Standby server may need to wait until a
+    point in the transaction stream where it is not possible to see a transient
+    state that is inconsistent with any serial execution of the transactions on
+    the master server.  This may introduce a pause, but avoids the need to
+    coordinate predicate locking across multiple servers.
+  </para>
+
+  <para>
+    Serializable transactions cannot use the <literal>NOT DEFERRABLE</literal>
+    option when run on Hot Standby servers.  <literal>READ ONLY</literal>
+    and <literal>DEFERRABLE</literal> are the default settings for Hot Standby
+    servers, but may also be specified explicitly.  The first statement run in
+    a Serializable transaction on a Hot Standby server may need to wait until a
+    point in the transaction stream where it is not possible to see a transient
+    state that is inconsistent with any serial execution of the transactions on
+    the master server.  This may introduce a pause, but avoids the need to
+    coordinate predicate locking across multiple servers.
+  </para>
+
   <para>
    The <literal>SET TRANSACTION SNAPSHOT</literal> command allows a new
    transaction to run with the same <firstterm>snapshot</firstterm> as an existing
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 6d5ebd475b4..e4ab28b9eab 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -123,6 +123,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xl_snapshot_safety =
+			(xl_xact_snapshot_safety *) data;
+
+		parsed->snapshot_token = xl_snapshot_safety->token;
+		parsed->snapshot_safety = xl_snapshot_safety->safety;
+
+		data += sizeof(xl_xact_snapshot_safety);
+	}
 }
 
 void
@@ -209,6 +220,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 	}
 }
 
+static const char *
+xact_snapshot_safety_to_string(SnapshotSafety snapshot_safety)
+{
+	const char *string = "<unknown>";
+
+	switch(snapshot_safety)
+	{
+	case SNAPSHOT_SAFE:
+		string = "SNAPSHOT_SAFE";
+		break;
+	case SNAPSHOT_UNSAFE:
+		string = "SNAPSHOT_UNSAFE";
+		break;
+	case SNAPSHOT_SAFETY_UNKNOWN:
+		string = "SNAPSHOT_SAFETY_UNKNOWN";
+		break;
+	}
+
+	return string;
+}
+
 static void
 xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
 {
@@ -258,6 +290,13 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
 						 (uint32) parsed.origin_lsn,
 						 timestamptz_to_str(parsed.origin_timestamp));
 	}
+
+	if (parsed.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		appendStringInfo(buf, "; snapshot safety: %s, token: %lx",
+						 xact_snapshot_safety_to_string(parsed.snapshot_safety),
+						 parsed.snapshot_token);
+	}
 }
 
 static void
@@ -304,6 +343,14 @@ xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec)
 		appendStringInfo(buf, " %u", xlrec->xsub[i]);
 }
 
+static void
+xact_desc_snapshot_safety(StringInfo buf, xl_xact_snapshot_safety *xlrec)
+{
+	appendStringInfo(buf, "snapshot safety: %s, token: %lx",
+					 xact_snapshot_safety_to_string(xlrec->safety),
+					 xlrec->token);
+}
+
 void
 xact_desc(StringInfo buf, XLogReaderState *record)
 {
@@ -335,6 +382,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
 		xact_desc_assignment(buf, xlrec);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec = (xl_xact_snapshot_safety *) rec;
+
+		xact_desc_snapshot_safety(buf, xlrec);
+	}
 }
 
 const char *
@@ -362,6 +415,9 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
+		case XLOG_XACT_SNAPSHOT_SAFETY:
+			id = "SNAPSHOT_SAFETY";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 875be180fe4..98136e05c6f 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1812,13 +1812,14 @@ StartTransaction(void)
 	{
 		s->startedInRecovery = true;
 		XactReadOnly = true;
+		XactDeferrable = true;
 	}
 	else
 	{
 		s->startedInRecovery = false;
 		XactReadOnly = DefaultXactReadOnly;
+		XactDeferrable = DefaultXactDeferrable;
 	}
-	XactDeferrable = DefaultXactDeferrable;
 	XactIsoLevel = DefaultXactIsoLevel;
 	forceSyncCommit = false;
 	MyXactFlags = 0;
@@ -5228,6 +5229,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_snapshot_safety xl_snapshot_safety;
 	uint8		info;
 
 	Assert(CritSectionCount > 0);
@@ -5306,6 +5308,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	if (IsolationIsSerializable())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_SNAPSHOT_SAFETY;
+		GetSnapshotSafetyAfterThisCommit(&xl_snapshot_safety.token,
+										 &xl_snapshot_safety.safety);
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
@@ -5354,6 +5363,10 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+		XLogRegisterData((char *) (&xl_snapshot_safety),
+						 sizeof(xl_xact_snapshot_safety));
+
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
@@ -5565,12 +5578,23 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		TransactionIdAsyncCommitTree(
 									 xid, parsed->nsubxacts, parsed->subxacts, lsn);
 
+		/* Coordinate atomic update of snapshot safety and ProcArray. */
+		if (parsed->snapshot_token != 0)
+		{
+			BeginSnapshotSafetyReplay();
+			SetNewestSnapshotSafety(parsed->snapshot_token,
+									parsed->snapshot_safety);
+		}
+
 		/*
 		 * We must mark clog before we update the ProcArray.
 		 */
 		ExpireTreeKnownAssignedTransactionIds(
 											  xid, parsed->nsubxacts, parsed->subxacts, max_xid);
 
+		if (parsed->snapshot_token != 0)
+			CompleteSnapshotSafetyReplay();
+
 		/*
 		 * Send any cache invalidations attached to the commit. We must
 		 * maintain the same order of invalidation then release locks as
@@ -5724,6 +5748,46 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
 	DropRelationFiles(parsed->xnodes, parsed->nrels, true);
 }
 
+XLogRecPtr
+XactLogSnapshotSafetyRecord(SnapshotToken token, SnapshotSafety safety)
+{
+	XLogRecPtr result;
+	xl_xact_snapshot_safety snapshot_safety;
+
+	snapshot_safety.token = token;
+	snapshot_safety.safety = safety;
+
+	START_CRIT_SECTION();
+	XLogBeginInsert();
+	XLogRegisterData((char *) &snapshot_safety, sizeof(snapshot_safety));
+	result = XLogInsert(RM_XACT_ID, XLOG_XACT_SNAPSHOT_SAFETY);
+	END_CRIT_SECTION();
+
+	/*
+	 * TODO: How can we avoid having to flush again (after already flushing
+	 * for the commit)?  If we don't have this flush here, then they standby
+	 * has to wait a while to find out whether its snapshot is safe.
+	 */
+	XLogFlush(result);
+
+	return result;
+}
+
+static void
+xact_redo_snapshot_safety(xl_xact_snapshot_safety *snapshot_safety)
+{
+	/*
+	 * Any earlier COMMIT record must have carried a snapshot safety message
+	 * the same token as this record, and had safety ==
+	 * SNAPSHOT_SAFETY_UNKNOWN.  This new independent snapshot safety message
+	 * reports that the safety is now known.  We will wake any backend that is
+	 * waiting to learn if the snapshot is safe.
+	 */
+	if (standbyState >= STANDBY_INITIALIZED)
+		NotifyHypotheticalSnapshotSafety(snapshot_safety->token,
+										 snapshot_safety->safety);
+}
+
 void
 xact_redo(XLogReaderState *record)
 {
@@ -5797,6 +5861,13 @@ xact_redo(XLogReaderState *record)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec =
+			(xl_xact_snapshot_safety *) XLogRecGetData(record);
+
+		xact_redo_snapshot_safety(xlrec);
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3025d0badb8..fec37f67e1b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5177,6 +5177,8 @@ BootStrapXLOG(void)
 	checkPoint.newestCommitTsXid = InvalidTransactionId;
 	checkPoint.time = (pg_time_t) time(NULL);
 	checkPoint.oldestActiveXid = InvalidTransactionId;
+//	checkPoint.newestSnapshotToken = 0;
+//	checkPoint.newestSnapshotSafety = SNAPSHOT_SAFE;
 
 	ShmemVariableCache->nextXid = checkPoint.nextXid;
 	ShmemVariableCache->nextOid = checkPoint.nextOid;
@@ -5186,6 +5188,8 @@ BootStrapXLOG(void)
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
+//	SetNewestSnapshotSafety(checkPoint.newestSnapshotToken,
+//							checkPoint.newestSnapshotSafety);
 
 	/* Set up the XLOG page header */
 	page->xlp_magic = XLOG_PAGE_MAGIC;
@@ -6813,6 +6817,9 @@ StartupXLOG(void)
 					 checkPoint.newestCommitTsXid);
 	XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
 	XLogCtl->ckptXid = checkPoint.nextXid;
+	/* TODO: figure out checkpoint protocol */
+//	SetNewestSnapshotSafety(checkPoint.newestSnapshotToken,
+//							checkPoint.newestSnapshotSafety);
 
 	/*
 	 * Initialize replication slots, before there's a chance to remove
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index 9a754dae3fd..9c6083d0edc 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -564,14 +564,6 @@ check_XactIsoLevel(char **newval, void **extra, GucSource source)
 			GUC_check_errmsg("SET TRANSACTION ISOLATION LEVEL must not be called in a subtransaction");
 			return false;
 		}
-		/* Can't go to serializable mode while recovery is still active */
-		if (newXactIsoLevel == XACT_SERIALIZABLE && RecoveryInProgress())
-		{
-			GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
-			GUC_check_errmsg("cannot use serializable mode in a hot standby");
-			GUC_check_errhint("You can use REPEATABLE READ instead.");
-			return false;
-		}
 	}
 
 	*extra = malloc(sizeof(int));
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index e8390311d03..87bf45e918a 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -279,6 +279,7 @@
 #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
 #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
 #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
+#define SxactIsHypothetical(sxact) (((sxact)->flags & SXACT_FLAG_HYPOTHETICAL) != 0)
 
 /*
  * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
@@ -433,7 +434,8 @@ static void SummarizeOldestCommittedSxact(void);
 static Snapshot GetSafeSnapshot(Snapshot snapshot);
 static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 									  VirtualTransactionId *sourcevxid,
-									  int sourcepid);
+									  int sourcepid,
+									  SnapshotSafety *snapshot_safety);
 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
 						  PREDICATELOCKTARGETTAG *parent);
@@ -1186,6 +1188,9 @@ InitPredicateLocks(void)
 		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
 		PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
 		PredXact->OldCommittedSxact->pid = 0;
+		SHMQueueInit(&PredXact->snapshotSafetyWaitList);
+		PredXact->NewestSnapshotToken = 0;
+		PredXact->NewestSnapshotSafety = SNAPSHOT_SAFE;
 	}
 	/* This never changes, so let's keep a local copy. */
 	OldCommittedSxact = PredXact->OldCommittedSxact;
@@ -1468,6 +1473,7 @@ static Snapshot
 GetSafeSnapshot(Snapshot origSnapshot)
 {
 	Snapshot	snapshot;
+	SnapshotSafety snapshot_safety;
 
 	Assert(XactReadOnly && XactDeferrable);
 
@@ -1480,10 +1486,36 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * one passed to it, but we avoid assuming that here.
 		 */
 		snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
-														 NULL, InvalidPid);
-
-		if (MySerializableXact == InvalidSerializableXact)
-			return snapshot;	/* no concurrent r/w xacts; it's safe */
+														 NULL, InvalidPid,
+														 &snapshot_safety);
+		if (RecoveryInProgress())
+		{
+			/*
+			 * Check if the most recently replayed COMMIT record was either
+			 * known to be safe because it had no concurrent r/w xacts on the
+			 * primary, or has subsequently been declared safe by a snapshot
+			 * safety record.
+			 */
+			if (snapshot_safety == SNAPSHOT_SAFE)
+			{
+				elog(WARNING, "got a safe snapshot!");
+				return snapshot;
+			}
+			else if (snapshot_safety == SNAPSHOT_UNSAFE)
+			{
+				/*
+				 * TODO:  This can only happen if the master ran out of memory
+				 * while trying to create a hypothetical transaction, right?
+				 * Should we wait or error out?
+				 */
+				continue;
+			}
+		}
+		else
+		{
+			if (MySerializableXact == InvalidSerializableXact)
+				return snapshot;	/* no concurrent r/w xacts; it's safe */
+	}
 
 		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
@@ -1491,20 +1523,48 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * Wait for concurrent transactions to finish. Stop early if one of
 		 * them marked us as conflicted.
 		 */
-		MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
-		while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
-				 SxactIsROUnsafe(MySerializableXact)))
+		if (RecoveryInProgress())
 		{
-			LWLockRelease(SerializableXactHashLock);
-			ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
-			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			/*
+			 * Running on a standby.  Wait for a the primary to tell us the
+			 * result of testing a hypothetical transaction whose
+			 * serializability matches the snapshot we have.
+			 */
+			Assert(snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN);
+			Assert(!SHMQueueIsDetached(&MyProc->safetyLinks));
+			while (MyProc->snapshotSafety == SNAPSHOT_SAFETY_UNKNOWN)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			if (MyProc->snapshotSafety == SNAPSHOT_SAFE)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
-		MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
-
-		if (!SxactIsROUnsafe(MySerializableXact))
+		else
 		{
-			LWLockRelease(SerializableXactHashLock);
-			break;				/* success */
+			/*
+			 * Running on primary.  Wait for a signal from one of the backends
+			 * that we possibly conflict with.
+			 */
+			MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
+			while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+					 SxactIsROUnsafe(MySerializableXact)))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
+
+			if (!SxactIsROUnsafe(MySerializableXact))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
 
 		LWLockRelease(SerializableXactHashLock);
@@ -1519,12 +1579,64 @@ GetSafeSnapshot(Snapshot origSnapshot)
 	/*
 	 * Now we have a safe snapshot, so we don't need to do any further checks.
 	 */
-	Assert(SxactIsROSafe(MySerializableXact));
+	Assert(RecoveryInProgress() || SxactIsROSafe(MySerializableXact));
 	ReleasePredicateLocks(false);
 
 	return snapshot;
 }
 
+  /*
+   * When the primary server has determined the safety of a hypothetical
+   * snapshot which was previously reported as SNAPSHOT_SAFETY_UNKNOWN in a
+   * COMMIT record, it emits a WAL record that causes the recovery process on
+   * standbys to call this function.  Here, we will wake up any backend that is
+   * currently waiting in GetSafeSnapshot to learn about the safety of a
+   * snapshot taken after that point in the transaction stream.
+   */
+void
+NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety)
+{
+	PGPROC	   *proc;
+	PGPROC	   *next;
+
+	Assert(AmStartupProcess());
+	elog(LOG, "NotifyHypotheticalSnapshotSafety token = %ld, safety = %d", token, safety);
+
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+	/*
+	 * Walk the list of processes that are waiting in GetSafeSnapshot on a
+	 * standby, and find any that are waiting to learn the safety of a
+	 * snapshot taken at a point in time when this token appeared onthe most
+	 * recently replayed SSI transaction.  If we find any of those, tell them
+	 * the final status for and wake them up.
+	 */
+	proc = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+								   &PredXact->snapshotSafetyWaitList,
+								   offsetof(PGPROC, safetyLinks));
+	while (proc != NULL)
+	{
+		next = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+									   &proc->safetyLinks,
+									   offsetof(PGPROC, safetyLinks));
+		if (proc->waitSnapshotToken == token)
+		{
+			SHMQueueDelete(&proc->safetyLinks);
+			proc->snapshotSafety = safety;
+			ProcSendSignal(proc->pid);
+		}
+		proc = next;
+	}
+
+	/*
+	 * If this happens to be the most recently replayed snapshot token then
+	 * remember this safety value.
+	 */
+	if (PredXact->NewestSnapshotToken == token)
+		PredXact->NewestSnapshotSafety = safety;
+	LWLockRelease(SerializableXactHashLock);
+}
+
 /*
  * GetSafeSnapshotBlockingPids
  *		If the specified process is currently blocked in GetSafeSnapshot,
@@ -1593,16 +1705,17 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 
 	/*
 	 * Can't use serializable mode while recovery is still active, as it is,
-	 * for example, on a hot standby.  We could get here despite the check in
-	 * check_XactIsoLevel() if default_transaction_isolation is set to
-	 * serializable, so phrase the hint accordingly.
+	 * for example, on a hot standby, unless DEFERRABLE mode is active.  In
+	 * that case, DEFERRABLE is the default, so this error should should only
+	 * be reachable if the user has explicitly asked for NOT DEFERRABLE via
+	 * SET transaction_deferrable or SET/BEGIN TRANSACTION ISOLATION LEVEL.
 	 */
-	if (RecoveryInProgress())
+	if (RecoveryInProgress() && (!XactReadOnly || !XactDeferrable))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("cannot use serializable mode in a hot standby"),
-				 errdetail("\"default_transaction_isolation\" is set to \"serializable\"."),
-				 errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default.")));
+				 errmsg("cannot use serializable not deferrable mode in a hot standby"),
+				 errdetail("Serializable transactions must be DEFERRABLE when run on hot standby servers."),
+				 errhint("You can use \"SET transaction_deferrable = true\", use DEFERRABLE when specifying the transaction isolation level, or avoid explicitly specifying NOT DEFERRABLE.")));
 
 	/*
 	 * A special optimization is available for SERIALIZABLE READ ONLY
@@ -1613,7 +1726,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 		return GetSafeSnapshot(snapshot);
 
 	return GetSerializableTransactionSnapshotInt(snapshot,
-												 NULL, InvalidPid);
+												 NULL, InvalidPid, NULL);
 }
 
 /*
@@ -1645,7 +1758,7 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
 				 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
 
 	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid,
-												 sourcepid);
+												 sourcepid, NULL);
 }
 
 /*
@@ -1656,11 +1769,17 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
  * loaded up.  HOWEVER: to avoid race conditions, we must check that the
  * source xact is still running after we acquire SerializableXactHashLock.
  * We do that by calling ProcArrayInstallImportedXmin.
+ *
+ * If snapshot_safety is a non-NULL, then the safety of this snapshot if used
+ * on a standby server is written to it.  If it is SNAPSHOT_SAFE, then the
+ * snapshot may be safely used.  If it is SNAPSHOT_SAFETY_UNKNOWN, then the
+ * caller must wait for the safety to be announced in the WAL.
  */
 static Snapshot
 GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 									  VirtualTransactionId *sourcevxid,
-									  int sourcepid)
+									  int sourcepid,
+									  SnapshotSafety *snapshot_safety)
 {
 	PGPROC	   *proc;
 	VirtualTransactionId vxid;
@@ -1671,8 +1790,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	/* We only do this for serializable transactions.  Once. */
 	Assert(MySerializableXact == InvalidSerializableXact);
 
-	Assert(!RecoveryInProgress());
-
 	/*
 	 * Since all parts of a serializable transaction must use the same
 	 * snapshot, it is too late to establish one after a parallel operation
@@ -1713,6 +1830,30 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 		}
 	} while (!sxact);
 
+	/*
+	 * Note the snapshot safety information for standbys.  This can be used to
+	 * know if the returned snapshot is already known to be safe/unsafe, or if
+	 * we must wait for notification of the final safety determination.
+	 */
+	if (snapshot_safety != NULL)
+	{
+		*snapshot_safety = PredXact->NewestSnapshotSafety;
+		if (*snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN)
+		{
+			/*
+			 * We must put the this process into the waitlist while we hold
+			 * the lock or there would be a race condition where we might miss
+			 * a notification.  The caller must wait for
+			 * MyProc->snapshotSafety to be set to a final value.
+			 */
+			MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+			MyProc->waitSnapshotToken = PredXact->NewestSnapshotToken;
+			if (SHMQueueIsDetached(&MyProc->safetyLinks))
+				SHMQueueInsertBefore(&PredXact->snapshotSafetyWaitList,
+									 &MyProc->safetyLinks);
+		}
+	}
+
 	/* Get the snapshot, or check that it's safe to use */
 	if (!sourcevxid)
 		snapshot = GetSnapshotData(snapshot);
@@ -3181,6 +3322,7 @@ SetNewSxactGlobalXmin(void)
 	for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
 	{
 		if (!SxactIsRolledBack(sxact)
+			&& !SxactIsHypothetical(sxact)
 			&& !SxactIsCommitted(sxact)
 			&& sxact != OldCommittedSxact)
 		{
@@ -3476,12 +3618,35 @@ ReleasePredicateLocks(bool isCommit)
 
 			/*
 			 * Wake up the process for a waiting DEFERRABLE transaction if we
-			 * now know it's either safe or conflicted.
+			 * now know it's either safe or conflicted.  This releases
+			 * SERIALIZABLE READ ONLY DEFERRABLE transactions on the primary.
 			 */
 			if (SxactIsDeferrableWaiting(roXact) &&
 				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
 				ProcSendSignal(roXact->pid);
 
+			/*
+			 * If a hypothetical transaction is now known to be safe or
+			 * unsafe, we can report that in the WAL for the benefit of
+			 * standbys and recycle it.  This releases SERIALIZABLE READ ONLY
+			 * DEFERRABLE transactions that are waiting for the status of this
+			 * particular hypothetical tranasactions on any standby that
+			 * replays it.
+			 */
+			if (SxactIsHypothetical(roXact) &&
+				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
+			{
+				SnapshotSafety safety;
+
+				if (SxactIsROSafe(roXact))
+					safety = SNAPSHOT_SAFE;
+				else
+					safety = SNAPSHOT_UNSAFE;
+				XactLogSnapshotSafetyRecord(roXact->SeqNo.lastCommitBeforeSnapshot,
+											safety);
+				ReleasePredXact(roXact);
+			}
+
 			possibleUnsafeConflict = nextConflict;
 		}
 	}
@@ -4741,6 +4906,80 @@ PreCommit_CheckForSerializationFailure(void)
 	MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
 	MySerializableXact->flags |= SXACT_FLAG_PREPARED;
 
+	/*
+	 * For the benefit of hot standby servers that want to take a safe
+	 * SERIALIZABLE READ ONLY DEFERRABLE snapshot, we will check whether a
+	 * hypothetical read-only serializable transaction that starts after this
+	 * transaction commits would be safe.
+	 */
+	if (PredXact->WritableSxactCount == (XactReadOnly ? 0 : 1))
+	{
+		/*
+		 * There are no concurrent writable SERIALIZABLE transactions.  A
+		 * read-only snapshot taken immediately after this one commits is
+		 * safe.
+		 */
+		MySerializableXact->snapshotSafetyAfterThisCommit = SNAPSHOT_SAFE;
+	}
+	else
+	{
+		SERIALIZABLEXACT *sxact;
+		SERIALIZABLEXACT *othersxact;
+
+		/*
+		 * We can't yet determine whether a read-only transaction beginning
+		 * now would be safe.  Create a hypothetical SERIALIZABLEXACT and let
+		 * ReleasePredicateLocks report on its safety once that can be
+		 * determined.
+		 */
+		sxact = CreatePredXact();
+		if (sxact == NULL)
+		{
+			/* Out of space.  Don't allow SERIALIZABLE on standbys. */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_UNSAFE;
+		}
+		else
+		{
+			SetInvalidVirtualTransactionId(sxact->vxid);
+			sxact->SeqNo.lastCommitBeforeSnapshot =
+				MySerializableXact->prepareSeqNo;
+			sxact->prepareSeqNo = InvalidSerCommitSeqNo;
+			sxact->commitSeqNo = InvalidSerCommitSeqNo;
+			SHMQueueInit(&(sxact->outConflicts));
+			SHMQueueInit(&(sxact->inConflicts));
+			SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+			sxact->topXid = InvalidTransactionId;
+			sxact->finishedBefore = InvalidTransactionId;
+			sxact->xmin = MySerializableXact->xmin;
+			sxact->pid = InvalidPid;
+			SHMQueueInit(&(sxact->predicateLocks));
+			SHMQueueElemInit(&(sxact->finishedLink));
+			sxact->flags = SXACT_FLAG_READ_ONLY | SXACT_FLAG_HYPOTHETICAL;
+
+			/* Register concurrent r/w transactions as possible conflicts. */
+			for (othersxact = FirstPredXact();
+				 othersxact != NULL;
+				 othersxact = NextPredXact(othersxact))
+			{
+				if (othersxact != MySerializableXact
+					&& !SxactIsCommitted(othersxact)
+					&& !SxactIsDoomed(othersxact)
+					&& !SxactIsReadOnly(othersxact))
+				{
+					SetPossibleUnsafeConflict(sxact, othersxact);
+				}
+			}
+
+			/*
+			 * The status will be reported in a later WAL record once it has
+			 * been determined.
+			 */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_SAFETY_UNKNOWN;
+		}
+	}
+
 	LWLockRelease(SerializableXactHashLock);
 }
 
@@ -5003,3 +5242,41 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
 		CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
 	}
 }
+
+/*
+ * Accessor for the hypothetical snapshot safety information needed for commit
+ * records generated on primary servers.  This is used by XlactLogCommitRecord
+ * to receive the safety level computed by
+ * PreCommit_CheckForSerializationFailure in a committing SSI transaction.
+ */
+void
+GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety)
+{
+	*token = MySerializableXact->prepareSeqNo;
+	*safety = MySerializableXact->snapshotSafetyAfterThisCommit;
+}
+
+void
+BeginSnapshotSafetyReplay(void)
+{
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+}
+
+/*
+ * Used in recovery when replaying commit records.  On a hot standby, these
+ * values must be set atomically with ProcArray updates, mirroring the code
+ * in GetSerializableTransactionSnapshotInt.  This is done by wrapping both
+ * in BeginSnapshotSafetyReplay/CompleteSnapshotSafetyReplay.
+ */
+void
+SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety)
+{
+	PredXact->NewestSnapshotToken = token;
+	PredXact->NewestSnapshotSafety = safety;
+}
+
+void
+CompleteSnapshotSafetyReplay(void)
+{
+	LWLockRelease(SerializableXactHashLock);
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6f9aaa52faf..33e7ace17bb 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -398,6 +398,11 @@ InitProcess(void)
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	SHMQueueElemInit(&(MyProc->syncRepLinks));
 
+	/* Initialize fields for SERIALIZABLE on standbys */
+	MyProc->waitSnapshotToken = 0;
+	MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+	SHMQueueElemInit(&(MyProc->safetyLinks));
+
 	/* Initialize fields for group XID clearing. */
 	MyProc->procArrayGroupMember = false;
 	MyProc->procArrayGroupMemberXid = InvalidTransactionId;
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 083e879d5c3..da632445c68 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -143,7 +143,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_SNAPSHOT_SAFETY   0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -164,6 +164,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
 #define XACT_XINFO_HAS_GID				(1U << 7)
+#define XACT_XINFO_HAS_SNAPSHOT_SAFETY	(1U << 8)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -259,6 +260,12 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_snapshot_safety
+{
+	SnapshotToken token;
+	SnapshotSafety safety;
+} xl_xact_snapshot_safety;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -271,6 +278,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_snapshot_safety follows if XINFO_HAS_SNAPSHOT_SAFETY */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -318,6 +326,9 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	SnapshotToken snapshot_token;
+	SnapshotSafety snapshot_safety;
 } xl_xact_parsed_commit;
 
 typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
@@ -416,6 +427,10 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nrels, RelFileNode *rels,
 				   int xactflags, TransactionId twophase_xid,
 				   const char *twophase_gid);
+
+extern XLogRecPtr XactLogSnapshotSafetyRecord(SnapshotToken token,
+											  SnapshotSafety safety);
+
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 0a48d1cfb40..c747a126865 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -50,6 +50,18 @@ typedef uint32 TimeLineID;
  */
 typedef uint16 RepOriginId;
 
+/*
+ * Snapshot safety information using to control SERIALIAZABLE on standby
+ * servers appears in checkpoints, so we define the types used here.
+ */
+typedef uint64 SnapshotToken;
+typedef enum SnapshotSafety
+{
+	SNAPSHOT_SAFE,
+	SNAPSHOT_UNSAFE,
+	SNAPSHOT_SAFETY_UNKNOWN
+} SnapshotSafety;
+
 /*
  *	Because O_DIRECT bypasses the kernel buffers, and because we never
  *	read those buffers except during crash recovery or if wal_level != minimal,
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 6a3464daa1e..a84f80d88fd 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -30,7 +30,6 @@ extern int	max_predicate_locks_per_page;
 /* Number of SLRU buffers to use for predicate locking */
 #define NUM_OLDSERXID_BUFFERS	16
 
-
 /*
  * function prototypes
  */
@@ -74,4 +73,11 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
 extern void predicatelock_twophase_recover(TransactionId xid, uint16 info,
 							   void *recdata, uint32 len);
 
+/* hypothetical snapshot safety support, allowing SERIALIZABLE on standbys */
+extern void GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety);
+extern void NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety);
+extern void BeginSnapshotSafetyReplay(void);
+extern void SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety);
+extern void CompleteSnapshotSafetyReplay(void);
+
 #endif							/* PREDICATE_H */
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 0f736d37dff..db69fba2925 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -97,6 +97,12 @@ typedef struct SERIALIZABLEXACT
 	 */
 	SHM_QUEUE	possibleUnsafeConflicts;
 
+	/*
+	 * for committing transactions: would a hypothetical read-only snapshot
+	 * taken immediately after this transaction commits be safe?
+	 */
+	SnapshotSafety snapshotSafetyAfterThisCommit;
+
 	TransactionId topXid;		/* top level xid for the transaction, if one
 								 * exists; else invalid */
 	TransactionId finishedBefore;	/* invalid means still running; else the
@@ -123,6 +129,7 @@ typedef struct SERIALIZABLEXACT
 #define SXACT_FLAG_RO_UNSAFE			0x00000100
 #define SXACT_FLAG_SUMMARY_CONFLICT_IN	0x00000200
 #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400
+#define SXACT_FLAG_HYPOTHETICAL			0x00000800
 
 /*
  * The following types are used to provide an ad hoc list for holding
@@ -172,6 +179,11 @@ typedef struct PredXactListData
 												 * seq no */
 	SERIALIZABLEXACT *OldCommittedSxact;	/* shared copy of dummy sxact */
 
+	/* Tracking of snapshot safety on standby servers. */
+	SHM_QUEUE	snapshotSafetyWaitList;
+	SnapshotToken NewestSnapshotToken;
+	SnapshotSafety NewestSnapshotSafety;
+
 	PredXactListElement element;
 }			PredXactListData;
 
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index cb613c8076e..1497cdf66c4 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -152,6 +152,11 @@ struct PGPROC
 	int			syncRepState;	/* wait state for sync rep */
 	SHM_QUEUE	syncRepLinks;	/* list link if process is in syncrep queue */
 
+	/* Info to allow standbys to wait for a safe SERIALIZABLE snapshot */
+	SnapshotToken waitSnapshotToken;
+	SnapshotSafety snapshotSafety;	/* space for result */
+	SHM_QUEUE	safetyLinks;	/* list link for GetSafeSnapshot */
+
 	/*
 	 * All PROCLOCK objects for locks held or awaited by this backend are
 	 * linked into one of these lists, according to the partition number of
-- 
2.17.0

