Rebased onto current master (fb544735f1).

--
Andrey Lepikhov
Postgres Professional
https://postgrespro.com
The Russian Postgres Company
>From 29183c42a8ae31b830ab5af0dfcfdaadd6229700 Mon Sep 17 00:00:00 2001
From: "Andrey V. Lepikhov" <a.lepik...@postgrespro.ru>
Date: Tue, 12 May 2020 08:29:54 +0500
Subject: [PATCH 1/3] GlobalCSNLog-SLRU-v3

---
 src/backend/access/transam/Makefile         |   1 +
 src/backend/access/transam/global_csn_log.c | 439 ++++++++++++++++++++
 src/backend/access/transam/twophase.c       |   1 +
 src/backend/access/transam/varsup.c         |   2 +
 src/backend/access/transam/xlog.c           |  12 +
 src/backend/storage/ipc/ipci.c              |   3 +
 src/backend/storage/ipc/procarray.c         |   3 +
 src/backend/storage/lmgr/lwlocknames.txt    |   1 +
 src/backend/tcop/postgres.c                 |   1 +
 src/backend/utils/misc/guc.c                |   9 +
 src/backend/utils/probes.d                  |   2 +
 src/bin/initdb/initdb.c                     |   3 +-
 src/include/access/global_csn_log.h         |  30 ++
 src/include/storage/lwlock.h                |   1 +
 src/include/utils/snapshot.h                |   3 +
 15 files changed, 510 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/access/transam/global_csn_log.c
 create mode 100644 src/include/access/global_csn_log.h

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de72..60ff8b141e 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
 OBJS = \
 	clog.o \
 	commit_ts.o \
+	global_csn_log.o \
 	generic_xlog.o \
 	multixact.o \
 	parallel.o \
