>From 37c90cc2563cf1517ac6d917a632ba4beb32bc8c Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Sat, 3 Jun 2017 02:06:22 +0200
Subject: [PATCH 1/2] Use virtual transaction instead of normal ones in
 exported snapshots

---
 doc/src/sgml/ref/set_transaction.sgml |  8 ++--
 src/backend/storage/ipc/procarray.c   | 11 +++--
 src/backend/storage/ipc/sinvaladt.c   | 21 +++++++---
 src/backend/storage/lmgr/predicate.c  | 26 +++++++-----
 src/backend/utils/time/snapmgr.c      | 76 +++++++++++++++++++++++++----------
 src/include/storage/predicate.h       |  4 +-
 src/include/storage/procarray.h       |  2 +-
 src/include/storage/sinvaladt.h       |  1 +
 8 files changed, 101 insertions(+), 48 deletions(-)

diff --git a/doc/src/sgml/ref/set_transaction.sgml b/doc/src/sgml/ref/set_transaction.sgml
index ca55a5b..116315f 100644
--- a/doc/src/sgml/ref/set_transaction.sgml
+++ b/doc/src/sgml/ref/set_transaction.sgml
@@ -221,9 +221,9 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
 <programlisting>
 BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
 SELECT pg_export_snapshot();
- pg_export_snapshot
---------------------
- 000003A1-1
+ pg_export_snapshot  
+---------------------
+ 00000003-0000001B-1
 (1 row)
 </programlisting>
 
@@ -233,7 +233,7 @@ SELECT pg_export_snapshot();
 
 <programlisting>
 BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-SET TRANSACTION SNAPSHOT '000003A1-1';
+SET TRANSACTION SNAPSHOT '00000003-0000001B-1';
 </programlisting></para>
  </refsect1>
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8a71536..dfddfc4 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1793,14 +1793,15 @@ GetSnapshotData(Snapshot snapshot)
  * Returns TRUE if successful, FALSE if source xact is no longer running.
  */
 bool
-ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
+ProcArrayInstallImportedXmin(TransactionId xmin,
+							 VirtualTransactionId *sourcevxid)
 {
 	bool		result = false;
 	ProcArrayStruct *arrayP = procArray;
 	int			index;
 
 	Assert(TransactionIdIsNormal(xmin));
-	if (!TransactionIdIsNormal(sourcexid))
+	if (!sourcevxid)
 		return false;
 
 	/* Get lock so source xact can't end while we're doing this */
@@ -1817,8 +1818,10 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
 		if (pgxact->vacuumFlags & PROC_IN_VACUUM)
 			continue;
 
-		xid = pgxact->xid;		/* fetch just once */
-		if (xid != sourcexid)
+		/* We are only interested in the specific virtual transaction. */
+		if (proc->backendId != sourcevxid->backendId)
+			continue;
+		if (proc->lxid != sourcevxid->localTransactionId)
 			continue;
 
 		/*
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index 18302ca..78ad145 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -193,6 +193,7 @@ static SISeg *shmInvalBuffer;	/* pointer to the shared inval buffer */
 
 
 static LocalTransactionId nextLocalTransactionId;
+static LocalTransactionId lastLocalTransactionId = InvalidLocalTransactionId;
 
 static void CleanupInvalidationState(int status, Datum arg);
 
@@ -764,17 +765,27 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
  * within a short interval, successive procs occupying the same backend ID
  * slot should use a consecutive sequence of local IDs, which is implemented
  * by copying nextLocalTransactionId as seen above.
+ *
+ * The return value of this function is remembered for future use.
  */
 LocalTransactionId
 GetNextLocalTransactionId(void)
 {
-	LocalTransactionId result;
-
 	/* loop to avoid returning InvalidLocalTransactionId at wraparound */
 	do
 	{
-		result = nextLocalTransactionId++;
-	} while (!LocalTransactionIdIsValid(result));
+		lastLocalTransactionId = nextLocalTransactionId++;
+	} while (!LocalTransactionIdIsValid(lastLocalTransactionId));
 
-	return result;
+	return lastLocalTransactionId;
+}
+
+/*
+ * Get the last local LocalTransactionId which was allocated by
+ * GetNextLocalTransactionId
+ */
+LocalTransactionId
+GetLastLocalTransactionId(void)
+{
+	return lastLocalTransactionId;
 }
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 27c4af9..bce505a 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -148,7 +148,7 @@
  * predicate lock maintenance
  *		GetSerializableTransactionSnapshot(Snapshot snapshot)
  *		SetSerializableTransactionSnapshot(Snapshot snapshot,
- *										   TransactionId sourcexid)
+ *										   VirtualTransactionId *sourcevxid)
  *		RegisterPredicateLockingXid(void)
  *		PredicateLockRelation(Relation relation, Snapshot snapshot)
  *		PredicateLockPage(Relation relation, BlockNumber blkno,
@@ -434,7 +434,8 @@ static uint32 predicatelock_hash(const void *key, Size keysize);
 static void SummarizeOldestCommittedSxact(void);
 static Snapshot GetSafeSnapshot(Snapshot snapshot);
 static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
-									  TransactionId sourcexid);
+									  VirtualTransactionId *sourcevxid,
+									  int sourcepid);
 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
 						  PREDICATELOCKTARGETTAG *parent);