diff --git a/src/backend/access/transam/global_csn_log.c b/src/backend/access/transam/global_csn_log.c
new file mode 100644
index 0000000000..6f7fded350
--- /dev/null
+++ b/src/backend/access/transam/global_csn_log.c
@@ -0,0 +1,439 @@
+/*-----------------------------------------------------------------------------
+ *
+ * global_csn_log.c
+ *		Track global commit sequence numbers of finished transactions
+ *
+ * Implementation of cross-node transaction isolation relies on commit sequence
+ * number (CSN) based visibility rules.  This module provides an SLRU to store
+ * the CSN of each transaction.  This mapping needs to be kept only for xids
+ * greater than oldestXid, but that can require arbitrarily large amounts of
+ * memory in case of long-lived transactions.  Because of the same lifetime and
+ * persistence requirements, this module is quite similar to subtrans.c.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/global_csn_log.c
+ *
+ *-----------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/global_csn_log.h"
+#include "access/slru.h"
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "utils/snapmgr.h"
+
+bool track_global_snapshots;	/* GUC; most entry points below are no-ops when false */
+
+/*
+ * Defines for GlobalCSNLog page sizes.  A page is the same BLCKSZ as is used
+ * everywhere else in Postgres.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * GlobalCSNLog page numbering also wraps around at
+ * 0xFFFFFFFF/GCSNLOG_XACTS_PER_PAGE, and GlobalCSNLog segment numbering at
+ * 0xFFFFFFFF/GCSNLOG_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateGlobalCSNLog (see GlobalCSNLogPagePrecedes).
+ */
+
+/* We store one GlobalCSN (a uint64) per xid, so a page holds BLCKSZ / 8 xids */
+#define GCSNLOG_XACTS_PER_PAGE (BLCKSZ / sizeof(GlobalCSN))
+
+#define TransactionIdToPage(xid)	((xid) / (TransactionId) GCSNLOG_XACTS_PER_PAGE)
+#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) GCSNLOG_XACTS_PER_PAGE)
+
+/*
+ * Link to shared-memory data structures for GlobalCSNLog control
+ */
+static SlruCtlData GlobalCSNLogCtlData;
+#define GlobalCsnlogCtl (&GlobalCSNLogCtlData)
+
+static int	ZeroGlobalCSNLogPage(int pageno);
+static bool GlobalCSNLogPagePrecedes(int page1, int page2);
+static void GlobalCSNLogSetPageStatus(TransactionId xid, int nsubxids,
+									  TransactionId *subxids,
+									  GlobalCSN csn, int pageno);
+static void GlobalCSNLogSetCSNInSlot(TransactionId xid, GlobalCSN csn,
+									  int slotno);
+
+/*
+ * GlobalCSNLogSetCSN
+ *
+ * Record csn for xid and its subtransactions, one log page at a time.
+ *
+ * xid is a single xid to set status for. This will typically be the top level
+ * transactionid for a top level commit or abort. It can also be a
+ * subtransaction when we record transaction aborts.
+ *
+ * subxids is an array of nsubxids (possibly zero) subtransaction xids; each
+ * run of consecutive entries that share a page is written in one locked pass.
+ *
+ * csn is the commit sequence number of the transaction. It should be
+ * AbortedGlobalCSN for abort cases.
+ */
+void
+GlobalCSNLogSetCSN(TransactionId xid, int nsubxids,
+					 TransactionId *subxids, GlobalCSN csn)
+{
+	int			pageno;
+	int			i = 0;
+	int			offset = 0;
+
+	/* Callers of GlobalCSNLogSetCSN() must check GUC params */
+	Assert(track_global_snapshots);
+
+	Assert(TransactionIdIsValid(xid));
+
+	pageno = TransactionIdToPage(xid);		/* get page of parent */
+	for (;;)
+	{
+		int			num_on_page = 0;
+		/* Count the run of subxids (starting at offset) that sit on pageno */
+		while (i < nsubxids && TransactionIdToPage(subxids[i]) == pageno)
+		{
+			num_on_page++;
+			i++;
+		}
+
+		GlobalCSNLogSetPageStatus(xid,
+							num_on_page, subxids + offset,
+							csn, pageno);
+		if (i >= nsubxids)
+			break;
+		/* Move to the next page; xid was already recorded by the first call */
+		offset = i;
+		pageno = TransactionIdToPage(subxids[offset]);
+		xid = InvalidTransactionId;
+	}
+}
+
+/*
+ * Record csn for a set of xids that all live on page pageno: the nsubxids
+ * subxids first, then xid unless it is InvalidTransactionId.  Atomic only on this page.
+ *
+ * Otherwise API is same as TransactionIdSetTreeStatus()
+ */
+static void
+GlobalCSNLogSetPageStatus(TransactionId xid, int nsubxids,
+						   TransactionId *subxids,
+						   GlobalCSN csn, int pageno)
+{
+	int			slotno;
+	int			i;
+
+	LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+	/* Bring page pageno into a buffer slot; the lock stays held throughout */
+	slotno = SimpleLruReadPage(GlobalCsnlogCtl, pageno, true, xid);
+
+	/* Subtransactions first, if needed ... */
+	for (i = 0; i < nsubxids; i++)
+	{
+		Assert(GlobalCsnlogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
+		GlobalCSNLogSetCSNInSlot(subxids[i],	csn, slotno);
+	}
+
+	/* ... then the main transaction */
+	if (TransactionIdIsValid(xid))
+		GlobalCSNLogSetCSNInSlot(xid, csn, slotno);
+	/* The page was modified in memory; mark it so it gets written out later */
+	GlobalCsnlogCtl->shared->page_dirty[slotno] = true;
+
+	LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Store csn as the CSN of a single xid within the page held in slotno.
+ */
+static void
+GlobalCSNLogSetCSNInSlot(TransactionId xid, GlobalCSN csn, int slotno)
+{
+	int			entryno = TransactionIdToPgIndex(xid);
+	GlobalCSN  *ptr;
+
+	Assert(LWLockHeldByMeInMode(GlobalCSNLogControlLock, LW_EXCLUSIVE));
+
+	/* Entries are GlobalCSN-sized, matching GCSNLOG_XACTS_PER_PAGE */
+	ptr = (GlobalCSN *) (GlobalCsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(GlobalCSN));
+	*ptr = csn;
+}
+
+/*
+ * Look up the GlobalCSN recorded for xid in the log.
+ *
+ * NB: this is a low-level routine and is NOT the preferred entry point
+ * for most uses; TransactionIdGetGlobalCSN() in global_snapshot.c is the
+ * intended caller.
+ */
+GlobalCSN
+GlobalCSNLogGetCSN(TransactionId xid)
+{
+	int			pageno = TransactionIdToPage(xid);
+	int			entryno = TransactionIdToPgIndex(xid);
+	int			slotno;
+	GlobalCSN  *ptr;
+	GlobalCSN	global_csn;
+
+	/* Callers of GlobalCSNLogGetCSN() must check GUC params */
+	Assert(track_global_snapshots);
+
+	/* Can't ask about stuff that might not be around anymore */
+	Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
+
+	/* SimpleLruReadPage_ReadOnly returns with the control lock held */
+
+	slotno = SimpleLruReadPage_ReadOnly(GlobalCsnlogCtl, pageno, xid);
+	ptr = (GlobalCSN *) (GlobalCsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(GlobalCSN));
+	global_csn = *ptr;
+
+	LWLockRelease(GlobalCSNLogControlLock);
+
+	return global_csn;
+}
+
+/*
+ * Number of shared GlobalCSNLog buffers: NBuffers / 512, clamped to [4, 32].
+ */
+static Size
+GlobalCSNLogShmemBuffers(void)
+{
+	return Max(4, Min(32, NBuffers / 512));
+}
+
+/*
+ * Shared memory needed by GlobalCsnlogCtl; nothing is reserved when
+ * track_global_snapshots is off.
+ */
+Size
+GlobalCSNLogShmemSize(void)
+{
+	return track_global_snapshots
+		? SimpleLruShmemSize(GlobalCSNLogShmemBuffers(), 0)
+		: 0;
+}
+
+/*
+ * Set up the GlobalCSNLog SLRU in shared memory (no-op when disabled).
+ */
+void
+GlobalCSNLogShmemInit(void)
+{
+	if (track_global_snapshots)
+	{
+		GlobalCsnlogCtl->PagePrecedes = GlobalCSNLogPagePrecedes;
+		SimpleLruInit(GlobalCsnlogCtl, "GlobalCSNLog Ctl", GlobalCSNLogShmemBuffers(), 0,
+					  GlobalCSNLogControlLock, "pg_global_csn", LWTRANCHE_GLOBAL_CSN_LOG_BUFFERS);
+	}
+}
+
+/*
+ * Create the initial GlobalCSNLog segment.
+ *
+ * Must be called exactly once, at system install.  initdb is expected to
+ * have created the pg_global_csn directory already, and GlobalCSNLogShmemInit
+ * must have run before this.
+ */
+void
+BootStrapGlobalCSNLog(void)
+{
+	int			firstSlot;
+
+	if (track_global_snapshots)
+	{
+		LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+		/* Zero page 0 in shared memory, then force it out to disk now */
+		firstSlot = ZeroGlobalCSNLogPage(0);
+		SimpleLruWritePage(GlobalCsnlogCtl, firstSlot);
+		Assert(!GlobalCsnlogCtl->shared->page_dirty[firstSlot]);
+
+		LWLockRelease(GlobalCSNLogControlLock);
+	}
+}
+
+/*
+ * Reset page pageno of GlobalCSNLog to zeroes in shared memory only (nothing
+ * is written out here) and return the slot now holding it.  The caller must
+ * hold GlobalCSNLogControlLock, which remains held on return.
+ */
+static int
+ZeroGlobalCSNLogPage(int pageno)
+{
+	int			slotno;
+
+	Assert(LWLockHeldByMe(GlobalCSNLogControlLock));
+	slotno = SimpleLruZeroPage(GlobalCsnlogCtl, pageno);
+	return slotno;
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * after StartupXLOG has initialized ShmemVariableCache->nextFullXid.
+ *
+ * oldestActiveXID is the oldest XID of any prepared transaction, or nextXid
+ * if there are none.
+ */
+void
+StartupGlobalCSNLog(TransactionId oldestActiveXID)
+{
+	int			startPage;
+	int			endPage;
+
+	if (!track_global_snapshots)
+		return;
+
+	/*
+	 * Since we don't expect pg_global_csn to be valid across crashes, we
+	 * initialize the currently-active page(s) to zeroes during startup.
+	 * Whenever we advance into a new page, ExtendGlobalCSNLog will likewise
+	 * zero the new page without regard to whatever was previously on disk.
+	 */
+	LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+	startPage = TransactionIdToPage(oldestActiveXID);
+	endPage = TransactionIdToPage(XidFromFullTransactionId(ShmemVariableCache->nextFullXid));
+	/* Zero pages startPage .. endPage inclusive, wrapping past the last page */
+	while (startPage != endPage)
+	{
+		(void) ZeroGlobalCSNLogPage(startPage);
+		startPage++;
+		/* must account for wraparound */
+		if (startPage > TransactionIdToPage(MaxTransactionId))
+			startPage = 0;
+	}
+	(void) ZeroGlobalCSNLogPage(startPage);
+
+	LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Flush dirty GlobalCSNLog pages at shutdown.  Must be called exactly once,
+ * during postmaster or standalone-backend shutdown; does nothing when
+ * track_global_snapshots is off.
+ */
+void
+ShutdownGlobalCSNLog(void)
+{
+	if (track_global_snapshots)
+	{
+		/*
+		 * Not actually required for correctness; the flush is done merely as
+		 * a debugging aid.
+		 */
+		TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_START(false);
+		SimpleLruFlush(GlobalCsnlogCtl, false);
+		TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_DONE(false);
+	}
+}
+
+/*
+ * Perform a GlobalCSNLog checkpoint, either during shutdown or on-the-fly.
+ *
+ * Does nothing when track_global_snapshots is off.
+ */
+void
+CheckPointGlobalCSNLog(void)
+{
+	if (track_global_snapshots)
+	{
+		/*
+		 * Flushing dirty pages here is not needed for correctness; it just
+		 * improves the odds that they are written by the checkpoint process
+		 * rather than by backends.
+		 */
+		TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_START(true);
+		SimpleLruFlush(GlobalCsnlogCtl, true);
+		TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_DONE(true);
+	}
+}
+
+/*
+ * Make sure that GlobalCSNLog has room for a newly-allocated XID.
+ *
+ * NB: when called from GetNewTransactionId this runs while holding
+ * XidGenLock, so we want it to be fast; no actual I/O need happen unless
+ * we're forced to write out a dirty GlobalCSNLog page to make room in shared
+ * memory.  It is also called during recovery, from procarray.c.
+ */
+void
+ExtendGlobalCSNLog(TransactionId newestXact)
+{
+	int			pageno;
+
+	if (!track_global_snapshots)
+		return;
+
+	/*
+	 * No work except at first XID of a page.  But beware: just after
+	 * wraparound, the first XID of page zero is FirstNormalTransactionId.
+	 */
+	if (TransactionIdToPgIndex(newestXact) != 0 &&
+		!TransactionIdEquals(newestXact, FirstNormalTransactionId))
+		return;
+
+	pageno = TransactionIdToPage(newestXact);
+
+	LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+	/* Zero the page in shared memory; no XLOG record is written for it */
+	ZeroGlobalCSNLogPage(pageno);
+
+	LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Drop all GlobalCSNLog segments that precede the one holding oldestXact.
+ *
+ * Normally invoked at checkpoint or restartpoint time, with oldestXact being
+ * the oldest TransactionXmin of any running transaction.
+ */
+void
+TruncateGlobalCSNLog(TransactionId oldestXact)
+{
+	TransactionId cutoffXid = oldestXact;
+	int			cutoffPage;
+
+	if (!track_global_snapshots)
+		return;
+
+	/*
+	 * The cutoff is the start of the segment containing oldestXact, but what
+	 * SimpleLruTruncate receives is a *page*: that of the xid one before
+	 * oldestXact.  Stepping back avoids passing a page that hasn't been
+	 * created yet in the rare case where oldestXact is the first item on a
+	 * page and equals the next XID; otherwise SimpleLruTruncate's wraparound
+	 * detection would trigger.
+	 */
+	TransactionIdRetreat(cutoffXid);
+	cutoffPage = TransactionIdToPage(cutoffXid);
+
+	SimpleLruTruncate(GlobalCsnlogCtl, cutoffPage);
+}
+
+/*
+ * GlobalCSNLogPagePrecedes
+ *
+ * Decide whether GlobalCSNLog page page1 is "older" than page page2, for
+ * truncation purposes.
+ *
+ * Comparing the pages' first xids with TransactionIdPrecedes gets wraparound
+ * right.  Both xids are shifted by FirstNormalTransactionId so that page zero
+ * never hands InvalidTransactionId to TransactionIdPrecedes, which would get
+ * weird about permanent xact IDs.
+ */
+static bool
+GlobalCSNLogPagePrecedes(int page1, int page2)
+{
+	TransactionId first1;
+	TransactionId first2;
+
+	/* First xid of each page, offset by FirstNormalTransactionId */
+	first1 = FirstNormalTransactionId + ((TransactionId) page1) * GCSNLOG_XACTS_PER_PAGE;
+	first2 = FirstNormalTransactionId + ((TransactionId) page2) * GCSNLOG_XACTS_PER_PAGE;
+
+	return TransactionIdPrecedes(first1, first2);
+}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2f7d4ed59a..0ecc02a3dd 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/global_csn_log.h"
 #include "access/htup_details.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 2570e7086a..882bc66825 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -15,6 +15,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/global_csn_log.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -173,6 +174,7 @@ GetNewTransactionId(bool isSubXact)
 	 * Extend pg_subtrans and pg_commit_ts too.
 	 */
 	ExtendCLOG(xid);
+	ExtendGlobalCSNLog(xid);
 	ExtendCommitTs(xid);
 	ExtendSUBTRANS(xid);
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0d3d670928..285d9d442e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -24,6 +24,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/global_csn_log.h"
 #include "access/heaptoast.h"
 #include "access/multixact.h"
 #include "access/rewriteheap.h"
@@ -5345,6 +5346,7 @@ BootStrapXLOG(void)
 
 	/* Bootstrap the commit log, too */
 	BootStrapCLOG();
+	BootStrapGlobalCSNLog();
 	BootStrapCommitTs();
 	BootStrapSUBTRANS();
 	BootStrapMultiXact();
@@ -7054,6 +7056,7 @@ StartupXLOG(void)
 			 * maintained during recovery and need not be started yet.
 			 */
 			StartupCLOG();
+			StartupGlobalCSNLog(oldestActiveXID);
 			StartupSUBTRANS(oldestActiveXID);
 
 			/*
@@ -7871,6 +7874,7 @@ StartupXLOG(void)
 	if (standbyState == STANDBY_DISABLED)
 	{
 		StartupCLOG();
+		StartupGlobalCSNLog(oldestActiveXID);
 		StartupSUBTRANS(oldestActiveXID);
 	}
 
@@ -8518,6 +8522,7 @@ ShutdownXLOG(int code, Datum arg)
 		CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
 	}
 	ShutdownCLOG();
+	ShutdownGlobalCSNLog();
 	ShutdownCommitTs();
 	ShutdownSUBTRANS();
 	ShutdownMultiXact();
@@ -9090,7 +9095,10 @@ CreateCheckPoint(int flags)
 	 * StartupSUBTRANS hasn't been called yet.
 	 */
 	if (!RecoveryInProgress())
+	{
 		TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+		TruncateGlobalCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+	}
 
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
@@ -9166,6 +9174,7 @@ static void
 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointCLOG();
+	CheckPointGlobalCSNLog();
 	CheckPointCommitTs();
 	CheckPointSUBTRANS();
 	CheckPointMultiXact();
@@ -9450,7 +9459,10 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
+	{
 		TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+		TruncateGlobalCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+	}
 
 	/* Real work is done, but log and update before releasing lock. */
 	LogCheckpointEnd(true);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cd..dc2d2959c4 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -16,6 +16,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/global_csn_log.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
@@ -125,6 +126,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, ProcGlobalShmemSize());
 		size = add_size(size, XLOGShmemSize());
 		size = add_size(size, CLOGShmemSize());
+		size = add_size(size, GlobalCSNLogShmemSize());
 		size = add_size(size, CommitTsShmemSize());
 		size = add_size(size, SUBTRANSShmemSize());
 		size = add_size(size, TwoPhaseShmemSize());
@@ -213,6 +215,7 @@ CreateSharedMemoryAndSemaphores(void)
 	 */
 	XLOGShmemInit();
 	CLOGShmemInit();
+	GlobalCSNLogShmemInit();
 	CommitTsShmemInit();
 	SUBTRANSShmemInit();
 	MultiXactShmemInit();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 363000670b..8ae4906474 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -46,6 +46,7 @@
 #include <signal.h>
 
 #include "access/clog.h"
+#include "access/global_csn_log.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -835,6 +836,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 	while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
 	{
 		ExtendSUBTRANS(latestObservedXid);
+		ExtendGlobalCSNLog(latestObservedXid);
 		TransactionIdAdvance(latestObservedXid);
 	}
 	TransactionIdRetreat(latestObservedXid);	/* = running->nextXid - 1 */
@@ -3337,6 +3339,7 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
 		while (TransactionIdPrecedes(next_expected_xid, xid))
 		{
 			TransactionIdAdvance(next_expected_xid);
+			ExtendGlobalCSNLog(next_expected_xid);
 			ExtendSUBTRANS(next_expected_xid);
 		}
 		Assert(next_expected_xid == xid);
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index db47843229..fe18c93b61 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -49,3 +49,4 @@ MultiXactTruncationLock				41
 OldSnapshotTimeMapLock				42
 LogicalRepWorkerLock				43
 CLogTruncationLock					44
+GlobalCSNLogControlLock				45
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 802b1ec22f..d0bb64870c 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -42,6 +42,7 @@
 #include "catalog/pg_type.h"
 #include "commands/async.h"
 #include "commands/prepare.h"
+#include "common/hashfn.h"
 #include "executor/spi.h"
 #include "jit/jit.h"
 #include "libpq/libpq.h"
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 53a6cd2436..0ca331c6f9 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1175,6 +1175,15 @@ static struct config_bool ConfigureNamesBool[] =
 		false,
 		NULL, NULL, NULL
 	},
+	{
+		{"track_global_snapshots", PGC_POSTMASTER, RESOURCES_MEM,
+			gettext_noop("Enable global snapshot tracking."),
+			gettext_noop("Used to achieve REPEATABLE READ isolation level for postgres_fdw transactions.")
+		},
+		&track_global_snapshots,
+		true, /* XXX: set true to simplify testing. XXX2: Seems that RESOURCES_MEM isn't the best category */
+		NULL, NULL, NULL
+	},
 	{
 		{"ssl", PGC_SIGHUP, CONN_AUTH_SSL,
 			gettext_noop("Enables SSL connections."),
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index a0b0458108..f900e7f3b4 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -77,6 +77,8 @@ provider postgresql {
 	probe clog__checkpoint__done(bool);
 	probe subtrans__checkpoint__start(bool);
 	probe subtrans__checkpoint__done(bool);
+	probe globalcsnlog__checkpoint__start(bool);
+	probe globalcsnlog__checkpoint__done(bool);
 	probe multixact__checkpoint__start(bool);
 	probe multixact__checkpoint__done(bool);
 	probe twophase__checkpoint__start();
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index a6577486ce..d0afab9d33 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -220,7 +220,8 @@ static const char *const subdirs[] = {
 	"pg_xact",
 	"pg_logical",
 	"pg_logical/snapshots",
-	"pg_logical/mappings"
+	"pg_logical/mappings",
+	"pg_global_csn"
 };
 
 
diff --git a/src/include/access/global_csn_log.h b/src/include/access/global_csn_log.h
new file mode 100644
index 0000000000..417c26c8a3
--- /dev/null
+++ b/src/include/access/global_csn_log.h
@@ -0,0 +1,30 @@
+/*
+ * global_csn_log.h
+ *
+ * Commit-Sequence-Number log: an SLRU mapping each xid to its GlobalCSN.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/global_csn_log.h
+ */
+#ifndef GLOBAL_CSN_LOG_H
+#define GLOBAL_CSN_LOG_H
+
+#include "access/xlog.h"
+#include "utils/snapshot.h"
+
+extern void GlobalCSNLogSetCSN(TransactionId xid, int nsubxids,
+							   TransactionId *subxids, GlobalCSN csn);
+extern GlobalCSN GlobalCSNLogGetCSN(TransactionId xid);
+
+extern Size GlobalCSNLogShmemSize(void);
+extern void GlobalCSNLogShmemInit(void);
+extern void BootStrapGlobalCSNLog(void);
+extern void StartupGlobalCSNLog(TransactionId oldestActiveXID);
+extern void ShutdownGlobalCSNLog(void);
+extern void CheckPointGlobalCSNLog(void);
+extern void ExtendGlobalCSNLog(TransactionId newestXact);
+extern void TruncateGlobalCSNLog(TransactionId oldestXact);
+
+#endif							/* GLOBAL_CSN_LOG_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 8fda8e4f78..c303042663 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -198,6 +198,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_CLOG_BUFFERS = NUM_INDIVIDUAL_LWLOCKS,
 	LWTRANCHE_COMMITTS_BUFFERS,
 	LWTRANCHE_SUBTRANS_BUFFERS,
+	LWTRANCHE_GLOBAL_CSN_LOG_BUFFERS,
 	LWTRANCHE_MXACTOFFSET_BUFFERS,
 	LWTRANCHE_MXACTMEMBER_BUFFERS,
 	LWTRANCHE_ASYNC_BUFFERS,
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 4796edb63a..57d2dfaa67 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -20,6 +20,9 @@
 #include "storage/buf.h"
 
 
+typedef uint64 GlobalCSN;
+extern bool track_global_snapshots;
+
 /*
  * The different snapshot types.  We use SnapshotData structures to represent
  * both "regular" (MVCC) snapshots and "special" snapshots that have non-MVCC
-- 
2.17.1

>From 25a5288764e9e70a0a61a4a1b32111ce8b29c966 Mon Sep 17 00:00:00 2001
From: "Andrey V. Lepikhov" <a.lepik...@postgrespro.ru>
Date: Tue, 12 May 2020 08:30:46 +0500
Subject: [PATCH 2/3] Global-snapshots-v3

---
 src/backend/access/transam/Makefile           |   1 +
 src/backend/access/transam/global_snapshot.c  | 755 ++++++++++++++++++
 src/backend/access/transam/twophase.c         | 156 ++++
 src/backend/access/transam/xact.c             |  29 +
 src/backend/access/transam/xlog.c             |   2 +
 src/backend/storage/ipc/ipci.c                |   3 +
 src/backend/storage/ipc/procarray.c           |  92 ++-
 src/backend/storage/lmgr/lwlocknames.txt      |   1 +
 src/backend/storage/lmgr/proc.c               |   5 +
 src/backend/utils/misc/guc.c                  |  13 +-
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/backend/utils/time/snapmgr.c              | 167 +++-
 src/include/access/global_snapshot.h          |  72 ++
 src/include/access/twophase.h                 |   1 +
 src/include/catalog/pg_proc.dat               |  14 +
 src/include/datatype/timestamp.h              |   3 +
 src/include/fmgr.h                            |   1 +
 src/include/portability/instr_time.h          |  10 +
 src/include/storage/proc.h                    |  15 +
 src/include/storage/procarray.h               |   8 +
 src/include/utils/snapmgr.h                   |   3 +
 src/include/utils/snapshot.h                  |   8 +
 22 files changed, 1354 insertions(+), 7 deletions(-)
 create mode 100644 src/backend/access/transam/global_snapshot.c
 create mode 100644 src/include/access/global_snapshot.h

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 60ff8b141e..6de567a79b 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -16,6 +16,7 @@ OBJS = \
 	clog.o \
 	commit_ts.o \
 	global_csn_log.o \
+	global_snapshot.o \
 	generic_xlog.o \
 	multixact.o \
 	parallel.o \
diff --git a/src/backend/access/transam/global_snapshot.c b/src/backend/access/transam/global_snapshot.c
new file mode 100644
index 0000000000..bac16828bb
--- /dev/null
+++ b/src/backend/access/transam/global_snapshot.c
@@ -0,0 +1,755 @@
+/*-------------------------------------------------------------------------
+ *
+ * global_snapshot.c
+ *		Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/global_snapshot.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/global_csn_log.h"
+#include "access/global_snapshot.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/xact.h"
+#include "portability/instr_time.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "miscadmin.h"
+
+/* GlobalSnapshotSync() warns if a remote global_csn leads ours by more. */
+#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */
+
+/*
+ * GlobalSnapshotState
+ *
+ * Local clocks are not trusted to be strictly monotonic, so we remember the
+ * last value handed out (under "lock") and compare later timestamps with it.
+ * Accessed through GlobalSnapshotGenerate() and GlobalSnapshotSync().
+ */
+typedef struct
+{
+	GlobalCSN		 last_global_csn;
+	volatile slock_t lock;
+} GlobalSnapshotState;
+
+static GlobalSnapshotState *gsState;
+
+
+/*
+ * GUC: delay the advance of oldestXid by this many seconds.  It also sets the
+ * number of one-second slots in the GlobalSnapshotXidMap circular buffer.
+ */
+int global_snapshot_defer_time;
+
+/*
+ * GUC that enables this module; defined elsewhere (presumably guc.c).
+ */
+extern bool track_global_snapshots;
+
+/*
+ * GlobalSnapshotXidMap
+ *
+ * To be able to install a global snapshot that points to the past we need to
+ * keep old versions of tuples and therefore delay the advance of oldestXid.
+ * Here we keep track of the correspondence between a snapshot's global_csn
+ * and the oldestXid that was set at the time the snapshot was taken -- much
+ * like snapshot-too-old's OldSnapshotControlData does, but at per-second
+ * granularity.
+ *
+ * Different strategies can be employed to hold oldestXid (e.g. track the
+ * oldest global_csn-based snapshot among cluster nodes and map it to
+ * oldestXid on each node); the one implemented here avoids cross-node
+ * communication, which is tricky in the case of postgres_fdw.
+ *
+ * On each snapshot acquisition GlobalSnapshotMapXmin() is called and stores
+ * the correspondence between current global_csn and oldestXmin sparsely:
+ * global_csn is rounded to seconds (relying on global_csn being just a
+ * timestamp) and oldestXmin is stored in a circular buffer whose slot index
+ * is the rounded global_csn modulo the buffer size.  The buffer has one slot
+ * per second of the global_snapshot_defer_time GUC.
+ *
+ * When a global snapshot arrives from a different node we check that its
+ * global_csn is still in our map, otherwise we'll error out with a "snapshot
+ * too old" message.  If global_csn is successfully mapped to oldestXid we
+ * move the backend's pgxact->xmin to proc->originalXmin and set pgxact->xmin
+ * to the mapped oldestXid.  That way GetOldestXmin() takes into account
+ * backends with imported global snapshots, and old tuple versions are kept.
+ *
+ * Also, when calculating oldestXmin for our map in the presence of imported
+ * global snapshots we must use proc->originalXmin instead of the pgxact->xmin
+ * that was set during import.  Otherwise we could create a feedback loop:
+ * xmins of imported global snapshots were calculated using our map, new map
+ * entries would then be calculated from those xmins, and we risk getting
+ * stuck forever with one non-increasing oldestXmin.  All other callers of
+ * GetOldestXmin() use pgxact->xmin, so the old tuple versions are preserved.
+ * (GlobalSnapshotMapXmin() asks for PROCARRAY_NON_IMPORTED_XMIN for this.)
+ */
+typedef struct GlobalSnapshotXidMap
+{
+	int				 head;				/* slot holding the freshest value */
+	int				 size;				/* number of slots (one per second) */
+	GlobalCSN_atomic last_csn_seconds;	/* last rounded global_csn that changed
+										 * xmin_by_second[] */
+	TransactionId   *xmin_by_second;	/* circular buffer of oldestXmin's */
+}
+GlobalSnapshotXidMap;
+
+static GlobalSnapshotXidMap *gsXidMap;
+
+
+/* Estimate shared memory space needed (overflow-checked, as in ipci.c) */
+Size
+GlobalSnapshotShmemSize(void)
+{
+	Size		size = 0;
+
+	if (track_global_snapshots || global_snapshot_defer_time > 0)
+		size = add_size(size, MAXALIGN(sizeof(GlobalSnapshotState)));
+
+	if (global_snapshot_defer_time > 0)
+	{
+		/* map header plus one TransactionId slot per second of deferral */
+		size = add_size(size, sizeof(GlobalSnapshotXidMap));
+		size = add_size(size, mul_size(global_snapshot_defer_time,
+									   sizeof(TransactionId)));
+		size = MAXALIGN(size);
+	}
+
+	return size;
+}
+
+/* Init shared memory structures; gsXidMap and its buffer share one chunk */
+void
+GlobalSnapshotShmemInit(void)
+{
+	bool		found;
+
+	if (track_global_snapshots || global_snapshot_defer_time > 0)
+	{
+		gsState = ShmemInitStruct("gsState",
+								  sizeof(GlobalSnapshotState),
+								  &found);
+		if (!found)
+		{
+			gsState->last_global_csn = 0;
+			SpinLockInit(&gsState->lock);
+		}
+	}
+
+	if (global_snapshot_defer_time > 0)
+	{
+		gsXidMap = ShmemInitStruct("gsXidMap",
+								   sizeof(GlobalSnapshotXidMap) +
+								   global_snapshot_defer_time * sizeof(TransactionId),
+								   &found);
+		if (!found)
+		{
+			int			i;
+
+			pg_atomic_init_u64(&gsXidMap->last_csn_seconds, 0);
+			gsXidMap->head = 0;
+			gsXidMap->size = global_snapshot_defer_time;
+			/* buffer lives right after the struct, as sized by ShmemSize */
+			gsXidMap->xmin_by_second = (TransactionId *) (gsXidMap + 1);
+			for (i = 0; i < gsXidMap->size; i++)
+				gsXidMap->xmin_by_second[i] = InvalidTransactionId;
+		}
+	}
+}
+
+/*
+ * GlobalSnapshotStartup
+ *
+ * Set all gsXidMap slots and ProcArray's xmin hold to oldestActiveXID.
+ */
+void
+GlobalSnapshotStartup(TransactionId oldestActiveXID)
+{
+	int			slot;
+
+	/*
+	 * Skip unless shared memory is initialized and the xid map is in use
+	 * (module enabled and global_snapshot_defer_time positive).
+	 */
+	if (!IsNormalProcessingMode() || !track_global_snapshots ||
+		global_snapshot_defer_time <= 0)
+		return;
+
+	Assert(TransactionIdIsValid(oldestActiveXID));
+	for (slot = 0; slot < gsXidMap->size; slot++)
+		gsXidMap->xmin_by_second[slot] = oldestActiveXID;
+	ProcArraySetGlobalSnapshotXmin(oldestActiveXID);
+}
+
+/*
+ * GlobalSnapshotMapXmin
+ *
+ * Maintain circular buffer of oldestXmins for several seconds in past. This
+ * buffer allows to shift oldestXmin in the past when backend is importing
+ * global transaction. Otherwise old versions of tuples that were needed for
+ * this transaction can be recycled by other processes (vacuum, HOT, etc).
+ *
+ * Locking here is not trivial. Called upon each snapshot creation after
+ * ProcArrayLock is released. Such usage creates several race conditions. It
+ * is possible that backend who got global_csn called GlobalSnapshotMapXmin()
+ * only after other backends managed to get snapshot and complete
+ * GlobalSnapshotMapXmin() call, or even committed. This is safe because:
+ *
+ *		* We already hold our xmin in MyPgXact, so our snapshot will not be
+ *		  harmed even though ProcArrayLock is released.
+ *
+ *		* snapshot_global_csn is always pessimistically rounded up to the next
+ *		  second.
+ *
+ *		* For performance reasons, xmin value for particular second is filled
+ *		  only once. Because of that instead of writing to buffer just our
+ *		  xmin (which is enough for our snapshot), we bump oldestXmin there --
+ *		  it mitigates the possibility of damaging someone else's snapshot by
+ *		  writing to the buffer too advanced value in case of slowness of
+ *		  another backend who generated csn earlier, but didn't manage to
+ *		  insert it before us.
+ *
+ *		* if GlobalSnapshotMapXmin() finds a gap of several seconds between
+ *		  current call and latest completed call then it should fill that gap
+ *		  with latest known values instead of new one. Otherwise it is
+ *		  possible (however highly unlikely) that this gap also happened
+ *		  between taking snapshot and call to GlobalSnapshotMapXmin() for some
+ *		  backend. And we are at risk of filling the circular buffer with
+ *		  oldestXmin's that are bigger than they actually were.
+ */
+void
+GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn)
+{
+	int offset, gap, i;
+	GlobalCSN csn_seconds;
+	GlobalCSN last_csn_seconds;
+	volatile TransactionId oldest_deferred_xmin;
+	TransactionId current_oldest_xmin, previous_oldest_xmin;
+
+	/* Callers should check config values */
+	Assert(global_snapshot_defer_time > 0);
+	Assert(gsXidMap != NULL);
+
+	/*
+	 * Round up global_csn to the next second -- pessimistically and safely.
+	 */
+	csn_seconds = (snapshot_global_csn / NSECS_PER_SEC + 1);
+
+	/*
+	 * Fast-path check. Avoid taking exclusive GlobalSnapshotXidMapLock lock
+	 * if oldestXid was already written to xmin_by_second[] for this rounded
+	 * global_csn.
+	 */
+	if (pg_atomic_read_u64(&gsXidMap->last_csn_seconds) >= csn_seconds)
+		return;
+
+	/* Ok, we have new entry (or entries) */
+	LWLockAcquire(GlobalSnapshotXidMapLock, LW_EXCLUSIVE);
+
+	/* Re-check last_csn_seconds under lock */
+	last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds);
+	if (last_csn_seconds >= csn_seconds)
+	{
+		LWLockRelease(GlobalSnapshotXidMapLock);
+		return;
+	}
+	pg_atomic_write_u64(&gsXidMap->last_csn_seconds, csn_seconds);
+
+	/*
+	 * Compute oldest_xmin.
+	 *
+	 * It was possible to calculate oldest_xmin during corresponding snapshot
+	 * creation, but GetSnapshotData() intentionally reads only PgXact, but not
+	 * PgProc. And we need info about originalXmin (see comment to gsXidMap)
+	 * which is stored in PgProc because of warnings in comments around PgXact
+	 * about extending it with new fields. So just calculate oldest_xmin again,
+	 * that anyway happens quite rarely.
+	 */
+	current_oldest_xmin = GetOldestXmin(NULL, PROCARRAY_NON_IMPORTED_XMIN);
+
+	previous_oldest_xmin = gsXidMap->xmin_by_second[gsXidMap->head];
+
+	Assert(TransactionIdIsNormal(current_oldest_xmin));
+	Assert(TransactionIdIsNormal(previous_oldest_xmin) || !track_global_snapshots);
+
+	offset = csn_seconds % gsXidMap->size;
+
+	/* Sanity check before we update head; do the arithmetic in 64 bits */
+	Assert(csn_seconds > last_csn_seconds);
+	Assert((gsXidMap->head + (csn_seconds - last_csn_seconds)) %
+		   gsXidMap->size == offset);
+	/* Clamp before narrowing to int: gap is huge while last_csn_seconds is 0 */
+	gap = (int) Min(csn_seconds - last_csn_seconds, (GlobalCSN) gsXidMap->size);
+	gsXidMap->head = offset;
+
+	/* Fill new entry with current_oldest_xmin */
+	gsXidMap->xmin_by_second[offset] = current_oldest_xmin;
+
+	/*
+	 * If we have gap then fill it with previous_oldest_xmin for reasons
+	 * outlined in comment above this function.
+	 */
+	for (i = 1; i < gap; i++)
+	{
+		offset = (offset + gsXidMap->size - 1) % gsXidMap->size;
+		gsXidMap->xmin_by_second[offset] = previous_oldest_xmin;
+	}
+	/* The slot right after head is the oldest one still inside the window */
+	oldest_deferred_xmin =
+		gsXidMap->xmin_by_second[ (gsXidMap->head + 1) % gsXidMap->size ];
+
+	LWLockRelease(GlobalSnapshotXidMapLock);
+
+	/*
+	 * Advance procArray->global_snapshot_xmin after releasing the lock.
+	 * NOTE(review): a backend delayed here could publish its older value after
+	 * a later caller published a newer one; confirm that is harmless.
+	 */
+	Assert(TransactionIdFollowsOrEquals(oldest_deferred_xmin,
+										ProcArrayGetGlobalSnapshotXmin()));
+	ProcArraySetGlobalSnapshotXmin(oldest_deferred_xmin);
+}
+
+
+/*
+ * GlobalSnapshotToXmin
+ *
+ * Return the oldestXmin mapped for snapshot_global_csn's second (the latest
+ * one if that second is not mapped yet); InvalidTransactionId if too old.
+ */
+TransactionId
+GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn)
+{
+	GlobalCSN	requested_sec;
+	volatile GlobalCSN newest_sec;
+	TransactionId result;
+
+	/* Callers should check config values */
+	Assert(global_snapshot_defer_time > 0);
+	Assert(gsXidMap != NULL);
+
+	/* Truncate to whole seconds, which keeps the estimate conservative */
+	requested_sec = snapshot_global_csn / NSECS_PER_SEC;
+
+	LWLockAcquire(GlobalSnapshotXidMapLock, LW_SHARED);
+	newest_sec = pg_atomic_read_u64(&gsXidMap->last_csn_seconds);
+	if (requested_sec > newest_sec)
+	{
+		/* No slot for this second yet, so fall back to the freshest one */
+		result = gsXidMap->xmin_by_second[gsXidMap->head];
+	}
+	else if (newest_sec - requested_sec >= gsXidMap->size)
+	{
+		/* That slot has already been recycled: the snapshot is too old */
+		result = InvalidTransactionId;
+	}
+	else
+	{
+		Assert(newest_sec % gsXidMap->size == gsXidMap->head);
+		result = gsXidMap->xmin_by_second[requested_sec % gsXidMap->size];
+	}
+	LWLockRelease(GlobalSnapshotXidMapLock);
+
+	return result;
+}
+
+/*
+ * GlobalSnapshotGenerate
+ *
+ * Return a GlobalCSN taken from the local clock (in nanoseconds, since busy
+ * servers can start millions of read transactions per second) and forced to
+ * be strictly greater than any value returned before.  Pass locked = true if
+ * the caller already holds gsState->lock.
+ * NOTE(review): cross-node comparison assumes INSTR_TIME reads a wall clock.
+ */
+GlobalCSN
+GlobalSnapshotGenerate(bool locked)
+{
+	instr_time	now;
+	GlobalCSN	candidate;
+
+	Assert(track_global_snapshots || global_snapshot_defer_time > 0);
+
+	/* TODO: create some macro that add small random shift to current time. */
+	INSTR_TIME_SET_CURRENT(now);
+	candidate = (GlobalCSN) INSTR_TIME_GET_NANOSEC(now);
+
+	/* TODO: change to atomics? */
+	if (!locked)
+		SpinLockAcquire(&gsState->lock);
+
+	/* If the clock did not move past the last value, step just beyond it */
+	if (candidate > gsState->last_global_csn)
+		gsState->last_global_csn = candidate;
+	else
+		candidate = ++gsState->last_global_csn;
+
+	if (!locked)
+		SpinLockRelease(&gsState->lock);
+
+	return candidate;
+}
+
+/*
+ * GlobalSnapshotSync
+ *
+ * Clocks on different nodes are never perfectly synchronized, so we can get a
+ * remote global_csn that is greater than any global_csn of this node.  To
+ * preserve proper isolation we wait until the local clock reaches it.
+ *
+ * This should happen rarely if the nodes run NTP/PTP/etc.  A WARNING is
+ * emitted whenever the remaining wait exceeds SNAP_DESYNC_COMPLAIN.
+ */
+void
+GlobalSnapshotSync(GlobalCSN remote_gcsn)
+{
+	GlobalCSN	local_gcsn;
+	GlobalCSN	delta;
+
+	Assert(track_global_snapshots);
+
+	for(;;)
+	{
+		SpinLockAcquire(&gsState->lock);
+		if (gsState->last_global_csn > remote_gcsn)
+		{
+			/* We already handed out a global_csn past remote_gcsn */
+			SpinLockRelease(&gsState->lock);
+			return;
+		}
+		else if ((local_gcsn = GlobalSnapshotGenerate(true)) >= remote_gcsn)
+		{
+			/*
+			 * A fresh clock reading has caught up; last_global_csn merely had
+			 * not been advanced for some time.
+			 */
+			SpinLockRelease(&gsState->lock);
+			return;
+		}
+		SpinLockRelease(&gsState->lock);
+
+		/* Still behind: sleep for the remaining difference */
+		delta = remote_gcsn - local_gcsn;
+		if (delta > SNAP_DESYNC_COMPLAIN)
+			ereport(WARNING,
+				(errmsg("remote global snapshot exceeds ours by more than a second"),
+				 errhint("Consider running NTPd on servers participating in global transaction")));
+
+		/* TODO: report this sleeptime somewhere? */
+		pg_usleep((long) (delta/NSECS_PER_USEC));
+
+		/*
+		 * Loop to re-check: the sleep may be cut short, and a delta below one
+		 * microsecond truncates to pg_usleep(0), i.e. an immediate retry.
+		 */
+	}
+
+	Assert(false); /* not reached: the loop is left only via return */
+	return;
+}
+
+/*
+ * TransactionIdGetGlobalCSN
+ *
+ * Return the GlobalCSN of xid.  Permanent xids and xids older than
+ * TransactionXmin are answered without reading GlobalCSNLog, and an InDoubt
+ * entry is waited on until the committing backend resolves it.
+ *
+ * Expected results: a normal, frozen, aborted or in-progress GlobalCSN.
+ */
+GlobalCSN
+TransactionIdGetGlobalCSN(TransactionId xid)
+{
+	GlobalCSN	csn;
+
+	Assert(track_global_snapshots);
+
+	/*
+	 * Permanent xids have no GlobalCSNLog entry: the invalid xid counts as
+	 * aborted, frozen and bootstrap xids as frozen.
+	 */
+	if (xid == InvalidTransactionId)
+		return AbortedGlobalCSN;
+	if (xid == FrozenTransactionId || xid == BootstrapTransactionId)
+		return FrozenGlobalCSN;
+	Assert(TransactionIdIsNormal(xid));	/* no other permanent xids exist */
+
+	/*
+	 * GlobalCSNLog may already be truncated below TransactionXmin, but such a
+	 * transaction is definitely not running concurrently according to any
+	 * snapshot, timetravel ones included.  Report it as frozen; callers
+	 * should check TransactionIdDidCommit afterwards.
+	 */
+	if (TransactionIdPrecedes(xid, TransactionXmin))
+		return FrozenGlobalCSN;
+
+	/* Normal case: read the value from the SLRU */
+	csn = GlobalCSNLogGetCSN(xid);
+
+	if (GlobalCSNIsInDoubt(csn))
+	{
+		/*
+		 * The transaction is being committed.  Wait on its xact lock until
+		 * the final GlobalCSN has been stored, then re-read it so that the
+		 * visibility check can decide.  See also GlobalSnapshotPrecommit().
+		 */
+		XactLockTableWait(xid, NULL, NULL, XLTW_None);
+		csn = GlobalCSNLogGetCSN(xid);
+		Assert(GlobalCSNIsNormal(csn) || GlobalCSNIsAborted(csn));
+	}
+
+	Assert(GlobalCSNIsNormal(csn) || GlobalCSNIsInProgress(csn) ||
+		   GlobalCSNIsAborted(csn));
+
+	return csn;
+}
+
+/*
+ * XidInvisibleInGlobalSnapshot
+ *
+ * Version of XidInMVCCSnapshot for global transactions.  Returns true if the
+ * effects of xid must be treated as invisible to snapshot, false otherwise.
+ *
+ * For non-imported global snapshots this should give the same results as
+ * XidInLocalMVCCSnapshot (except that aborts are shown as invisible without
+ * going to clog); to ensure that, XidInMVCCSnapshot is coated with asserts
+ * that check both functions agree in the case of an ordinary snapshot.
+ */
+bool
+XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	GlobalCSN	csn;
+
+	Assert(track_global_snapshots);
+
+	csn = TransactionIdGetGlobalCSN(xid);
+
+	/*
+	 * Committed with a normal CSN: visible only if it committed strictly
+	 * before the snapshot's global_csn.
+	 */
+	if (GlobalCSNIsNormal(csn))
+		return csn >= snapshot->global_csn;
+
+	/* Bootstrap and frozen transactions are visible to everyone */
+	if (GlobalCSNIsFrozen(csn))
+		return false;
+
+	/*
+	 * Anything else is aborted or still in progress, hence invisible.  An
+	 * aborted CSN must agree with clog.
+	 */
+	Assert(GlobalCSNIsAborted(csn) || GlobalCSNIsInProgress(csn));
+	Assert(!GlobalCSNIsAborted(csn) || TransactionIdDidAbort(xid));
+	return true;
+}
+
+
+/*****************************************************************************
+ * Functions to handle distributed commit on transaction coordinator:
+ * GlobalSnapshotPrepareCurrent() / GlobalSnapshotAssignCsnCurrent().
+ * Corresponding functions for remote nodes are defined in twophase.c:
+ * pg_global_snapshot_prepare/pg_global_snapshot_assign.
+ *****************************************************************************/
+
+
+/*
+ * GlobalSnapshotPrepareCurrent
+ *
+ * Set InDoubt state for the currently active transaction (and its committed
+ * subtransactions) and return a freshly generated GlobalCSN as this node's
+ * proposal for the commit's global snapshot.
+ */
+GlobalCSN
+GlobalSnapshotPrepareCurrent(void)
+{
+	TransactionId xid = GetCurrentTransactionIdIfAny();
+
+	if (!track_global_snapshots)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not prepare transaction for global commit"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "track_global_snapshots")));
+
+	/* Nothing to write if we don't have an xid */
+	if (TransactionIdIsValid(xid))
+	{
+		TransactionId *subxids;
+		int			nsubxids = xactGetCommittedChildren(&subxids);
+
+		GlobalCSNLogSetCSN(xid, nsubxids, subxids, InDoubtGlobalCSN);
+	}
+
+	return GlobalSnapshotGenerate(false);
+}
+
+/*
+ * GlobalSnapshotAssignCsnCurrent
+ *
+ * Assign global_csn to the currently active transaction.  global_csn is
+ * expected to be the maximum of the values returned by
+ * GlobalSnapshotPrepareCurrent and pg_global_snapshot_prepare.
+ */
+void
+GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn)
+{
+	if (!track_global_snapshots)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not assign global CSN to transaction"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "track_global_snapshots")));
+
+	if (!GlobalCSNIsNormal(global_csn))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pg_global_snapshot_assign expects normal global_csn")));
+
+	/* Skip empty transactions */
+	if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+		return;
+
+	/* Set global_csn and defuse ProcArrayEndTransaction from assigning one */
+	pg_atomic_write_u64(&MyProc->assignedGlobalCsn, global_csn);
+}
+
+
+/*****************************************************************************
+ * Functions to handle global and local transactions commit.
+ *
+ * For local transactions GlobalSnapshotPrecommit sets InDoubt state before
+ * ProcArrayEndTransaction is called and transaction data potentially becomes
+ * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in
+ * twophase case) then acquires global_csn under ProcArray lock and stores it
+ * in proc->assignedGlobalCsn. It's important that global_csn for commit is
+ * generated under ProcArray lock, otherwise global and local snapshots won't
+ * be equivalent. A subsequent call to GlobalSnapshotCommit will write
+ * proc->assignedGlobalCsn to GlobalCSNLog.
+ *
+ * The same rules apply to global transactions, except that global_csn is
+ * already assigned by GlobalSnapshotAssignCsnCurrent/pg_global_snapshot_assign
+ * and GlobalSnapshotPrecommit is basically a no-op.
+ *
+ * GlobalSnapshotAbort differs slightly from commit: an abort can skip the
+ * InDoubt phase and can be called for a transaction subtree.
+ *****************************************************************************/
+
+
+/*
+ * GlobalSnapshotAbort
+ *
+ * Mark xid and its subxids as aborted in GlobalCSNLog.  The InDoubt step can
+ * be skipped here: no concurrent transaction may see aborted data anyway.
+ */
+void
+GlobalSnapshotAbort(PGPROC *proc, TransactionId xid,
+					int nsubxids, TransactionId *subxids)
+{
+	if (track_global_snapshots)
+	{
+		GlobalCSNLogSetCSN(xid, nsubxids, subxids, AbortedGlobalCSN);
+
+		/*
+		 * Reset assignedGlobalCsn unconditionally, since
+		 * GlobalSnapshotAssignCsnCurrent may have set it.
+		 */
+		pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN);
+	}
+}
+
+/*
+ * GlobalSnapshotPrecommit
+ *
+ * Set InDoubt status for a local transaction that we are going to commit.
+ * This step is needed for consistency between local snapshots and global
+ * csn-based snapshots. We don't hold ProcArray lock while writing the csn
+ * for the transaction into the SLRU; instead we set InDoubt status before the
+ * transaction is deleted from ProcArray, so readers that look up the csn in
+ * the gap between ProcArray removal and GlobalCSN assignment can wait until
+ * the GlobalCSN is finally assigned. See also TransactionIdGetGlobalCSN().
+ *
+ * For a global transaction this only checks (in assert-enabled builds) that
+ * the InDoubt state and a normal GlobalCSN were already set earlier.
+ *
+ * This should be called only from the parallel group leader, before the
+ * backend is deleted from ProcArray.
+ */
+void
+GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid,
+					int nsubxids, TransactionId *subxids)
+{
+	GlobalCSN oldAssignedGlobalCsn = InProgressGlobalCSN;
+	bool in_progress;
+
+	if (!track_global_snapshots)
+		return;
+
+	/* Swap InProgress -> InDoubt; this succeeds only for a local transaction */
+	in_progress = pg_atomic_compare_exchange_u64(&proc->assignedGlobalCsn,
+												 &oldAssignedGlobalCsn,
+												 InDoubtGlobalCSN);
+	if (in_progress)
+	{
+		Assert(GlobalCSNIsInProgress(oldAssignedGlobalCsn));
+		GlobalCSNLogSetCSN(xid, nsubxids,
+						   subxids, InDoubtGlobalCSN);
+	}
+	else
+	{
+		/* CAS failed: oldAssignedGlobalCsn now holds the pre-assigned CSN */
+		Assert(GlobalCSNIsNormal(oldAssignedGlobalCsn));
+		/* Also global transaction should already be in InDoubt state */
+		Assert(GlobalCSNIsInDoubt(GlobalCSNLogGetCSN(xid)));
+	}
+}
+
+/*
+ * GlobalSnapshotCommit
+ *
+ * Store the GlobalCSN acquired earlier (proc->assignedGlobalCsn) in
+ * GlobalCSNLog for xid and its subxids, then reset assignedGlobalCsn.  Must
+ * be preceded by GlobalSnapshotPrecommit() so that readers can wait until the
+ * SLRU write is done.
+ *
+ * Call after ProcArrayEndTransaction but before transaction locks are
+ * released: TransactionIdGetGlobalCSN waits on that lock for the GlobalCSN.
+ */
+void
+GlobalSnapshotCommit(PGPROC *proc, TransactionId xid,
+					int nsubxids, TransactionId *subxids)
+{
+	volatile GlobalCSN csn;
+
+	if (!track_global_snapshots)
+		return;
+
+	csn = pg_atomic_read_u64(&proc->assignedGlobalCsn);
+
+	/* No xid means nothing to stamp, and no CSN should have been assigned */
+	if (!TransactionIdIsValid(xid))
+	{
+		Assert(GlobalCSNIsInProgress(csn));
+		return;
+	}
+
+	/* Finally write resulting GlobalCSN in SLRU */
+	Assert(GlobalCSNIsNormal(csn));
+	GlobalCSNLogSetCSN(xid, nsubxids, subxids, csn);
+
+	/* Reset for next transaction */
+	pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN);
+}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 0ecc02a3dd..c5d59fa2f2 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/global_snapshot.h"
 #include "access/global_csn_log.h"
 #include "access/htup_details.h"
 #include "access/subtrans.h"
@@ -1477,8 +1478,34 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 									   hdr->nabortrels, abortrels,
 									   gid);
 
+	/*
+	 * GlobalSnapshot callbacks that should be called right before we are
+	 * going to become visible. Details in comments to this functions.
+	 */
+	if (isCommit)
+		GlobalSnapshotPrecommit(proc, xid, hdr->nsubxacts, children);
+	else
+		GlobalSnapshotAbort(proc, xid, hdr->nsubxacts, children);
+
+
 	ProcArrayRemove(proc, latestXid);
 
+	/*
+	 * Stamp our transaction with GlobalCSN in GlobalCsnLog.
+	 * Should be called after ProcArrayEndTransaction, but before releasing
+	 * transaction locks, since TransactionIdGetGlobalCSN relies on
+	 * XactLockTableWait to await global_csn.
+	 */
+	if (isCommit)
+	{
+		GlobalSnapshotCommit(proc, xid, hdr->nsubxacts, children);
+	}
+	else
+	{
+		Assert(GlobalCSNIsInProgress(
+				   pg_atomic_read_u64(&proc->assignedGlobalCsn)));
+	}
+
 	/*
 	 * In case we fail while running the callbacks, mark the gxact invalid so
 	 * no one else will try to commit/rollback, and so it will be recycled if
@@ -2439,3 +2466,132 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
 		RemoveTwoPhaseFile(xid, giveWarning);
 	RemoveGXact(gxact);
 }
+
+/*
+ * GlobalSnapshotPrepareTwophase
+ *
+ * Set InDoubt state for the prepared transaction gid (and its subxacts) and
+ * return a fresh GlobalCSN for the commit's global snapshot.  Counterpart of
+ * GlobalSnapshotPrepareCurrent() for twophase transactions.
+ */
+static GlobalCSN
+GlobalSnapshotPrepareTwophase(const char *gid)
+{
+	GlobalTransaction gxact;
+	PGXACT	   *pgxact;
+	char	   *buf;
+	TransactionId xid;
+	xl_xact_parsed_prepare parsed;
+
+	if (!track_global_snapshots)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not prepare transaction for global commit"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "track_global_snapshots")));
+
+	/*
+	 * Validate the GID, and lock the GXACT to ensure that two backends do not
+	 * try to access the same GID at once.
+	 */
+	gxact = LockGXact(gid, GetUserId());
+	pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+	xid = pgxact->xid;
+
+	if (gxact->ondisk)
+		buf = ReadTwoPhaseFile(xid, true);
+	else
+		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+
+	ParsePrepareRecord(0, (xl_xact_prepare *) buf, &parsed);
+	GlobalCSNLogSetCSN(xid, parsed.nsubxacts,
+					   parsed.subxacts, InDoubtGlobalCSN);
+
+	/*
+	 * Unlock our GXACT and forget it like PostPrepare_Twophase() does, so a
+	 * later AtAbort_Twophase() cannot release a gxact we no longer own.
+	 */
+	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+	gxact->locking_backend = InvalidBackendId;
+	LWLockRelease(TwoPhaseStateLock);
+	MyLockedGxact = NULL;
+	pfree(buf);
+
+	return GlobalSnapshotGenerate(false);
+}
+
+/*
+ * pg_global_snapshot_prepare(gid)
+ *
+ * SQL-callable wrapper: returns GlobalSnapshotPrepareTwophase(gid) as int8.
+ * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT
+ */
+Datum
+pg_global_snapshot_prepare(PG_FUNCTION_ARGS)
+{
+	const char *gid;
+
+	gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+	PG_RETURN_INT64(GlobalSnapshotPrepareTwophase(gid));
+}
+
+
+/*
+ * GlobalSnapshotAssignCsnTwoPhase
+ *
+ * Assign global_csn to the prepared transaction gid.  global_csn is expected
+ * to be the maximum of the values returned by GlobalSnapshotPrepareCurrent
+ * and pg_global_snapshot_prepare.
+ *
+ * Counterpart of GlobalSnapshotAssignCsnCurrent() for twophase transactions.
+ */
+static void
+GlobalSnapshotAssignCsnTwoPhase(const char *gid, GlobalCSN global_csn)
+{
+	GlobalTransaction gxact;
+	PGPROC	   *proc;
+
+	if (!track_global_snapshots)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not assign global CSN to transaction"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "track_global_snapshots")));
+
+	if (!GlobalCSNIsNormal(global_csn))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pg_global_snapshot_assign expects normal global_csn")));
+
+	/*
+	 * Validate the GID, and lock the GXACT to ensure that two backends do not
+	 * try to access the same GID at once.
+	 */
+	gxact = LockGXact(gid, GetUserId());
+	proc = &ProcGlobal->allProcs[gxact->pgprocno];
+
+	/* Set global_csn and defuse ProcArrayRemove from assigning one. */
+	pg_atomic_write_u64(&proc->assignedGlobalCsn, global_csn);
+
+	/* Unlock our GXACT and forget it, as PostPrepare_Twophase() does */
+	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+	gxact->locking_backend = InvalidBackendId;
+	LWLockRelease(TwoPhaseStateLock);
+	MyLockedGxact = NULL;
+}
+
+/*
+ * pg_global_snapshot_assign(gid, global_csn)
+ *
+ * SQL-callable wrapper around GlobalSnapshotAssignCsnTwoPhase().
+ * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'global_csn'
+ */
+Datum
+pg_global_snapshot_assign(PG_FUNCTION_ARGS)
+{
+	GlobalSnapshotAssignCsnTwoPhase(text_to_cstring(PG_GETARG_TEXT_PP(0)),
+									(GlobalCSN) PG_GETARG_INT64(1));
+
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 3984dd3e1a..8fddb6edaf 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -21,6 +21,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/global_snapshot.h"
 #include "access/multixact.h"
 #include "access/parallel.h"
 #include "access/subtrans.h"
@@ -1433,6 +1434,14 @@ RecordTransactionCommit(void)
 
 	/* Reset XactLastRecEnd until the next transaction writes something */
 	XactLastRecEnd = 0;
+
+	/*
+	 * Mark our transaction as InDoubt in GlobalCsnLog and get ready for
+	 * commit.
+	 */
+	if (markXidCommitted)
+		GlobalSnapshotPrecommit(MyProc, xid, nchildren, children);
+
 cleanup:
 	/* Clean up local data */
 	if (rels)
@@ -1694,6 +1703,11 @@ RecordTransactionAbort(bool isSubXact)
 	 */
 	TransactionIdAbortTree(xid, nchildren, children);
 
+	/*
+	 * Mark our transaction as Aborted in GlobalCsnLog.
+	 */
+	GlobalSnapshotAbort(MyProc, xid, nchildren, children);
+
 	END_CRIT_SECTION();
 
 	/* Compute latestXid while we have the child XIDs handy */
@@ -2183,6 +2197,21 @@ CommitTransaction(void)
 	 */
 	ProcArrayEndTransaction(MyProc, latestXid);
 
+	/*
+	 * Stamp our transaction with GlobalCSN in GlobalCsnLog.
+	 * Should be called after ProcArrayEndTransaction, but before releasing
+	 * transaction locks.
+	 */
+	if (!is_parallel_worker)
+	{
+		TransactionId  xid = GetTopTransactionIdIfAny();
+		TransactionId *subxids;
+		int			   nsubxids;
+
+		nsubxids = xactGetCommittedChildren(&subxids);
+		GlobalSnapshotCommit(MyProc, xid, nsubxids, subxids);
+	}
+
 	/*
 	 * This is all post-commit cleanup.  Note that if an error is raised here,
 	 * it's too late to abort the transaction.  This should be just
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 285d9d442e..b485db5456 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7058,6 +7058,7 @@ StartupXLOG(void)
 			StartupCLOG();
 			StartupGlobalCSNLog(oldestActiveXID);
 			StartupSUBTRANS(oldestActiveXID);
+			GlobalSnapshotStartup(oldestActiveXID);
 
 			/*
 			 * If we're beginning at a shutdown checkpoint, we know that
@@ -7876,6 +7877,7 @@ StartupXLOG(void)
 		StartupCLOG();
 		StartupGlobalCSNLog(oldestActiveXID);
 		StartupSUBTRANS(oldestActiveXID);
+		GlobalSnapshotStartup(oldestActiveXID);
 	}
 
 	/*
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index dc2d2959c4..d1819dc2c8 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/nbtree.h"
 #include "access/subtrans.h"
 #include "access/twophase.h"
+#include "access/global_snapshot.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -145,6 +146,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, WalSndShmemSize());
 		size = add_size(size, WalRcvShmemSize());
 		size = add_size(size, ApplyLauncherShmemSize());
+		size = add_size(size, GlobalSnapshotShmemSize());
 		size = add_size(size, SnapMgrShmemSize());
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
@@ -266,6 +268,7 @@ CreateSharedMemoryAndSemaphores(void)
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
+	GlobalSnapshotShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8ae4906474..23db5039a4 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -47,6 +47,7 @@
 
 #include "access/clog.h"
 #include "access/global_csn_log.h"
+#include "access/global_snapshot.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -95,6 +96,8 @@ typedef struct ProcArrayStruct
 	TransactionId replication_slot_xmin;
 	/* oldest catalog xmin of any replication slot */
 	TransactionId replication_slot_catalog_xmin;
+	/* xmin of oldest active global snapshot */
+	TransactionId global_snapshot_xmin;
 
 	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
 	int			pgprocnos[FLEXIBLE_ARRAY_MEMBER];
@@ -250,6 +253,7 @@ CreateSharedProcArray(void)
 		procArray->lastOverflowedXid = InvalidTransactionId;
 		procArray->replication_slot_xmin = InvalidTransactionId;
 		procArray->replication_slot_catalog_xmin = InvalidTransactionId;
+		procArray->global_snapshot_xmin = InvalidTransactionId;
 	}
 
 	allProcs = ProcGlobal->allProcs;