@@ -1510,7 +1511,7 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * one passed to it, but we avoid assuming that here.
 		 */
 		snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
-													   InvalidTransactionId);
+														 NULL, InvalidPid);
 
 		if (MySerializableXact == InvalidSerializableXact)
 			return snapshot;	/* no concurrent r/w xacts; it's safe */
@@ -1643,7 +1644,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 		return GetSafeSnapshot(snapshot);
 
 	return GetSerializableTransactionSnapshotInt(snapshot,
-												 InvalidTransactionId);
+												 NULL, InvalidPid);
 }
 
 /*
@@ -1658,7 +1659,8 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
  */
 void
 SetSerializableTransactionSnapshot(Snapshot snapshot,
-								   TransactionId sourcexid)
+								   VirtualTransactionId *sourcevxid,
+								   int sourcepid)
 {
 	Assert(IsolationIsSerializable());
 
@@ -1673,7 +1675,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
 
-	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
+	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid,
+												 sourcepid);
 }
 
 /*
@@ -1687,7 +1690,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
  */
 static Snapshot
 GetSerializableTransactionSnapshotInt(Snapshot snapshot,
-									  TransactionId sourcexid)
+									  VirtualTransactionId *sourcevxid,
+									  int sourcepid)
 {
 	PGPROC	   *proc;
 	VirtualTransactionId vxid;
@@ -1741,17 +1745,17 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	} while (!sxact);
 
 	/* Get the snapshot, or check that it's safe to use */
-	if (!TransactionIdIsValid(sourcexid))
+	if (!sourcevxid)
 		snapshot = GetSnapshotData(snapshot);
-	else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid))
+	else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcevxid))
 	{
 		ReleasePredXact(sxact);
 		LWLockRelease(SerializableXactHashLock);
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("could not import the requested snapshot"),
-			   errdetail("The source transaction %u is not running anymore.",
-						 sourcexid)));
+			errdetail("The source process with pid %d is not running anymore.",
+						sourcepid)));
 	}
 
 	/*
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index b3d4fe3..ba1e78e 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -58,6 +58,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/sinval.h"
+#include "storage/sinvaladt.h"
 #include "storage/spin.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
@@ -211,9 +212,9 @@ static Snapshot FirstXactSnapshot = NULL;
 
 /* Define pathname of exported-snapshot files */
 #define SNAPSHOT_EXPORT_DIR "pg_snapshots"
-#define XactExportFilePath(path, xid, num, suffix) \
-	snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \
-			 xid, num, suffix)
+#define XactExportFilePath(path, backenId, lxid, num, suffix) \
+	snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d%s", \
+			 backenId, lxid, num, suffix)
 
 /* Current xact's exported snapshots (a list of Snapshot structs) */
 static List *exportedSnapshots = NIL;
@@ -558,8 +559,8 @@ SnapshotSetCommandId(CommandId curcid)
  * in GetTransactionSnapshot.
  */
 static void
-SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
-					   PGPROC *sourceproc)
+SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
+					   int sourcepid, PGPROC *sourceproc)
 {
 	/* Caller should have checked this already */
 	Assert(!FirstSnapshotSet);
@@ -617,12 +618,12 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
 					 errmsg("could not import the requested snapshot"),
 			   errdetail("The source transaction is not running anymore.")));
 	}
-	else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
+	else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("could not import the requested snapshot"),
-			   errdetail("The source transaction %u is not running anymore.",
-						 sourcexid)));
+			errdetail("The source process with pid %d is not running anymore.",
+						sourcepid)));
 
 	/*
 	 * In transaction-snapshot mode, the first snapshot must live until end of
@@ -632,7 +633,8 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
 	if (IsolationUsesXactSnapshot())
 	{
 		if (IsolationIsSerializable())
-			SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid);
+			SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
+											   sourcepid);
 		/* Make a saved copy */
 		CurrentSnapshot = CopySnapshot(CurrentSnapshot);
 		FirstXactSnapshot = CurrentSnapshot;
@@ -1075,7 +1077,6 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin)
 	 */
 	if (exportedSnapshots != NIL)
 	{
-		TransactionId myxid = GetTopTransactionId();
 		int			i;
 		char		buf[MAXPGPATH];
 		ListCell   *lc;
@@ -1087,7 +1088,8 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin)
 		 */
 		for (i = 1; i <= list_length(exportedSnapshots); i++)
 		{
-			XactExportFilePath(buf, myxid, i, "");
+			XactExportFilePath(buf, MyProc->backendId,
+							   GetLastLocalTransactionId(), i, "");
 			if (unlink(buf))
 				elog(WARNING, "could not unlink file \"%s\": %m", buf);
 		}
@@ -1183,9 +1185,9 @@ ExportSnapshot(Snapshot snapshot)
 	 */
 
 	/*
-	 * This will assign a transaction ID if we do not yet have one.
+	 * Get our transaction ID if there is one, to include in the snapshot.
 	 */
-	topXid = GetTopTransactionId();
+	topXid = GetTopTransactionIdIfAny();
 
 	/*
 	 * We cannot export a snapshot from a subtransaction because there's no
@@ -1226,7 +1228,8 @@ ExportSnapshot(Snapshot snapshot)
 	 */
 	initStringInfo(&buf);
 
-	appendStringInfo(&buf, "xid:%u\n", topXid);
+	appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
+	appendStringInfo(&buf, "pid:%d\n", MyProcPid);
 	appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
 	appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
 	appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
@@ -1245,7 +1248,8 @@ ExportSnapshot(Snapshot snapshot)
 	 * xmax.  (We need not make the same check for subxip[] members, see
 	 * snapshot.h.)
 	 */
-	addTopXid = TransactionIdPrecedes(topXid, snapshot->xmax) ? 1 : 0;
+	addTopXid = (TransactionIdIsValid(topXid) &&
+		TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
 	appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
 	for (i = 0; i < snapshot->xcnt; i++)
 		appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
@@ -1276,7 +1280,8 @@ ExportSnapshot(Snapshot snapshot)
 	 * ensures that no other backend can read an incomplete file
 	 * (ImportSnapshot won't allow it because of its valid-characters check).
 	 */
-	XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp");
+	XactExportFilePath(pathtmp, MyProc->backendId, MyProc->lxid,
+					   list_length(exportedSnapshots), ".tmp");
 	if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
 		ereport(ERROR,
 				(errcode_for_file_access(),
@@ -1298,7 +1303,8 @@ ExportSnapshot(Snapshot snapshot)
 	 * Now that we have written everything into a .tmp file, rename the file
 	 * to remove the .tmp suffix.
 	 */
-	XactExportFilePath(path, topXid, list_length(exportedSnapshots), "");
+	XactExportFilePath(path, MyProc->backendId, MyProc->lxid,
+					   list_length(exportedSnapshots), "");
 
 	if (rename(pathtmp, path) < 0)
 		ereport(ERROR,
@@ -1384,6 +1390,30 @@ parseXidFromText(const char *prefix, char **s, const char *filename)
 	return val;
 }
 
+static void
+parseVxidFromText(const char *prefix, char **s, const char *filename,
+				  VirtualTransactionId *vxid)
+{
+	char	   *ptr = *s;
+	int			prefixlen = strlen(prefix);
+
+	if (strncmp(ptr, prefix, prefixlen) != 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+				 errmsg("invalid snapshot data in file \"%s\"", filename)));
+	ptr += prefixlen;
+	if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+				 errmsg("invalid snapshot data in file \"%s\"", filename)));
+	ptr = strchr(ptr, '\n');
+	if (!ptr)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+				 errmsg("invalid snapshot data in file \"%s\"", filename)));
+	*s = ptr + 1;
+}
+
 /*
  * ImportSnapshot
  *		Import a previously exported snapshot.  The argument should be a
@@ -1399,7 +1429,8 @@ ImportSnapshot(const char *idstr)
 	char	   *filebuf;
 	int			xcnt;
 	int			i;
-	TransactionId src_xid;
+	VirtualTransactionId src_vxid;
+	int			src_pid;
 	Oid			src_dbid;
 	int			src_isolevel;
 	bool		src_readonly;
@@ -1463,7 +1494,8 @@ ImportSnapshot(const char *idstr)
 	 */
 	memset(&snapshot, 0, sizeof(snapshot));
 
-	src_xid = parseXidFromText("xid:", &filebuf, path);
+	parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
+	src_pid = parseIntFromText("pid:", &filebuf, path);
 	/* we abuse parseXidFromText a bit here ... */
 	src_dbid = parseXidFromText("dbid:", &filebuf, path);
 	src_isolevel = parseIntFromText("iso:", &filebuf, path);
@@ -1513,7 +1545,7 @@ ImportSnapshot(const char *idstr)
 	 * don't trouble to check the array elements, just the most critical
 	 * fields.
 	 */
-	if (!TransactionIdIsNormal(src_xid) ||
+	if (!VirtualTransactionIdIsValid(src_vxid) ||
 		!OidIsValid(src_dbid) ||
 		!TransactionIdIsNormal(snapshot.xmin) ||
 		!TransactionIdIsNormal(snapshot.xmax))
@@ -1554,7 +1586,7 @@ ImportSnapshot(const char *idstr)
 			  errmsg("cannot import a snapshot from a different database")));
 
 	/* OK, install the snapshot */
-	SetTransactionSnapshot(&snapshot, src_xid, NULL);
+	SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
 }
 
 /*
@@ -2141,5 +2173,5 @@ RestoreSnapshot(char *start_address)
 void
 RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
 {
-	SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc);
+	SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);
 }
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 8f9ea29..941ba71 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -14,6 +14,7 @@
 #ifndef PREDICATE_H
 #define PREDICATE_H
 
+#include "storage/lock.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
 
@@ -46,7 +47,8 @@ extern bool PageIsPredicateLocked(Relation relation, BlockNumber blkno);
 /* predicate lock maintenance */
 extern Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot);
 extern void SetSerializableTransactionSnapshot(Snapshot snapshot,
-								   TransactionId sourcexid);
+								   VirtualTransactionId *sourcevxid,
+								   int sourcepid);
 extern void RegisterPredicateLockingXid(TransactionId xid);
 extern void PredicateLockRelation(Relation relation, Snapshot snapshot);
 extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 22955a7..5cf8ff7 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -82,7 +82,7 @@ extern int	GetMaxSnapshotSubxidCount(void);
 extern Snapshot GetSnapshotData(Snapshot snapshot);
 
 extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
-							 TransactionId sourcexid);
+							 VirtualTransactionId *sourcevxid);
 extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
 
 extern RunningTransactions GetRunningTransactionData(void);
diff --git a/src/include/storage/sinvaladt.h b/src/include/storage/sinvaladt.h
index 07ac2f8..df98ee4 100644
--- a/src/include/storage/sinvaladt.h
+++ b/src/include/storage/sinvaladt.h
@@ -39,5 +39,6 @@ extern int	SIGetDataEntries(SharedInvalidationMessage *data, int datasize);
 extern void SICleanupQueue(bool callerHasWriteLock, int minFree);
 
 extern LocalTransactionId GetNextLocalTransactionId(void);
+extern LocalTransactionId GetLastLocalTransactionId(void);
 
 #endif   /* SINVALADT_H */
-- 
2.7.4