@@ -355,6 +359,17 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
 		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
 								  latestXid))
 			ShmemVariableCache->latestCompletedXid = latestXid;
+
+		/*
+		 * Assign global csn while holding ProcArrayLock for non-global
+		 * COMMIT PREPARED. After lock is released consequent
+		 * GlobalSnapshotCommit() will write this value to GlobalCsnLog.
+		 *
+		 * In case of global commit proc->assignedGlobalCsn is already set
+		 * by prior AssignGlobalCsn().
+		 */
+		if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn)))
+			pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false));
 	}
 	else
 	{
@@ -435,6 +450,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 
 		proc->lxid = InvalidLocalTransactionId;
 		pgxact->xmin = InvalidTransactionId;
+		proc->originalXmin = InvalidTransactionId;
+
 		/* must be cleared with xid/xmin: */
 		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
 		proc->delayChkpt = false; /* be sure this is cleared in abort */
@@ -457,6 +474,8 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
 	pgxact->xid = InvalidTransactionId;
 	proc->lxid = InvalidLocalTransactionId;
 	pgxact->xmin = InvalidTransactionId;
+	proc->originalXmin = InvalidTransactionId;
+
 	/* must be cleared with xid/xmin: */
 	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
 	proc->delayChkpt = false; /* be sure this is cleared in abort */
@@ -470,6 +489,20 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
 							  latestXid))
 		ShmemVariableCache->latestCompletedXid = latestXid;
+
+	/*
+	 * Assign global csn while holding ProcArrayLock for non-global
+	 * COMMIT. After lock is released consequent GlobalSnapshotFinish() will
+	 * write this value to GlobalCsnLog.
+	 *
+	 * In case of global commit MyProc->assignedGlobalCsn is already set
+	 * by prior AssignGlobalCsn().
+	 *
+	 * TODO: in case of group commit we can generate one GlobalSnapshot for
+	 * whole group to save time on timestamp aquisition.
+	 */
+	if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn)))
+		pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false));
 }
 
 /*
@@ -613,6 +646,7 @@ ProcArrayClearTransaction(PGPROC *proc)
 	pgxact->xid = InvalidTransactionId;
 	proc->lxid = InvalidLocalTransactionId;
 	pgxact->xmin = InvalidTransactionId;
+	proc->originalXmin = InvalidTransactionId;
 	proc->recoveryConflictPending = false;
 
 	/* redundant, but just in case */
@@ -1315,6 +1349,7 @@ GetOldestXmin(Relation rel, int flags)
 
 	TransactionId replication_slot_xmin = InvalidTransactionId;
 	TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+	TransactionId global_snapshot_xmin = InvalidTransactionId;
 
 	/*
 	 * If we're not computing a relation specific limit, or if a shared
@@ -1351,8 +1386,9 @@ GetOldestXmin(Relation rel, int flags)
 			proc->databaseId == MyDatabaseId ||
 			proc->databaseId == 0)	/* always include WalSender */
 		{
-			/* Fetch xid just once - see GetNewTransactionId */
+			/* Fetch both xids just once - see GetNewTransactionId */
 			TransactionId xid = UINT32_ACCESS_ONCE(pgxact->xid);
+			TransactionId original_xmin = UINT32_ACCESS_ONCE(proc->originalXmin);
 
 			/* First consider the transaction's own Xid, if any */
 			if (TransactionIdIsNormal(xid) &&
@@ -1365,8 +1401,17 @@ GetOldestXmin(Relation rel, int flags)
 			 * We must check both Xid and Xmin because a transaction might
 			 * have an Xmin but not (yet) an Xid; conversely, if it has an
 			 * Xid, that could determine some not-yet-set Xmin.
+			 *
+			 * In case of oldestXmin calculation for GlobalSnapshotMapXmin()
+			 * pgxact->xmin should be changed to proc->originalXmin. Details
+			 * in commets to GlobalSnapshotMapXmin.
 			 */
-			xid = UINT32_ACCESS_ONCE(pgxact->xmin);
+			if ((flags & PROCARRAY_NON_IMPORTED_XMIN) &&
+					TransactionIdIsValid(original_xmin))
+				xid = original_xmin;
+			else
+				xid = UINT32_ACCESS_ONCE(pgxact->xmin);
+
 			if (TransactionIdIsNormal(xid) &&
 				TransactionIdPrecedes(xid, result))
 				result = xid;
@@ -1380,6 +1425,7 @@ GetOldestXmin(Relation rel, int flags)
 	 */
 	replication_slot_xmin = procArray->replication_slot_xmin;
 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	global_snapshot_xmin = ProcArrayGetGlobalSnapshotXmin();
 
 	if (RecoveryInProgress())
 	{
@@ -1421,6 +1467,11 @@ GetOldestXmin(Relation rel, int flags)
 			result = FirstNormalTransactionId;
 	}
 
+	if (!(flags & PROCARRAY_NON_IMPORTED_XMIN) &&
+		TransactionIdIsValid(global_snapshot_xmin) &&
+		NormalTransactionIdPrecedes(global_snapshot_xmin, result))
+		result = global_snapshot_xmin;
+
 	/*
 	 * Check whether there are replication slots requiring an older xmin.
 	 */
@@ -1515,8 +1566,10 @@ GetSnapshotData(Snapshot snapshot)
 	int			count = 0;
 	int			subcount = 0;
 	bool		suboverflowed = false;
+	GlobalCSN	global_csn = FrozenGlobalCSN;
 	TransactionId replication_slot_xmin = InvalidTransactionId;
 	TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+	TransactionId global_snapshot_xmin = InvalidTransactionId;
 
 	Assert(snapshot != NULL);
 
@@ -1708,10 +1761,18 @@ GetSnapshotData(Snapshot snapshot)
 	 */
 	replication_slot_xmin = procArray->replication_slot_xmin;
 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	global_snapshot_xmin = ProcArrayGetGlobalSnapshotXmin();
 
 	if (!TransactionIdIsValid(MyPgXact->xmin))
 		MyPgXact->xmin = TransactionXmin = xmin;
 
+	/*
+	 * Take GlobalCSN under ProcArrayLock so the local/global snapshot stays
+	 * synchronized.
+	 */
+	if (track_global_snapshots)
+		global_csn = GlobalSnapshotGenerate(false);
+
 	LWLockRelease(ProcArrayLock);
 
 	/*
@@ -1727,6 +1788,10 @@ GetSnapshotData(Snapshot snapshot)
 	if (!TransactionIdIsNormal(RecentGlobalXmin))
 		RecentGlobalXmin = FirstNormalTransactionId;
 
+	if (/*track_global_snapshots && */TransactionIdIsValid(global_snapshot_xmin) &&
+		TransactionIdPrecedes(global_snapshot_xmin, RecentGlobalXmin))
+		RecentGlobalXmin = global_snapshot_xmin;
+
 	/* Check whether there's a replication slot requiring an older xmin. */
 	if (TransactionIdIsValid(replication_slot_xmin) &&
 		NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
@@ -1782,6 +1847,11 @@ GetSnapshotData(Snapshot snapshot)
 		MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
 	}
 
+	snapshot->imported_global_csn = false;
+	snapshot->global_csn = global_csn;
+	if (global_snapshot_defer_time > 0 && IsUnderPostmaster)
+		GlobalSnapshotMapXmin(snapshot->global_csn);
+
 	return snapshot;
 }
 
@@ -3129,6 +3199,24 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 	LWLockRelease(ProcArrayLock);
 }
 
+/*
+ * ProcArraySetGlobalSnapshotXmin
+ *		Publish the xmin of the oldest active global snapshot.
+ */
+void
+ProcArraySetGlobalSnapshotXmin(TransactionId xmin)
+{
+	procArray->global_snapshot_xmin = xmin;	/* no lock: store assumed atomic */
+}
+
+/* ProcArrayGetGlobalSnapshotXmin -- fetch the global snapshot xmin */
+TransactionId
+ProcArrayGetGlobalSnapshotXmin(void)
+{
+	TransactionId result = procArray->global_snapshot_xmin;
+
+	return result;
+}
 
 #define XidCacheRemove(i) \
 	do { \
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index fe18c93b61..86d0a0acae 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock				42
 LogicalRepWorkerLock				43
 CLogTruncationLock					44
 GlobalCSNLogControlLock				45
+GlobalSnapshotXidMapLock			46
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 5aa19d3f78..8a47a2d375 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -37,6 +37,7 @@
 
 #include "access/transam.h"
 #include "access/twophase.h"
+#include "access/global_snapshot.h"
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -441,6 +442,9 @@ InitProcess(void)
 	MyProc->clogGroupMemberLsn = InvalidXLogRecPtr;
 	Assert(pg_atomic_read_u32(&MyProc->clogGroupNext) == INVALID_PGPROCNO);
 
+	MyProc->originalXmin = InvalidTransactionId;
+	pg_atomic_init_u64(&MyProc->assignedGlobalCsn, InProgressGlobalCSN);
+
 	/*
 	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
 	 * on it.  That allows us to repoint the process latch, which so far
@@ -584,6 +588,7 @@ InitAuxiliaryProcess(void)
 	MyProc->lwWaitMode = 0;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
+	MyProc->originalXmin = InvalidTransactionId;
 #ifdef USE_ASSERT_CHECKING
 	{
 		int			i;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 0ca331c6f9..7154c3499e 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -28,6 +28,7 @@
 
 #include "access/commit_ts.h"
 #include "access/gin.h"
+#include "access/global_snapshot.h"
 #include "access/rmgr.h"
 #include "access/tableam.h"
 #include "access/transam.h"
@@ -1181,7 +1182,7 @@ static struct config_bool ConfigureNamesBool[] =
 			gettext_noop("Used to achieve REPEATEBLE READ isolation level for postgres_fdw transactions.")
 		},
 		&track_global_snapshots,
-		true, /* XXX: set true to simplify tesing. XXX2: Seems that RESOURCES_MEM isn't the best catagory */
+		false, /* XXX: Seems that RESOURCES_MEM isn't the best catagory */
 		NULL, NULL, NULL
 	},
 	{
@@ -2495,6 +2496,16 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"global_snapshot_defer_time", PGC_POSTMASTER, REPLICATION_MASTER,
+			gettext_noop("Minimal age of records which allowed to be vacuumed, in seconds."),
+			NULL
+		},
+		&global_snapshot_defer_time,
+		5, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	/*
 	 * See also CheckRequiredParameterValues() if this parameter changes
 	 */
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 995b6ca155..0fd7d8501c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -306,6 +306,8 @@
 				# and comma-separated list of application_name
 				# from standby(s); '*' = all
 #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
+#global_snapshot_defer_time = 0	# minimum age of records, in seconds, before
+				# they may be vacuumed
 
 # - Standby Servers -
 
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 1c063c592c..3d925a7866 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -48,6 +48,7 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "access/global_snapshot.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -247,6 +248,8 @@ typedef struct SerializedSnapshotData
 	CommandId	curcid;
 	TimestampTz whenTaken;
 	XLogRecPtr	lsn;
+	GlobalCSN	global_csn;
+	bool		imported_global_csn;
 } SerializedSnapshotData;
 
 Size
@@ -1024,7 +1027,9 @@ SnapshotResetXmin(void)
 										pairingheap_first(&RegisteredSnapshots));
 
 	if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
+	{
 		MyPgXact->xmin = minSnapshot->xmin;
+	}
 }
 
 /*
@@ -2115,6 +2120,8 @@ SerializeSnapshot(Snapshot snapshot, char *start_address)
 	serialized_snapshot.curcid = snapshot->curcid;
 	serialized_snapshot.whenTaken = snapshot->whenTaken;
 	serialized_snapshot.lsn = snapshot->lsn;
+	serialized_snapshot.global_csn = snapshot->global_csn;
+	serialized_snapshot.imported_global_csn = snapshot->imported_global_csn;
 
 	/*
 	 * Ignore the SubXID array if it has overflowed, unless the snapshot was
@@ -2189,6 +2196,8 @@ RestoreSnapshot(char *start_address)
 	snapshot->curcid = serialized_snapshot.curcid;
 	snapshot->whenTaken = serialized_snapshot.whenTaken;
 	snapshot->lsn = serialized_snapshot.lsn;
+	snapshot->global_csn = serialized_snapshot.global_csn;
+	snapshot->imported_global_csn = serialized_snapshot.imported_global_csn;
 
 	/* Copy XIDs, if present. */
 	if (serialized_snapshot.xcnt > 0)
@@ -2228,8 +2237,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
 }
 
 /*
- * XidInMVCCSnapshot
- *		Is the given XID still-in-progress according to the snapshot?
+ * XidInLocalMVCCSnapshot
+ *		Is the given XID still-in-progress according to the local snapshot?
  *
  * Note: GetSnapshotData never stores either top xid or subxids of our own
  * backend into a snapshot, so these xids will not be reported as "running"
@@ -2237,8 +2246,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
  * TransactionIdIsCurrentTransactionId first, except when it's known the
  * XID could not be ours anyway.
  */
-bool
-XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+static bool
+XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 {
 	uint32		i;
 
@@ -2348,3 +2357,153 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 
 	return false;
 }
+
+/*
+ * XidInMVCCSnapshot
+ *
+ * Is the given XID still in progress according to the snapshot?  This
+ * takes into account that the snapshot may be global (imported from
+ * another node).  With track_global_snapshots switched off it simply
+ * returns the result of XidInLocalMVCCSnapshot().
+ */
+bool
+XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	bool		running_locally;
+
+	/* An imported snapshot carries no usable information except its CSN */
+	if (snapshot->imported_global_csn)
+	{
+		Assert(track_global_snapshots);
+		return XidInvisibleInGlobalSnapshot(xid, snapshot);
+	}
+
+	running_locally = XidInLocalMVCCSnapshot(xid, snapshot);
+
+	/* Without global snapshot tracking the local answer is final */
+	if (!track_global_snapshots)
+	{
+		Assert(GlobalCSNIsFrozen(snapshot->global_csn));
+		return running_locally;
+	}
+
+	if (!running_locally)
+	{
+#ifdef USE_ASSERT_CHECKING
+		/* Cross-check: the global snapshot must agree with the local one */
+		if (XidInvisibleInGlobalSnapshot(xid, snapshot))
+		{
+			GlobalCSN	gcsn = TransactionIdGetGlobalCSN(xid);
+
+			Assert(GlobalCSNIsAborted(gcsn));
+		}
+#endif
+		return false;
+	}
+
+	/*
+	 * The xid looks running locally, but it may already be in an unknown
+	 * state, in which case we must wait and recheck.
+	 *
+	 * TODO: this check could be skipped if we knew for sure that there were
+	 * no global transactions when this snapshot was taken.  That requires
+	 * changes to global snapshot export/import (a backend that sets xmin
+	 * would need a-priori knowledge of whether its transaction is going to
+	 * be global or local, which is not enforced right now).
+	 */
+	return XidInvisibleInGlobalSnapshot(xid, snapshot);
+}
+
+/*
+ * ExportGlobalSnapshot
+ *
+ * Return the global_csn of CurrentSnapshot so that the caller can extend
+ * this transaction to other nodes.  ERRORs if track_global_snapshots is off.
+ *
+ * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and
+ * check that the transaction has not yet acquired an xid, but for the
+ * current iteration of this patch I don't want to hack on the parser.
+ */
+GlobalCSN
+ExportGlobalSnapshot(void)
+{
+	if (!track_global_snapshots)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			 errmsg("could not export global snapshot"),
+			 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+					 "track_global_snapshots")));
+
+	return CurrentSnapshot->global_csn;
+}
+
+/* SQL-callable wrapper around ExportGlobalSnapshot(); returns int8 */
+Datum
+pg_global_snapshot_export(PG_FUNCTION_ARGS)
+{
+	/* Errors out inside ExportGlobalSnapshot() if tracking is disabled */
+	PG_RETURN_UINT64(ExportGlobalSnapshot());
+}
+
+/*
+ * ImportGlobalSnapshot
+ *
+ * Import global_csn and retract this backend's xmin to the value that was
+ * current when we had such global_csn.  TODO: better done through
+ * EXPORT/IMPORT SNAPSHOT syntax, checking that no xid is assigned yet.
+ */
+void
+ImportGlobalSnapshot(GlobalCSN snap_global_csn)
+{
+	TransactionId xmin;
+
+	if (!track_global_snapshots)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			 errmsg("could not import global snapshot"),
+			 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+					 "track_global_snapshots")));
+
+	if (global_snapshot_defer_time <= 0)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			 errmsg("could not import global snapshot"),
+			 errhint("Make sure the configuration parameter \"%s\" is positive.",
+					 "global_snapshot_defer_time")));
+
+	/*
+	 * Call GlobalSnapshotToXmin under ProcArrayLock so that the resulting
+	 * xmin cannot be evicted from the map before we install it as our xmin.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	xmin = GlobalSnapshotToXmin(snap_global_csn);
+	if (!TransactionIdIsValid(xmin))
+	{
+		LWLockRelease(ProcArrayLock);
+		ereport(ERROR,
+				(errcode(ERRCODE_SNAPSHOT_TOO_OLD),
+				 errmsg("GlobalSnapshotToXmin: global snapshot too old")));
+	}
+	/* NOTE(review): a repeated import may save an imported xmin here */
+	MyProc->originalXmin = MyPgXact->xmin;
+	MyPgXact->xmin = TransactionXmin = xmin;
+	LWLockRelease(ProcArrayLock);
+
+	CurrentSnapshot->xmin = xmin; /* defuse SnapshotResetXmin() */
+	CurrentSnapshot->global_csn = snap_global_csn;
+	CurrentSnapshot->imported_global_csn = true;
+	GlobalSnapshotSync(snap_global_csn);
+
+	Assert(TransactionIdPrecedesOrEquals(RecentGlobalXmin, xmin));
+	Assert(TransactionIdPrecedesOrEquals(RecentGlobalDataXmin, xmin));
+}
+
+/* SQL-callable wrapper around ImportGlobalSnapshot(int8 csn) */
+Datum
+pg_global_snapshot_import(PG_FUNCTION_ARGS)
+{
+	/* The int8 argument carries the CSN exported by another node */
+	ImportGlobalSnapshot((GlobalCSN) PG_GETARG_UINT64(0));
+	PG_RETURN_VOID();
+}
diff --git a/src/include/access/global_snapshot.h b/src/include/access/global_snapshot.h
new file mode 100644
index 0000000000..246b180cfd
--- /dev/null
+++ b/src/include/access/global_snapshot.h
@@ -0,0 +1,72 @@
+/*-------------------------------------------------------------------------
+ *
+ * global_snapshot.h
+ *	  Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/global_snapshot.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef GLOBAL_SNAPSHOT_H
+#define GLOBAL_SNAPSHOT_H
+
+#include "port/atomics.h"
+#include "storage/lock.h"
+#include "utils/snapshot.h"
+#include "utils/guc.h"
+
+/*
+ * snapshot.h is also used by frontend code, so the atomic variant of the
+ * GlobalCSN type is defined here instead.
+ */
+typedef pg_atomic_uint64 GlobalCSN_atomic;
+
+#define InProgressGlobalCSN	 UINT64CONST(0x0)
+#define AbortedGlobalCSN	 UINT64CONST(0x1)
+#define FrozenGlobalCSN		 UINT64CONST(0x2)
+#define InDoubtGlobalCSN	 UINT64CONST(0x3)
+#define FirstNormalGlobalCSN UINT64CONST(0x4)
+
+#define GlobalCSNIsInProgress(csn)	((csn) == InProgressGlobalCSN)
+#define GlobalCSNIsAborted(csn)		((csn) == AbortedGlobalCSN)
+#define GlobalCSNIsFrozen(csn)		((csn) == FrozenGlobalCSN)
+#define GlobalCSNIsInDoubt(csn)		((csn) == InDoubtGlobalCSN)
+#define GlobalCSNIsNormal(csn)		((csn) >= FirstNormalGlobalCSN)
+
+/* GUC: minimal age of records, in seconds, before they may be vacuumed */
+extern int global_snapshot_defer_time;
+
+/* Shared memory sizing/initialization, and the hook called by StartupXLOG */
+extern Size GlobalSnapshotShmemSize(void);
+extern void GlobalSnapshotShmemInit(void);
+extern void GlobalSnapshotStartup(TransactionId oldestActiveXID);
+
+extern void GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn);
+extern TransactionId GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn);
+
+extern GlobalCSN GlobalSnapshotGenerate(bool locked);
+
+extern bool XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot);
+
+extern void GlobalSnapshotSync(GlobalCSN remote_gcsn);
+
+extern GlobalCSN TransactionIdGetGlobalCSN(TransactionId xid);
+
+extern GlobalCSN GlobalSnapshotPrepareGlobal(const char *gid);
+extern void GlobalSnapshotAssignCsnGlobal(const char *gid,
+										  GlobalCSN global_csn);
+
+extern GlobalCSN GlobalSnapshotPrepareCurrent(void);
+extern void GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn);
+
+extern void GlobalSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids,
+								TransactionId *subxids);
+extern void GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids,
+									TransactionId *subxids);
+extern void GlobalSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids,
+									TransactionId *subxids);
+
+#endif							/* GLOBAL_SNAPSHOT_H */
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 2ca71c3445..b4899f3754 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,6 +18,7 @@
 #include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
+#include "utils/snapshot.h"
 
 /*
  * GlobalTransactionData is defined in twophase.c; other places have no
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a85c78e796..64c3c71df3 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10945,4 +10945,18 @@
   proname => 'is_normalized', prorettype => 'bool',
   proargtypes => 'text text', prosrc => 'unicode_is_normalized' },
 
+# global transaction handling
+{ oid => '4388', descr => 'export global transaction snapshot',
+  proname => 'pg_global_snapshot_export', provolatile => 'v', proparallel => 'u',
+  prorettype => 'int8', proargtypes => '', prosrc => 'pg_global_snapshot_export' },
+{ oid => '4389', descr => 'import global transaction snapshot',
+  proname => 'pg_global_snapshot_import', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_global_snapshot_import' },
+{ oid => '4390', descr => 'prepare distributed transaction for commit, get global_csn',
+  proname => 'pg_global_snapshot_prepare', provolatile => 'v', proparallel => 'u',
+  prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_global_snapshot_prepare' },
+{ oid => '4391', descr => 'assign global_csn to distributed transaction',
+  proname => 'pg_global_snapshot_assign', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_global_snapshot_assign' },
+
 ]
diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h
index 6be6d35d1e..583b1beea5 100644
--- a/src/include/datatype/timestamp.h
+++ b/src/include/datatype/timestamp.h
@@ -93,6 +93,9 @@ typedef struct
 #define USECS_PER_MINUTE INT64CONST(60000000)
 #define USECS_PER_SEC	INT64CONST(1000000)
 
+#define NSECS_PER_SEC	INT64CONST(1000000000)
+#define NSECS_PER_USEC	INT64CONST(1000)
+
 /*
  * We allow numeric timezone offsets up to 15:59:59 either way from Greenwich.
  * Currently, the record holders for wackiest offsets in actual use are zones
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index d349510b7c..5cdf2e17cb 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -280,6 +280,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum);
 #define PG_GETARG_FLOAT4(n)  DatumGetFloat4(PG_GETARG_DATUM(n))
 #define PG_GETARG_FLOAT8(n)  DatumGetFloat8(PG_GETARG_DATUM(n))
 #define PG_GETARG_INT64(n)	 DatumGetInt64(PG_GETARG_DATUM(n))
+#define PG_GETARG_UINT64(n)	 DatumGetUInt64(PG_GETARG_DATUM(n))
 /* use this if you want the raw, possibly-toasted input datum: */
 #define PG_GETARG_RAW_VARLENA_P(n)	((struct varlena *) PG_GETARG_POINTER(n))
 /* use this if you want the input datum de-toasted: */
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index d6459327cc..4ac23da654 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -141,6 +141,9 @@ typedef struct timespec instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) ((t).tv_nsec / 1000))
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	(((uint64) (t).tv_sec * (uint64) 1000000000) + (uint64) ((t).tv_nsec))
+
 #else							/* !HAVE_CLOCK_GETTIME */
 
 /* Use gettimeofday() */
@@ -205,6 +208,10 @@ typedef struct timeval instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) (t).tv_usec)
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	(((uint64) (t).tv_sec * (uint64) 1000000000) + \
+		(uint64) (t).tv_usec * (uint64) 1000)
+
 #endif							/* HAVE_CLOCK_GETTIME */
 
 #else							/* WIN32 */
@@ -237,6 +244,9 @@ typedef LARGE_INTEGER instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	((uint64) (((double) (t).QuadPart * 1000000.0) / GetTimerFrequency()))
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	((uint64) (((double) (t).QuadPart * 1000000000.0) / GetTimerFrequency()))
+
 static inline double
 GetTimerFrequency(void)
 {
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index ae4f573ab4..da84dbf04c 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -15,8 +15,10 @@
 #define _PROC_H_
 
 #include "access/clog.h"
+#include "access/global_snapshot.h"
 #include "access/xlogdefs.h"
 #include "lib/ilist.h"
+#include "utils/snapshot.h"
 #include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
@@ -57,6 +59,7 @@ struct XidCache
 #define		PROC_IN_LOGICAL_DECODING	0x10	/* currently doing logical
 												 * decoding outside xact */
 #define		PROC_RESERVED				0x20	/* reserved for procarray */
+#define		PROC_RESERVED2				0x40	/* reserved for procarray */
 
 /* flags reset at EOXact */
 #define		PROC_VACUUM_STATE_MASK \
@@ -205,6 +208,18 @@ struct PGPROC
 	PGPROC	   *lockGroupLeader;	/* lock group leader, if I'm a member */
 	dlist_head	lockGroupMembers;	/* list of members, if I'm a leader */
 	dlist_node	lockGroupLink;	/* my member link, if I'm a member */
+
+	/*
+	 * assignedGlobalCsn holds GlobalCSN for this transaction.  It is generated
+	 * under a ProcArray lock and later is writter to a GlobalCSNLog.  This
+	 * variable defined as atomic only for case of group commit, in all other
+	 * scenarios only backend responsible for this proc entry is working with
+	 * this variable.
+	 */
+	GlobalCSN_atomic assignedGlobalCsn;
+
+	/* Original xmin of this backend before global snapshot was imported */
+	TransactionId originalXmin;
 };
 
 /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index a5c7d0c064..452ae5d547 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -36,6 +36,10 @@
 
 #define		PROCARRAY_SLOTS_XMIN			0x20	/* replication slot xmin,
 													 * catalog_xmin */
+
+#define		PROCARRAY_NON_IMPORTED_XMIN		0x40	/* use originalXmin instead
+													 * of xmin to properly
+													 * maintain gsXidMap */
 /*
  * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching
  * PGXACT->vacuumFlags. Other flags are used for different purposes and
@@ -125,4 +129,8 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 											TransactionId *catalog_xmin);
 
+extern void ProcArraySetGlobalSnapshotXmin(TransactionId xmin);
+
+extern TransactionId ProcArrayGetGlobalSnapshotXmin(void);
+
 #endif							/* PROCARRAY_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index b28d13ce84..f4768bc6d4 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -127,6 +127,9 @@ extern void AtSubCommit_Snapshot(int level);
 extern void AtSubAbort_Snapshot(int level);
 extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin);
 
+extern GlobalCSN ExportGlobalSnapshot(void);
+extern void ImportGlobalSnapshot(GlobalCSN snap_global_csn);
+
 extern void ImportSnapshot(const char *idstr);
 extern bool XactHasExportedSnapshots(void);
 extern void DeleteAllExportedSnapshotFiles(void);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 57d2dfaa67..71c92c69f4 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -204,6 +204,14 @@ typedef struct SnapshotData
 
 	TimestampTz whenTaken;		/* timestamp when snapshot was taken */
 	XLogRecPtr	lsn;			/* position in the WAL stream when taken */
+
+	/*
+	 * GlobalCSN for cross-node snapshot isolation support.
+	 * Will be used only if track_global_snapshots is enabled.
+	 */
+	GlobalCSN	global_csn;
+	/* Did we have our own global_csn or imported one from different node */
+	bool		imported_global_csn;
 } SnapshotData;
 
 #endif							/* SNAPSHOT_H */
-- 
2.17.1

>From d3b8cb68ca1bb8721f738c4993083ea4cca3d255 Mon Sep 17 00:00:00 2001
From: "Andrey V. Lepikhov" <a.lepik...@postgrespro.ru>
Date: Tue, 12 May 2020 08:31:59 +0500
Subject: [PATCH 3/3] postgres_fdw-support-for-global-snapshots-v3

---
 contrib/postgres_fdw/Makefile                 |   9 +
 contrib/postgres_fdw/connection.c             | 290 ++++++++++++++++--
 contrib/postgres_fdw/postgres_fdw.c           |  12 +
 contrib/postgres_fdw/postgres_fdw.h           |   2 +
 .../postgres_fdw/t/001_bank_coordinator.pl    | 264 ++++++++++++++++
 .../postgres_fdw/t/002_bank_participant.pl    | 240 +++++++++++++++
 src/test/perl/PostgresNode.pm                 |  35 +++
 7 files changed, 826 insertions(+), 26 deletions(-)
 create mode 100644 contrib/postgres_fdw/t/001_bank_coordinator.pl
 create mode 100644 contrib/postgres_fdw/t/002_bank_participant.pl

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index ee8a80a392..07091f630e 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -29,3 +29,12 @@ top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 include $(top_srcdir)/contrib/contrib-global.mk
 endif
+
+# The global makefile does a temp-install for 'check'.  Since REGRESS is
+# defined, PGXS (included from contrib-global.mk or directly) adds
+# postgres_fdw to EXTRA_INSTALL, builds pg_regress and runs the regression
+# tests, so all that is left to hook into 'check' here is the TAP tests.
+check: tapcheck
+
+tapcheck: temp-install
+	$(prove_check)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index e45647f3ea..8e33ae0af7 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -12,8 +12,10 @@
  */
 #include "postgres.h"
 
+#include "access/global_snapshot.h"
 #include "access/htup_details.h"
 #include "access/xact.h"
+#include "access/xlog.h" /* GetSystemIdentifier() */
 #include "catalog/pg_user_mapping.h"
 #include "commands/defrem.h"
 #include "mb/pg_wchar.h"
@@ -25,6 +27,8 @@
 #include "utils/hsearch.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
+#include "utils/snapshot.h"
 #include "utils/syscache.h"
 
 /*
@@ -65,6 +69,21 @@ typedef struct ConnCacheEntry
  */
 static HTAB *ConnectionHash = NULL;
 
+/*
+ * FdwTransactionState
+ *
+ * Per-backend state of the current distributed transaction, shared by all
+ * connection entries; reset to zeroes at the end of pgfdw_xact_callback.
+ */
+typedef struct FdwTransactionState
+{
+	char		*gid;	/* GID of the remote prepared xacts, set only for 2PC */
+	int			nparticipants;	/* remote xacts begun, +1 if local xid */
+	GlobalCSN	global_csn;	/* snapshot CSN exported to remotes, or 0 */
+	bool		two_phase_commit;	/* commit this transaction via 2PC? */
+} FdwTransactionState;
+static FdwTransactionState *fdwTransState;	/* allocated by GetConnection */
+
 /* for assigning cursor numbers and prepared statement numbers */
 static unsigned int cursor_number = 0;
 static unsigned int prep_stmt_number = 0;
@@ -72,6 +91,9 @@ static unsigned int prep_stmt_number = 0;
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
+/* Number of two-phase commits started by this backend; part of each GID */
+static int two_phase_xact_count = 0;
+
 /* prototypes of private functions */
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void disconnect_pg_server(ConnCacheEntry *entry);
@@ -80,6 +102,7 @@ static void configure_remote_session(PGconn *conn);
 static void do_sql_command(PGconn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
+static void deallocate_prepared_stmts(ConnCacheEntry *entry);
 static void pgfdw_subxact_callback(SubXactEvent event,
 								   SubTransactionId mySubid,
 								   SubTransactionId parentSubid,
@@ -136,6 +159,15 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 									  pgfdw_inval_callback, (Datum) 0);
 	}
 
+	/* allocate FdwTransactionState */
+	if (fdwTransState == NULL)
+	{
+		MemoryContext oldcxt;
+		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+		fdwTransState = palloc0(sizeof(FdwTransactionState));
+		MemoryContextSwitchTo(oldcxt);
+	}
+
 	/* Set flag that we did GetConnection during the current transaction */
 	xact_got_connection = true;
 
@@ -446,7 +478,8 @@ configure_remote_session(PGconn *conn)
 }
 
 /*
- * Convenience subroutine to issue a non-data-returning SQL command to remote
+ * Convenience subroutine to issue a non-data-returning SQL command or
+ * statement to remote node.
  */
 static void
 do_sql_command(PGconn *conn, const char *sql)
@@ -456,7 +489,8 @@ do_sql_command(PGconn *conn, const char *sql)
 	if (!PQsendQuery(conn, sql))
 		pgfdw_report_error(ERROR, NULL, conn, false, sql);
 	res = pgfdw_get_result(conn, sql);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+			PQresultStatus(res) != PGRES_TUPLES_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
 }
@@ -484,6 +518,10 @@ begin_remote_xact(ConnCacheEntry *entry)
 		elog(DEBUG3, "starting remote transaction on connection %p",
 			 entry->conn);
 
+		if (UseGlobalSnapshots && (!IsolationUsesXactSnapshot() ||
+								   IsolationIsSerializable()))
+			elog(ERROR, "Global snapshots support only REPEATABLE READ");
+
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
 		else
@@ -492,6 +530,23 @@ begin_remote_xact(ConnCacheEntry *entry)
 		do_sql_command(entry->conn, sql);
 		entry->xact_depth = 1;
 		entry->changing_xact_state = false;
+
+		if (UseGlobalSnapshots)
+		{
+			char import_sql[128];
+
+			/* Export our snapshot */
+			if (fdwTransState->global_csn == 0)
+				fdwTransState->global_csn = ExportGlobalSnapshot();
+
+			snprintf(import_sql, sizeof(import_sql),
+				"SELECT pg_global_snapshot_import("UINT64_FORMAT")",
+				fdwTransState->global_csn);
+
+			do_sql_command(entry->conn, import_sql);
+		}
+
+		fdwTransState->nparticipants += 1;
 	}
 
 	/*
@@ -699,6 +754,126 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 	PG_END_TRY();
 }
 
+/*
+ * Callback type for BroadcastStmt: inspects a result whose status matched
+ * the expected one and returns false to flag it as a failure.
+ */
+typedef bool (*BroadcastCmdResHandler) (PGresult *result, void *arg);
+
+/*
+ * BroadcastStmt
+ *
+ * Send sql to every ConnectionHash entry that has an open remote transaction,
+ * then collect the results.  All commands are sent before any result is read,
+ * so the remote servers execute them in parallel.
+ *
+ * The command fails on a connection if it cannot be sent, if its result
+ * status differs from expectedStatus, or if handler (when not NULL) returns
+ * false for the result.  Failures are reported at WARNING rather than ERROR
+ * level, so that every connection is still drained and the caller gets the
+ * chance to clean up, e.g. roll back transactions prepared on other servers.
+ *
+ * Returns true if the command succeeded on every connection.
+ */
+static bool
+BroadcastStmt(char const *sql, ExecStatusType expectedStatus,
+			  BroadcastCmdResHandler handler, void *arg)
+{
+	HASH_SEQ_STATUS scan;
+	ConnCacheEntry *entry;
+	List	   *sent = NIL;		/* entries the command was sent to */
+	ListCell   *lc;
+	bool		allOk = true;
+
+	/* Broadcast sql */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		pgfdw_reject_incomplete_xact_state_change(entry);
+
+		if (entry->xact_depth <= 0 || entry->conn == NULL)
+			continue;
+
+		if (!PQsendQuery(entry->conn, sql))
+		{
+			/*
+			 * Nothing was sent, so don't wait for a result from this
+			 * connection below.  pgfdw_report_error is told to clear the
+			 * result, so it must not be PQclear'd a second time here.
+			 */
+			elog(WARNING, "Failed to send command %s", sql);
+			pgfdw_report_error(WARNING, PQgetResult(entry->conn),
+							   entry->conn, true, sql);
+			entry->have_error = true;
+			allOk = false;
+			continue;
+		}
+		sent = lappend(sent, entry);
+	}
+
+	/*
+	 * Collect responses.  pgfdw_get_result waits interruptibly and consumes
+	 * all results up to the terminating NULL, returning the last one.
+	 */
+	foreach(lc, sent)
+	{
+		PGresult   *result;
+
+		entry = (ConnCacheEntry *) lfirst(lc);
+		result = pgfdw_get_result(entry->conn, sql);
+
+		if (PQresultStatus(result) != expectedStatus ||
+			(handler && !handler(result, arg)))
+		{
+			elog(WARNING, "Failed command %s: status=%d, expected status=%d",
+				 sql, PQresultStatus(result), expectedStatus);
+			/* Don't throw: the remaining connections must still be drained */
+			pgfdw_report_error(WARNING, result, entry->conn, true, sql);
+			entry->have_error = true;
+			allOk = false;
+		}
+		else
+			PQclear(result);
+	}
+
+	list_free(sent);
+	return allOk;
+}
+
+/* Broadcast a utility command expected to complete with PGRES_COMMAND_OK */
+static bool
+BroadcastCmd(char const *sql)
+{
+	return BroadcastStmt(sql, PGRES_COMMAND_OK, NULL, NULL);
+}
+
+/* Broadcast a query (e.g. SELECT of a function) expecting PGRES_TUPLES_OK */
+static bool
+BroadcastFunc(char const *sql)
+{
+	return BroadcastStmt(sql, PGRES_TUPLES_OK, NULL, NULL);
+}
+
+/*
+ * BroadcastStmt handler: parse the CSN in the first field of the result and
+ * raise *arg (a GlobalCSN) to it if larger.  Returns false if unparsable.
+ */
+static bool
+MaxCsnCB(PGresult *result, void *arg)
+{
+	GlobalCSN  *running_max = (GlobalCSN *) arg;
+	GlobalCSN	parsed = 0;
+	char	   *text = PQgetvalue(result, 0, 0);
+
+	/* Reject a missing or empty value before trying to parse it */
+	if (text == NULL || text[0] == '\0')
+		return false;
+	if (sscanf(text, UINT64_FORMAT, &parsed) != 1)
+		return false;
+	*running_max = Max(*running_max, parsed);
+	return true;
+}
+
 /*
  * pgfdw_xact_callback --- cleanup at main-transaction end.
  */
@@ -712,6 +855,100 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	if (!xact_got_connection)
 		return;
 
+	/*
+	 * Handle possible two-phase commit.
+	 *
+	 * With global snapshots, a transaction with more than one participant
+	 * (remote transactions, plus the local node if it has an xid) commits
+	 * atomically: PREPARE TRANSACTION on every remote, take the largest CSN
+	 * from pg_global_snapshot_prepare() and the local prepare, if any,
+	 * assign it everywhere with pg_global_snapshot_assign(), and leave
+	 * COMMIT PREPARED to the XACT_EVENT_COMMIT branch below.  If a broadcast
+	 * reports failure, the prepared transactions are rolled back and an
+	 * error is raised.
+	 */
+	if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT)
+	{
+		bool include_local_tx = false;
+
+		/* Count the local node as a participant if it has an xid */
+		if (TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+		{
+			include_local_tx = true;
+			fdwTransState->nparticipants += 1;
+		}
+
+		/* Switch to 2PC mode if there was more than one participant */
+		if (UseGlobalSnapshots && fdwTransState->nparticipants > 1)
+			fdwTransState->two_phase_commit = true;
+
+		if (fdwTransState->two_phase_commit)
+		{
+			GlobalCSN	max_csn = InProgressGlobalCSN,
+						my_csn = InProgressGlobalCSN;
+			bool	res;
+			char   *sql;
+
+			/* GID: timestamp, system id, pid, xid, counter, participants */
+			fdwTransState->gid = psprintf("pgfdw:%lld:%llu:%d:%u:%d:%d",
+										  (long long) GetCurrentTimestamp(),
+										  (unsigned long long) GetSystemIdentifier(),
+										  MyProcPid,
+										  GetCurrentTransactionIdIfAny(),
+										  ++two_phase_xact_count,
+										  fdwTransState->nparticipants);
+
+			/* Broadcast PREPARE */
+			sql = psprintf("PREPARE TRANSACTION '%s'", fdwTransState->gid);
+			res = BroadcastCmd(sql);
+			if (!res)
+				goto error;
+
+			/* Broadcast pg_global_snapshot_prepare() */
+			if (include_local_tx)
+				my_csn = GlobalSnapshotPrepareCurrent();
+
+			sql = psprintf("SELECT pg_global_snapshot_prepare('%s')",
+						   fdwTransState->gid);
+			res = BroadcastStmt(sql, PGRES_TUPLES_OK, MaxCsnCB, &max_csn);
+			if (!res)
+				goto error;
+
+			/* select maximal global csn */
+			if (include_local_tx && my_csn > max_csn)
+				max_csn = my_csn;
+
+			/* Broadcast pg_global_snapshot_assign() */
+			if (include_local_tx)
+				GlobalSnapshotAssignCsnCurrent(max_csn);
+			sql = psprintf("SELECT pg_global_snapshot_assign('%s',"UINT64_FORMAT")",
+							fdwTransState->gid, max_csn);
+			res = BroadcastFunc(sql);
+
+error:
+			if (!res)
+			{
+				/* Undo PREPARE where it succeeded; SQL has no ABORT PREPARED */
+				sql = psprintf("ROLLBACK PREPARED '%s'", fdwTransState->gid);
+				BroadcastCmd(sql);
+				elog(ERROR, "Failed to PREPARE transaction on remote node");
+			}
+
+			/*
+			 * Do not fall through to the loop below: the subsequent COMMIT
+			 * event will clean things up.
+			 */
+			return;
+		}
+	}
+
+	/* COMMIT PREPARED the remote transactions if we were doing 2PC */
+	if (fdwTransState->two_phase_commit &&
+		(event == XACT_EVENT_PARALLEL_COMMIT || event == XACT_EVENT_COMMIT))
+	{
+		BroadcastCmd(psprintf("COMMIT PREPARED '%s'", fdwTransState->gid));
+	}
+
 	/*
 	 * Scan all connection cache entries to find open remote transactions, and
 	 * close them.
@@ -719,8 +942,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	hash_seq_init(&scan, ConnectionHash);
 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
 	{
-		PGresult   *res;
-
 		/* Ignore cache entry if no open connection right now */
 		if (entry->conn == NULL)
 			continue;
@@ -737,6 +958,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 			{
 				case XACT_EVENT_PARALLEL_PRE_COMMIT:
 				case XACT_EVENT_PRE_COMMIT:
+					Assert(!fdwTransState->two_phase_commit);
 
 					/*
 					 * If abort cleanup previously failed for this connection,
@@ -749,28 +971,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					do_sql_command(entry->conn, "COMMIT TRANSACTION");
 					entry->changing_xact_state = false;
 
-					/*
-					 * If there were any errors in subtransactions, and we
-					 * made prepared statements, do a DEALLOCATE ALL to make
-					 * sure we get rid of all prepared statements. This is
-					 * annoying and not terribly bulletproof, but it's
-					 * probably not worth trying harder.
-					 *
-					 * DEALLOCATE ALL only exists in 8.3 and later, so this
-					 * constrains how old a server postgres_fdw can
-					 * communicate with.  We intentionally ignore errors in
-					 * the DEALLOCATE, so that we can hobble along to some
-					 * extent with older servers (leaking prepared statements
-					 * as we go; but we don't really support update operations
-					 * pre-8.3 anyway).
-					 */
-					if (entry->have_prep_stmt && entry->have_error)
-					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
-						PQclear(res);
-					}
-					entry->have_prep_stmt = false;
-					entry->have_error = false;
+					deallocate_prepared_stmts(entry);
 					break;
 				case XACT_EVENT_PRE_PREPARE:
 
@@ -789,6 +990,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					break;
 				case XACT_EVENT_PARALLEL_COMMIT:
 				case XACT_EVENT_COMMIT:
+					if (fdwTransState->two_phase_commit)
+						deallocate_prepared_stmts(entry);
+					else /* Pre-commit should have closed the open transaction */
+						elog(ERROR, "missed cleaning up connection during pre-commit");
+					break;
 				case XACT_EVENT_PREPARE:
 					/* Pre-commit should have closed the open transaction */
 					elog(ERROR, "missed cleaning up connection during pre-commit");
@@ -884,6 +1090,38 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 	/* Also reset cursor numbering for next transaction */
 	cursor_number = 0;
+
+	/* Reset fdwTransState */
+	memset(fdwTransState, '\0', sizeof(FdwTransactionState));
+}
+
+/*
+ * deallocate_prepared_stmts --- end-of-transaction prepared statement cleanup
+ *
+ * If there were any errors in subtransactions, and we made prepared
+ * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared
+ * statements.  Either way, clear the entry's have_prep_stmt and have_error
+ * flags.  This is annoying and not terribly bulletproof, but it's probably
+ * not worth trying harder.
+ *
+ * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old a
+ * server postgres_fdw can communicate with.  We intentionally ignore errors
+ * in the DEALLOCATE, so that we can hobble along to some extent with older
+ * servers (leaking prepared statements as we go; but we don't really support
+ * update operations pre-8.3 anyway).
+ */
+static void
+deallocate_prepared_stmts(ConnCacheEntry *entry)
+{
+	/* Only needed when statements exist and an error may have leaked some */
+	if (entry->have_error && entry->have_prep_stmt)
+	{
+		PGresult   *dealloc_res = PQexec(entry->conn, "DEALLOCATE ALL");
+
+		PQclear(dealloc_res);	/* outcome deliberately ignored */
+	}
+	entry->have_error = false;
+	entry->have_prep_stmt = false;
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9fc53cad68..03c5b0093a 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -301,6 +301,9 @@ typedef struct
 	List	   *already_used;	/* expressions already dealt with */
 } ec_member_foreign_arg;
 
+bool		UseGlobalSnapshots;	/* GUC postgres_fdw.use_global_snapshots */
+void		_PG_init(void);		/* registers the GUC above */
+
 /*
  * SQL functions
  */
@@ -6584,3 +6587,21 @@ find_em_expr_for_input_target(PlannerInfo *root,
 	elog(ERROR, "could not find pathkey item to sort");
 	return NULL;				/* keep compiler quiet */
 }
+
+/*
+ * Module load callback: register the postgres_fdw.use_global_snapshots GUC.
+ * connection.c consults UseGlobalSnapshots when starting remote transactions
+ * and at pre-commit, to decide whether to use global snapshots and 2PC.
+ */
+void
+_PG_init(void)
+{
+	DefineCustomBoolVariable("postgres_fdw.use_global_snapshots",
+							 "Use global snapshots for FDW transactions",
+							 NULL,	/* no long description */
+							 &UseGlobalSnapshots,
+							 false,	/* off by default */
+							 PGC_USERSET,
+							 0,
+							 NULL, NULL, NULL);
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..9d3ea077a1 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -208,4 +208,6 @@ extern const char *get_jointype_name(JoinType jointype);
 extern bool is_builtin(Oid objectId);
 extern bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo);
 
+extern bool UseGlobalSnapshots;
+
 #endif							/* POSTGRES_FDW_H */
diff --git a/contrib/postgres_fdw/t/001_bank_coordinator.pl b/contrib/postgres_fdw/t/001_bank_coordinator.pl
new file mode 100644
index 0000000000..1e31f33349
--- /dev/null
+++ b/contrib/postgres_fdw/t/001_bank_coordinator.pl
@@ -0,0 +1,264 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+my $master = get_new_node("master");	# coordinator: reaches the shards via postgres_fdw
+$master->init;
+$master->append_conf('postgresql.conf', qq(
+	max_prepared_transactions = 30
+	log_checkpoints = true
+	postgres_fdw.use_global_snapshots = on
+	track_global_snapshots = on
+	default_transaction_isolation = 'REPEATABLE READ'
+));
+$master->start;
+
+my $shard1 = get_new_node("shard1");	# holds the odd account ids
+$shard1->init;
+$shard1->append_conf('postgresql.conf', qq(
+	max_prepared_transactions = 30
+	global_snapshot_defer_time = 15
+	track_global_snapshots = on
+));
+$shard1->start;
+
+my $shard2 = get_new_node("shard2");	# holds the even account ids
+$shard2->init;
+$shard2->append_conf('postgresql.conf', qq(
+	max_prepared_transactions = 30
+	global_snapshot_defer_time = 15
+	track_global_snapshots = on
+));
+$shard2->start;
+
+###############################################################################
+# Prepare nodes: master's accounts is the parent of one foreign table per shard
+###############################################################################
+
+$master->safe_psql('postgres', qq[
+	CREATE EXTENSION postgres_fdw;
+	CREATE TABLE accounts(id integer primary key, amount integer);
+	CREATE TABLE global_transactions(tx_time timestamp);
+]);
+
+foreach my $node ($shard1, $shard2)
+{
+	my $port = $node->port;
+	my $host = $node->host;
+
+	$node->safe_psql('postgres',
+			"CREATE TABLE accounts(id integer primary key, amount integer)");
+
+	$master->safe_psql('postgres', qq[
+		CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw options(dbname 'postgres', host '$host', port '$port');
+		CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts) server shard_$port options(table_name 'accounts');
+		CREATE USER MAPPING for CURRENT_USER SERVER shard_$port;
+	])
+}
+
+$shard1->safe_psql('postgres', qq[
+	insert into accounts select 2*id-1, 0 from generate_series(1, 10010) as id;
+	CREATE TABLE local_transactions(tx_time timestamp);
+]);
+
+$shard2->safe_psql('postgres', qq[
+	insert into accounts select 2*id, 0 from generate_series(1, 10010) as id;
+	CREATE TABLE local_transactions(tx_time timestamp);
+]);
+
+diag("master: @{[$master->connstr('postgres')]}");
+diag("shard1: @{[$shard1->connstr('postgres')]}");
+diag("shard2: @{[$shard2->connstr('postgres')]}");
+
+###############################################################################
+# pgbench scripts: each moves one unit between two accounts, so sum stays 0
+###############################################################################
+
+my $bank = File::Temp->new();	# global: :id and :id + 1 live on different shards
+append_to_file($bank, q{
+	\set id random(1, 20000)
+	BEGIN;
+	WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *)
+		INSERT into global_transactions SELECT now() FROM upd;
+	UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
+	COMMIT;
+});
+
+my $bank1 = File::Temp->new();	# local to shard1: odd ids only
+append_to_file($bank1, q{
+	\set id random(1, 10000)
+	BEGIN;
+	WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = (2*:id + 1) RETURNING *)
+		INSERT into local_transactions SELECT now() FROM upd;
+	UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 3);
+	COMMIT;
+});
+
+my $bank2 = File::Temp->new();	# local to shard2: even ids only
+append_to_file($bank2, q{
+	\set id random(1, 10000)
+
+	BEGIN;
+	WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = 2*:id RETURNING *)
+		INSERT into local_transactions SELECT now() FROM upd;
+	UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 2);
+	COMMIT;
+});
+
+###############################################################################
+# Helpers
+###############################################################################
+
+sub count_and_delete_rows
+{
+	# Report how many rows $table holds on $node, then empty the table.
+	my ($node, $table) = @_;
+	my $rows = $node->safe_psql('postgres', "select count(*) from $table");
+
+	$node->safe_psql('postgres', "delete from $table");
+	diag($node->name, ": completed $rows transactions");
+	return $rows;
+}
+
+###############################################################################
+# Concurrent global transactions
+###############################################################################
+
+# State shared by the test sections below
+my $started;
+my $seconds = 30;
+my $selects;
+my $total = '0';
+my $oldtotal = '0';
+my $isolation_errors = 0;
+
+# Run global transfers on master; every snapshot must see the same total
+my $pgb_handle;
+
+$pgb_handle = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+while (time() - $started < $seconds)
+{
+	$total = $master->safe_psql('postgres', "select sum(amount) from accounts");
+	if ( ($total ne $oldtotal) and ($total ne '') )
+	{
+		$isolation_errors++;
+		$oldtotal = $total;
+		diag("Isolation error. Total = $total");
+	}
+	if ($total ne '') { $selects++; }
+}
+
+$master->pgbench_await($pgb_handle);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction');
+
+###############################################################################
+# Concurrent global and local transactions
+###############################################################################
+
+my ($pgb_handle1, $pgb_handle2, $pgb_handle3);
+
+# global transactions
+$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+# concurrent local transactions on each shard
+$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' );
+$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' );
+
+$started = time();
+$selects = 0;
+$oldtotal = 0;
+$isolation_errors = 0;	# report this section's errors on their own
+while (time() - $started < $seconds)
+{
+	$total = $master->safe_psql('postgres', "select sum(amount) from accounts");
+	if ( ($total ne $oldtotal) and ($total ne '') )
+	{
+		$isolation_errors++;
+		$oldtotal = $total;
+		diag("Isolation error. Total = $total");
+	}
+	if ($total ne '') { $selects++; }
+}
+
+$master->pgbench_await($pgb_handle1);
+$shard1->pgbench_await($pgb_handle2);
+$shard2->pgbench_await($pgb_handle3);
+
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard1, 'local_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'local_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global and local transactions');
+
+
+###############################################################################
+# Snapshot stability
+###############################################################################
+
+my ($hash1, $hash2);
+my $stability_errors = 0;
+
+# global transactions
+$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+# concurrent local transactions on each shard
+$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' );
+$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' );
+
+$selects = 0;
+$started = time();
+while (time() - $started < $seconds)
+{
+	foreach my $node ($master, $shard1, $shard2)
+	{
+		($hash1, undef, $hash2) = split "\n", $node->safe_psql('postgres', qq[
+			begin isolation level repeatable read;
+			select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
+			select pg_sleep(3);
+			select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
+			commit;
+		]);
+		# both checksums come from one REPEATABLE READ snapshot: must match
+		if (($hash1 // '') eq '' or ($hash2 // '') eq '')
+		{
+			die "could not read account checksums on " . $node->name;
+		}
+		elsif ($hash1 ne $hash2)
+		{
+			diag("snapshot changed within a transaction on " . $node->name);
+			$stability_errors++;
+		}
+		else
+		{
+			$selects++;
+		}
+	}
+}
+
+$master->pgbench_await($pgb_handle1);
+$shard1->pgbench_await($pgb_handle2);
+$shard2->pgbench_await($pgb_handle3);
+
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard1, 'local_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'local_transactions') > 0);
+
+is($stability_errors, 0, 'snapshot is stable during concurrent global and local transactions');
+
+$master->stop;
+$shard1->stop;
+$shard2->stop;
diff --git a/contrib/postgres_fdw/t/002_bank_participant.pl b/contrib/postgres_fdw/t/002_bank_participant.pl
new file mode 100644
index 0000000000..04a2f1ba85
--- /dev/null
+++ b/contrib/postgres_fdw/t/002_bank_participant.pl
@@ -0,0 +1,240 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+my $shard1 = get_new_node("shard1");
+$shard1->init;
+$shard1->append_conf('postgresql.conf', qq(
+	max_prepared_transactions = 30
+	postgres_fdw.use_global_snapshots = on
+	global_snapshot_defer_time = 15
+	track_global_snapshots = on
+	default_transaction_isolation = 'REPEATABLE READ'
+));
+$shard1->start;
+
+my $shard2 = get_new_node("shard2");
+$shard2->init;
+$shard2->append_conf('postgresql.conf', qq(
+	max_prepared_transactions = 30
+	postgres_fdw.use_global_snapshots = on
+	global_snapshot_defer_time = 15
+	track_global_snapshots = on
+	default_transaction_isolation = 'REPEATABLE READ'
+));
+$shard2->start;
+
+###############################################################################
+# Prepare nodes: each shard's accounts covers its own and its neighbor's rows
+###############################################################################
+
+my @shards = ($shard1, $shard2);
+
+foreach my $node (@shards)
+{
+	$node->safe_psql('postgres', qq[
+		CREATE EXTENSION postgres_fdw;
+		CREATE TABLE accounts(id integer primary key, amount integer);
+		CREATE TABLE accounts_local() inherits(accounts);
+		CREATE TABLE global_transactions(tx_time timestamp);
+		CREATE TABLE local_transactions(tx_time timestamp);
+	]);
+
+	foreach my $neighbor (@shards)
+	{
+		next if ($neighbor eq $node);
+
+		my $port = $neighbor->port;
+		my $host = $neighbor->host;
+
+		$node->safe_psql('postgres', qq[
+			CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw
+					options(dbname 'postgres', host '$host', port '$port');
+			CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts)
+					server shard_$port options(table_name 'accounts_local');
+			CREATE USER MAPPING for CURRENT_USER SERVER shard_$port;
+		]);
+	}
+}
+
+$shard1->safe_psql('postgres', "insert into accounts_local select 2*id-1, 0 from generate_series(1, 10010) as id;");
+$shard2->safe_psql('postgres', "insert into accounts_local select 2*id,   0 from generate_series(1, 10010) as id;");
+
+###############################################################################
+# pgbench script: move one unit between accounts :id and :id + 1 (sum stays 0)
+###############################################################################
+
+my $bank = File::Temp->new();
+append_to_file($bank, q{
+	\set id random(1, 20000)
+	BEGIN;
+	WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *)
+		INSERT into global_transactions SELECT now() FROM upd;
+	UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
+	COMMIT;
+});
+
+###############################################################################
+# Helpers
+###############################################################################
+
+sub count_and_delete_rows
+{
+	# Report how many rows $table holds on $node, then empty the table.
+	my ($node, $table) = @_;
+	my $rows = $node->safe_psql('postgres', "select count(*) from $table");
+
+	$node->safe_psql('postgres', "delete from $table");
+	diag($node->name, ": completed $rows transactions");
+	return $rows;
+}
+
+###############################################################################
+# Concurrent global transactions
+###############################################################################
+
+# State shared by the test sections below
+my $started;
+my $seconds = 30;
+my $selects;
+my $total = '0';
+my $oldtotal = '0';
+my $isolation_errors = 0;
+my $i;
+
+# Run global transfers on both shards; every snapshot must see the same total
+my ($pgb_handle1, $pgb_handle2);
+
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+while (time() - $started < $seconds)
+{
+	# Check the total as seen from each shard
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction');
+
+###############################################################################
+# And do the same after soft restart
+###############################################################################
+
+$shard1->restart;
+$shard2->restart;
+$shard1->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard1 to become online";
+$shard2->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard2 to become online";
+
+$seconds = 15;
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+$isolation_errors = 0;	# report this section's errors on their own
+while (time() - $started < $seconds)
+{
+	# Check the total as seen from each shard
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction after restart');
+
+###############################################################################
+# And do the same after hard restart
+###############################################################################
+
+$shard1->teardown_node;
+$shard2->teardown_node;
+$shard1->start;
+$shard2->start;
+$shard1->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard1 to become online";
+$shard2->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard2 to become online";
+
+
+$seconds = 15;
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+$isolation_errors = 0;	# report this section's errors on their own
+while (time() - $started < $seconds)
+{
+	# Check the total as seen from each shard
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction after hard restart');
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 1d5450758e..ef4472170c 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -2115,6 +2115,68 @@ sub pg_recvlogical_upto
 	}
 }
 
+=pod
+
+=item $node->pgbench(@args)
+
+Run pgbench against this node with the given command-line arguments and wait
+for it to finish.  Returns whatever pgbench_await returns.
+
+=cut
+
+sub pgbench
+{
+	my ($self, @args) = @_;
+	my $pgbench_handle = $self->pgbench_async(@args);
+	return $self->pgbench_await($pgbench_handle);
+}
+
+=pod
+
+=item $node->pgbench_async(@args)
+
+Start pgbench against this node's host and port with the given command-line
+arguments and return immediately, without waiting for it to finish.  Returns
+the IPC::Run harness to pass to pgbench_await.  pgbench's standard output is
+captured rather than printed; standard error is inherited.
+
+=cut
+
+sub pgbench_async
+{
+	my ($self, @args) = @_;
+
+	my ($in, $out) = ('', '');
+
+	my @pgbench_command = (
+		'pgbench',
+		-h => $self->host,
+		-p => $self->port,
+		@args
+	);
+
+	# Pass references: IPC::Run would take plain scalars as file names
+	return IPC::Run::start(\@pgbench_command, '<', \$in, '>', \$out);
+}
+
+=pod
+
+=item $node->pgbench_await($handle)
+
+Wait for a pgbench run started by pgbench_async to finish and return the value
+of IPC::Run::finish.  The exit status is not checked here: pgbench can exit
+with a non-zero status when some of its clients fail (for example due to a
+serialization error), so that check is left to the caller.
+
+=cut
+
+sub pgbench_await
+{
+	my ($self, $pgbench_handle) = @_;
+
+	return IPC::Run::finish($pgbench_handle);
+}
+
 =pod
 
 =back
-- 
2.17.1

Reply via email to