On 09.06.2020 11:41, Fujii Masao wrote:
On 2020/05/12 19:24, Andrey Lepikhov wrote:
Rebased onto current master (fb544735f1).
Thanks for the patches!
These patches no longer apply cleanly and cause a compilation
failure.
So could you rebase and update them?
Rebased onto 57cb806308 (see attachment).
The patches seem not to be registered in CommitFest yet.
Are you planning to do that?
Not now. It is a sharding-related feature. I'm not sure that this
approach is fully consistent with the sharding way now.
--
Andrey Lepikhov
Postgres Professional
https://postgrespro.com
>From cd6a8585f9814b7e465abb2649ac84e80e7c726b Mon Sep 17 00:00:00 2001
From: Andrey Lepikhov <a.lepik...@postgrespro.ru>
Date: Tue, 9 Jun 2020 14:55:38 +0500
Subject: [PATCH 1/3] GlobalCSNLog-SLRU
---
src/backend/access/transam/Makefile | 1 +
src/backend/access/transam/global_csn_log.c | 439 ++++++++++++++++++++
src/backend/access/transam/twophase.c | 1 +
src/backend/access/transam/varsup.c | 2 +
src/backend/access/transam/xlog.c | 12 +
src/backend/storage/ipc/ipci.c | 3 +
src/backend/storage/ipc/procarray.c | 3 +
src/backend/storage/lmgr/lwlocknames.txt | 1 +
src/backend/tcop/postgres.c | 1 +
src/backend/utils/misc/guc.c | 9 +
src/backend/utils/probes.d | 2 +
src/bin/initdb/initdb.c | 3 +-
src/include/access/global_csn_log.h | 30 ++
src/include/storage/lwlock.h | 1 +
src/include/utils/snapshot.h | 3 +
15 files changed, 510 insertions(+), 1 deletion(-)
create mode 100644 src/backend/access/transam/global_csn_log.c
create mode 100644 src/include/access/global_csn_log.h
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de72..60ff8b141e 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
clog.o \
commit_ts.o \
+ global_csn_log.o \
generic_xlog.o \
multixact.o \
parallel.o \
diff --git a/src/backend/access/transam/global_csn_log.c b/src/backend/access/transam/global_csn_log.c
new file mode 100644
index 0000000000..6f7fded350
--- /dev/null
+++ b/src/backend/access/transam/global_csn_log.c
@@ -0,0 +1,439 @@
+/*-----------------------------------------------------------------------------
+ *
+ * global_csn_log.c
+ * Track global commit sequence numbers of finished transactions
+ *
+ * Implementation of cross-node transaction isolation relies on commit sequence
+ * number (CSN) based visibility rules. This module provides SLRU to store
+ * CSN for each transaction. This mapping needs to be kept only for xids
+ * greater than oldestXid, but that can require arbitrarily large amounts of
+ * memory in case of long-lived transactions. Because of the same lifetime and
+ * persistence requirements, this module is quite similar to subtrans.c.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/global_csn_log.c
+ *
+ *-----------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/global_csn_log.h"
+#include "access/slru.h"
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "utils/snapmgr.h"
+
+bool track_global_snapshots;
+
+/*
+ * Defines for GlobalCSNLog page sizes. A page is the same BLCKSZ as is used
+ * everywhere else in Postgres.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * GlobalCSNLog page numbering also wraps around at
+ * 0xFFFFFFFF/GCSNLOG_XACTS_PER_PAGE, and GlobalCSNLog segment numbering at
+ * 0xFFFFFFFF/GCSNLOG_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateGlobalCSNLog (see GlobalCSNLogPagePrecedes).
+ */
+
+/* One GlobalCSN is stored per xid; the macros below map an xid to its slot */
+#define GCSNLOG_XACTS_PER_PAGE (BLCKSZ / sizeof(GlobalCSN))
+
+#define TransactionIdToPage(xid) ((xid) / (TransactionId) GCSNLOG_XACTS_PER_PAGE)
+#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) GCSNLOG_XACTS_PER_PAGE)
+
+/*
+ * Link to shared-memory data structures for GlobalCSNLog control
+ */
+static SlruCtlData GlobalCSNLogCtlData;
+#define GlobalCsnlogCtl (&GlobalCSNLogCtlData)
+
+static int ZeroGlobalCSNLogPage(int pageno);
+static bool GlobalCSNLogPagePrecedes(int page1, int page2);
+static void GlobalCSNLogSetPageStatus(TransactionId xid, int nsubxids,
+ TransactionId *subxids,
+ GlobalCSN csn, int pageno);
+static void GlobalCSNLogSetCSNInSlot(TransactionId xid, GlobalCSN csn,
+ int slotno);
+
+/*
+ * GlobalCSNLogSetCSN
+ *
+ * Record the GlobalCSN of a transaction and of its subtransaction tree.
+ *
+ * xid is a single xid to set status for. This will typically be the top level
+ * transactionid for a top level commit or abort. It can also be a
+ * subtransaction when we record transaction aborts.
+ *
+ * subxids is an array of xids of length nsubxids, representing subtransactions
+ * in the tree of xid. In various cases nsubxids may be zero.
+ *
+ * csn is the commit sequence number of the transaction. It should be
+ * AbortedGlobalCSN for abort cases.
+ */
+void
+GlobalCSNLogSetCSN(TransactionId xid, int nsubxids,
+ TransactionId *subxids, GlobalCSN csn)
+{
+ int pageno;
+ int i = 0; /* next subxid to examine */
+ int offset = 0; /* first subxid of the current page's batch */
+
+ /* Callers of GlobalCSNLogSetCSN() must check GUC params */
+ Assert(track_global_snapshots);
+
+ Assert(TransactionIdIsValid(xid));
+
+ pageno = TransactionIdToPage(xid); /* get page of parent */
+ for (;;)
+ {
+ int num_on_page = 0;
+ /* Collect the run of consecutive subxids that fall on this page */
+ while (i < nsubxids && TransactionIdToPage(subxids[i]) == pageno)
+ {
+ num_on_page++;
+ i++;
+ }
+
+ GlobalCSNLogSetPageStatus(xid,
+ num_on_page, subxids + offset,
+ csn, pageno);
+ if (i >= nsubxids)
+ break;
+
+ offset = i;
+ pageno = TransactionIdToPage(subxids[offset]);
+ xid = InvalidTransactionId; /* parent is written only with the first page */
+ }
+}
+
+/*
+ * Record csn in the csn log for all given entries on a single page, under
+ * one hold of GlobalCSNLogControlLock. Atomic only on this page.
+ *
+ * Otherwise same API as GlobalCSNLogSetCSN(); xid may be InvalidTransactionId.
+ */
+static void
+GlobalCSNLogSetPageStatus(TransactionId xid, int nsubxids,
+ TransactionId *subxids,
+ GlobalCSN csn, int pageno)
+{
+ int slotno;
+ int i;
+
+ LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+ slotno = SimpleLruReadPage(GlobalCsnlogCtl, pageno, true, xid);
+
+ /* Subtransactions first, if needed ... */
+ for (i = 0; i < nsubxids; i++)
+ {
+ Assert(GlobalCsnlogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
+ GlobalCSNLogSetCSNInSlot(subxids[i], csn, slotno);
+ }
+
+ /* ... then the main transaction, unless the caller passed an invalid xid */
+ if (TransactionIdIsValid(xid))
+ GlobalCSNLogSetCSNInSlot(xid, csn, slotno);
+
+ GlobalCsnlogCtl->shared->page_dirty[slotno] = true;
+
+ LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Store csn as xid's entry in buffer slot slotno; control lock must be held.
+ */
+static void
+GlobalCSNLogSetCSNInSlot(TransactionId xid, GlobalCSN csn, int slotno)
+{
+ int entryno = TransactionIdToPgIndex(xid);
+ GlobalCSN *ptr;
+
+ Assert(LWLockHeldByMe(GlobalCSNLogControlLock));
+ /* Entry stride is sizeof(GlobalCSN), matching GCSNLOG_XACTS_PER_PAGE */
+ ptr = (GlobalCSN *) (GlobalCsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(GlobalCSN));
+
+ *ptr = csn;
+}
+
+/*
+ * Look up the GlobalCSN recorded in the log for xid and return it.
+ *
+ * NB: this is a low-level routine and is NOT the preferred entry point
+ * for most uses; TransactionIdGetGlobalCSN() in global_snapshot.c is the
+ * intended caller.
+ */
+GlobalCSN
+GlobalCSNLogGetCSN(TransactionId xid)
+{
+ int pageno = TransactionIdToPage(xid);
+ int entryno = TransactionIdToPgIndex(xid);
+ int slotno;
+ GlobalCSN *ptr;
+ GlobalCSN global_csn;
+
+ /* Callers of GlobalCSNLogGetCSN() must check GUC params */
+ Assert(track_global_snapshots);
+
+ /* Can't ask about stuff that might not be around anymore */
+ Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
+
+ /* lock is acquired by SimpleLruReadPage_ReadOnly and released below */
+
+ slotno = SimpleLruReadPage_ReadOnly(GlobalCsnlogCtl, pageno, xid);
+ ptr = (GlobalCSN *) (GlobalCsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(GlobalCSN));
+ global_csn = *ptr;
+
+ LWLockRelease(GlobalCSNLogControlLock);
+
+ return global_csn;
+}
+
+/*
+ * Number of shared GlobalCSNLog buffers: NBuffers / 512, clamped to [4, 32].
+ */
+static Size
+GlobalCSNLogShmemBuffers(void)
+{
+ return Min(32, Max(4, NBuffers / 512));
+}
+
+/*
+ * Shared memory needed for GlobalCsnlogCtl; zero if tracking is disabled.
+ */
+Size
+GlobalCSNLogShmemSize(void)
+{
+ if (!track_global_snapshots)
+ return 0;
+
+ return SimpleLruShmemSize(GlobalCSNLogShmemBuffers(), 0);
+}
+
+/*
+ * Initialize shared memory for GlobalCSNLog; no-op if tracking is disabled.
+ */
+void
+GlobalCSNLogShmemInit(void)
+{
+ if (!track_global_snapshots)
+ return;
+
+ GlobalCsnlogCtl->PagePrecedes = GlobalCSNLogPagePrecedes;
+ SimpleLruInit(GlobalCsnlogCtl, "GlobalCSNLog Ctl", GlobalCSNLogShmemBuffers(), 0,
+ GlobalCSNLogControlLock, "pg_global_csn", LWTRANCHE_GLOBAL_CSN_LOG_BUFFERS);
+}
+
+/*
+ * This func must be called ONCE on system install. It creates the initial
+ * GlobalCSNLog segment. The pg_global_csn directory is assumed to have been
+ * created by initdb, and GlobalCSNLogShmemInit must have been called already.
+ */
+void
+BootStrapGlobalCSNLog(void)
+{
+ int slotno;
+
+ if (!track_global_snapshots)
+ return;
+
+ LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+ /* Create and zero the first page of the global CSN log */
+ slotno = ZeroGlobalCSNLogPage(0);
+
+ /* Make sure it's written out */
+ SimpleLruWritePage(GlobalCsnlogCtl, slotno);
+ Assert(!GlobalCsnlogCtl->shared->page_dirty[slotno]);
+
+ LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Initialize (or reinitialize) a page of GlobalCSNLog to zeroes.
+ *
+ * The page is not actually written, just set up in shared memory.
+ * Returns the slot number that SimpleLruZeroPage assigned to the page.
+ *
+ * GlobalCSNLogControlLock must be held at entry, and will be held at exit.
+ */
+static int
+ZeroGlobalCSNLogPage(int pageno)
+{
+ Assert(LWLockHeldByMe(GlobalCSNLogControlLock));
+ return SimpleLruZeroPage(GlobalCsnlogCtl, pageno);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * after StartupXLOG has initialized ShmemVariableCache->nextFullXid.
+ *
+ * oldestActiveXID is the oldest XID of any prepared transaction, or nextXid
+ * if there are none.
+ */
+void
+StartupGlobalCSNLog(TransactionId oldestActiveXID)
+{
+ int startPage;
+ int endPage;
+
+ if (!track_global_snapshots)
+ return;
+
+ /*
+ * Since we don't expect pg_global_csn to be valid across crashes, we
+ * initialize the currently-active page(s) to zeroes during startup.
+ * Whenever we advance into a new page, ExtendGlobalCSNLog will likewise
+ * zero the new page without regard to whatever was previously on disk.
+ */
+ LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+ startPage = TransactionIdToPage(oldestActiveXID);
+ endPage = TransactionIdToPage(XidFromFullTransactionId(ShmemVariableCache->nextFullXid));
+
+ while (startPage != endPage)
+ {
+ (void) ZeroGlobalCSNLogPage(startPage);
+ startPage++;
+ /* must account for wraparound */
+ if (startPage > TransactionIdToPage(MaxTransactionId))
+ startPage = 0;
+ }
+ (void) ZeroGlobalCSNLogPage(startPage); /* finally, endPage itself */
+
+ LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend shutdown.
+ */
+void
+ShutdownGlobalCSNLog(void)
+{
+ if (!track_global_snapshots)
+ return;
+
+ /*
+ * Flush dirty GlobalCSNLog pages to disk.
+ *
+ * This is not actually necessary from a correctness point of view. We do
+ * it merely as a debugging aid.
+ */
+ TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_START(false);
+ SimpleLruFlush(GlobalCsnlogCtl, false);
+ TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_DONE(false);
+}
+
+/*
+ * Perform a checkpoint --- either during shutdown, or on-the-fly.
+ */
+void
+CheckPointGlobalCSNLog(void)
+{
+ if (!track_global_snapshots)
+ return;
+
+ /*
+ * Flush dirty GlobalCSNLog pages to disk.
+ *
+ * This is not actually necessary from a correctness point of view. We do
+ * it merely to improve the odds that writing of dirty pages is done by
+ * the checkpoint process and not by backends.
+ */
+ TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_START(true);
+ SimpleLruFlush(GlobalCsnlogCtl, true);
+ TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_DONE(true);
+}
+
+/*
+ * Make sure that GlobalCSNLog has room for a newly-allocated XID.
+ *
+ * NB: this is called while holding XidGenLock. We want it to be very fast
+ * most of the time; even when it's not so fast, no actual I/O need happen
+ * unless we're forced to write out a dirty GlobalCSNLog or xlog page to make
+ * room in shared memory.
+ */
+void
+ExtendGlobalCSNLog(TransactionId newestXact)
+{
+ int pageno;
+
+ if (!track_global_snapshots)
+ return;
+
+ /*
+ * No work except at first XID of a page. But beware: just after
+ * wraparound, the first XID of page zero is FirstNormalTransactionId.
+ */
+ if (TransactionIdToPgIndex(newestXact) != 0 &&
+ !TransactionIdEquals(newestXact, FirstNormalTransactionId))
+ return;
+
+ pageno = TransactionIdToPage(newestXact);
+
+ LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+ /* Zero the page (no XLOG record is written for it here) */
+ ZeroGlobalCSNLogPage(pageno);
+
+ LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Remove all GlobalCSNLog segments before the one holding the passed
+ * transaction ID.
+ *
+ * This is normally called during checkpoint, with oldestXact being the
+ * oldest TransactionXmin of any running transaction.
+ */
+void
+TruncateGlobalCSNLog(TransactionId oldestXact)
+{
+ int cutoffPage;
+
+ if (!track_global_snapshots)
+ return;
+
+ /*
+ * The cutoff point is the start of the segment containing oldestXact. We
+ * pass the *page* containing oldestXact to SimpleLruTruncate. We step
+ * back one transaction to avoid passing a cutoff page that hasn't been
+ * created yet in the rare case that oldestXact would be the first item on
+ * a page and oldestXact == next XID. In that case, if we didn't subtract
+ * one, we'd trigger SimpleLruTruncate's wraparound detection.
+ */
+ TransactionIdRetreat(oldestXact); /* the one-xid step back described above */
+ cutoffPage = TransactionIdToPage(oldestXact);
+
+ SimpleLruTruncate(GlobalCsnlogCtl, cutoffPage);
+}
+
+/*
+ * Decide which of two GlobalCSNLog page numbers is "older" for truncation
+ * purposes; installed as GlobalCsnlogCtl->PagePrecedes at shmem init.
+ *
+ * We need to use comparison of TransactionIds here in order to do the right
+ * thing with wraparound XID arithmetic. However, if we are asked about
+ * page number zero, we don't want to hand InvalidTransactionId to
+ * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
+ * offset both xids by FirstNormalTransactionId to avoid that.
+ */
+static bool
+GlobalCSNLogPagePrecedes(int page1, int page2)
+{
+ TransactionId xid1;
+ TransactionId xid2;
+
+ xid1 = ((TransactionId) page1) * GCSNLOG_XACTS_PER_PAGE;
+ xid1 += FirstNormalTransactionId;
+ xid2 = ((TransactionId) page2) * GCSNLOG_XACTS_PER_PAGE;
+ xid2 += FirstNormalTransactionId;
+
+ return TransactionIdPrecedes(xid1, xid2);
+}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e1904877fa..9a69fc1e09 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,7 @@
#include <unistd.h>
#include "access/commit_ts.h"
+#include "access/global_csn_log.h"
#include "access/htup_details.h"
#include "access/subtrans.h"
#include "access/transam.h"
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index e14b53bf9e..3de3d99683 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -15,6 +15,7 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/global_csn_log.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
@@ -173,6 +174,7 @@ GetNewTransactionId(bool isSubXact)
* Extend pg_subtrans and pg_commit_ts too.
*/
ExtendCLOG(xid);
+ ExtendGlobalCSNLog(xid);
ExtendCommitTs(xid);
ExtendSUBTRANS(xid);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 55cac186dc..4ffe4aad03 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -24,6 +24,7 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/global_csn_log.h"
#include "access/heaptoast.h"
#include "access/multixact.h"
#include "access/rewriteheap.h"
@@ -5342,6 +5343,7 @@ BootStrapXLOG(void)
/* Bootstrap the commit log, too */
BootStrapCLOG();
+ BootStrapGlobalCSNLog();
BootStrapCommitTs();
BootStrapSUBTRANS();
BootStrapMultiXact();
@@ -7059,6 +7061,7 @@ StartupXLOG(void)
* maintained during recovery and need not be started yet.
*/
StartupCLOG();
+ StartupGlobalCSNLog(oldestActiveXID);
StartupSUBTRANS(oldestActiveXID);
/*
@@ -7876,6 +7879,7 @@ StartupXLOG(void)
if (standbyState == STANDBY_DISABLED)
{
StartupCLOG();
+ StartupGlobalCSNLog(oldestActiveXID);
StartupSUBTRANS(oldestActiveXID);
}
@@ -8523,6 +8527,7 @@ ShutdownXLOG(int code, Datum arg)
CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
}
ShutdownCLOG();
+ ShutdownGlobalCSNLog();
ShutdownCommitTs();
ShutdownSUBTRANS();
ShutdownMultiXact();
@@ -9095,7 +9100,10 @@ CreateCheckPoint(int flags)
* StartupSUBTRANS hasn't been called yet.
*/
if (!RecoveryInProgress())
+ {
TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+ TruncateGlobalCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+ }
/* Real work is done, but log and update stats before releasing lock. */
LogCheckpointEnd(false);
@@ -9171,6 +9179,7 @@ static void
CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
{
CheckPointCLOG();
+ CheckPointGlobalCSNLog();
CheckPointCommitTs();
CheckPointSUBTRANS();
CheckPointMultiXact();
@@ -9455,7 +9464,10 @@ CreateRestartPoint(int flags)
* this because StartupSUBTRANS hasn't been called yet.
*/
if (EnableHotStandby)
+ {
TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+ TruncateGlobalCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+ }
/* Real work is done, but log and update before releasing lock. */
LogCheckpointEnd(true);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cd..dc2d2959c4 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -16,6 +16,7 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/global_csn_log.h"
#include "access/heapam.h"
#include "access/multixact.h"
#include "access/nbtree.h"
@@ -125,6 +126,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, ProcGlobalShmemSize());
size = add_size(size, XLOGShmemSize());
size = add_size(size, CLOGShmemSize());
+ size = add_size(size, GlobalCSNLogShmemSize());
size = add_size(size, CommitTsShmemSize());
size = add_size(size, SUBTRANSShmemSize());
size = add_size(size, TwoPhaseShmemSize());
@@ -213,6 +215,7 @@ CreateSharedMemoryAndSemaphores(void)
*/
XLOGShmemInit();
CLOGShmemInit();
+ GlobalCSNLogShmemInit();
CommitTsShmemInit();
SUBTRANSShmemInit();
MultiXactShmemInit();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 3c2b369615..486da77f68 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -46,6 +46,7 @@
#include <signal.h>
#include "access/clog.h"
+#include "access/global_csn_log.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
@@ -833,6 +834,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
{
ExtendSUBTRANS(latestObservedXid);
+ ExtendGlobalCSNLog(latestObservedXid);
TransactionIdAdvance(latestObservedXid);
}
TransactionIdRetreat(latestObservedXid); /* = running->nextXid - 1 */
@@ -3335,6 +3337,7 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
while (TransactionIdPrecedes(next_expected_xid, xid))
{
TransactionIdAdvance(next_expected_xid);
+ ExtendGlobalCSNLog(next_expected_xid);
ExtendSUBTRANS(next_expected_xid);
}
Assert(next_expected_xid == xid);
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6985e8eed..aa904b1f17 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ MultiXactTruncationLock 41
OldSnapshotTimeMapLock 42
LogicalRepWorkerLock 43
XactTruncationLock 44
+GlobalCSNLogControlLock 45
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index c9424f167c..9fec9dcd59 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -42,6 +42,7 @@
#include "catalog/pg_type.h"
#include "commands/async.h"
#include "commands/prepare.h"
+#include "common/hashfn.h"
#include "executor/spi.h"
#include "jit/jit.h"
#include "libpq/libpq.h"
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 2f3e0a70e0..4910e4fc66 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1172,6 +1172,15 @@ static struct config_bool ConfigureNamesBool[] =
false,
NULL, NULL, NULL
},
+ {
+ {"track_global_snapshots", PGC_POSTMASTER, RESOURCES_MEM,
+ gettext_noop("Enable global snapshot tracking."),
+ gettext_noop("Used to achieve REPEATEBLE READ isolation level for postgres_fdw transactions.")
+ },
+ &track_global_snapshots,
+ true, /* XXX: set true to simplify testing. XXX2: Seems that RESOURCES_MEM isn't the best category */
+ NULL, NULL, NULL
+ },
{
{"ssl", PGC_SIGHUP, CONN_AUTH_SSL,
gettext_noop("Enables SSL connections."),
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index a0b0458108..f900e7f3b4 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -77,6 +77,8 @@ provider postgresql {
probe clog__checkpoint__done(bool);
probe subtrans__checkpoint__start(bool);
probe subtrans__checkpoint__done(bool);
+ probe globalcsnlog__checkpoint__start(bool);
+ probe globalcsnlog__checkpoint__done(bool);
probe multixact__checkpoint__start(bool);
probe multixact__checkpoint__done(bool);
probe twophase__checkpoint__start();
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 4ff0c6c700..68c44d3f70 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -220,7 +220,8 @@ static const char *const subdirs[] = {
"pg_xact",
"pg_logical",
"pg_logical/snapshots",
- "pg_logical/mappings"
+ "pg_logical/mappings",
+ "pg_global_csn"
};
diff --git a/src/include/access/global_csn_log.h b/src/include/access/global_csn_log.h
new file mode 100644
index 0000000000..618edfc691
--- /dev/null
+++ b/src/include/access/global_csn_log.h
@@ -0,0 +1,30 @@
+/*
+ * global_csn_log.h
+ *
+ * Global commit sequence number (GlobalCSN) log.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/global_csn_log.h
+ */
+#ifndef GLOBAL_CSN_LOG_H
+#define GLOBAL_CSN_LOG_H
+
+#include "access/xlog.h"
+#include "utils/snapshot.h"
+
+extern void GlobalCSNLogSetCSN(TransactionId xid, int nsubxids,
+ TransactionId *subxids, GlobalCSN csn);
+extern GlobalCSN GlobalCSNLogGetCSN(TransactionId xid);
+
+extern Size GlobalCSNLogShmemSize(void);
+extern void GlobalCSNLogShmemInit(void);
+extern void BootStrapGlobalCSNLog(void);
+extern void StartupGlobalCSNLog(TransactionId oldestActiveXID);
+extern void ShutdownGlobalCSNLog(void);
+extern void CheckPointGlobalCSNLog(void);
+extern void ExtendGlobalCSNLog(TransactionId newestXact);
+extern void TruncateGlobalCSNLog(TransactionId oldestXact);
+
+#endif /* GLOBAL_CSN_LOG_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index c04ae97148..0d56f6de61 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -197,6 +197,7 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_XACT_BUFFER = NUM_INDIVIDUAL_LWLOCKS,
LWTRANCHE_COMMITTS_BUFFER,
LWTRANCHE_SUBTRANS_BUFFER,
+ LWTRANCHE_GLOBAL_CSN_LOG_BUFFERS,
LWTRANCHE_MULTIXACTOFFSET_BUFFER,
LWTRANCHE_MULTIXACTMEMBER_BUFFER,
LWTRANCHE_NOTIFY_BUFFER,
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 4796edb63a..57d2dfaa67 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -20,6 +20,9 @@
#include "storage/buf.h"
+typedef uint64 GlobalCSN;
+extern bool track_global_snapshots;
+
/*
* The different snapshot types. We use SnapshotData structures to represent
* both "regular" (MVCC) snapshots and "special" snapshots that have non-MVCC
--
2.25.1
>From 1e27ac62763e112810db7b6279d0862e962db403 Mon Sep 17 00:00:00 2001
From: Andrey Lepikhov <a.lepik...@postgrespro.ru>
Date: Tue, 9 Jun 2020 15:02:39 +0500
Subject: [PATCH 2/3] Global-snapshots
---
src/backend/access/transam/Makefile | 1 +
src/backend/access/transam/global_snapshot.c | 755 ++++++++++++++++++
src/backend/access/transam/twophase.c | 156 ++++
src/backend/access/transam/xact.c | 29 +
src/backend/access/transam/xlog.c | 2 +
src/backend/storage/ipc/ipci.c | 3 +
src/backend/storage/ipc/procarray.c | 92 ++-
src/backend/storage/lmgr/lwlocknames.txt | 1 +
src/backend/storage/lmgr/proc.c | 5 +
src/backend/utils/misc/guc.c | 13 +-
src/backend/utils/misc/postgresql.conf.sample | 2 +
src/backend/utils/time/snapmgr.c | 167 +++-
src/include/access/global_snapshot.h | 72 ++
src/include/access/twophase.h | 1 +
src/include/catalog/pg_proc.dat | 13 +
src/include/datatype/timestamp.h | 3 +
src/include/fmgr.h | 1 +
src/include/portability/instr_time.h | 10 +
src/include/storage/proc.h | 15 +
src/include/storage/procarray.h | 8 +
src/include/utils/snapmgr.h | 3 +
src/include/utils/snapshot.h | 8 +
22 files changed, 1353 insertions(+), 7 deletions(-)
create mode 100644 src/backend/access/transam/global_snapshot.c
create mode 100644 src/include/access/global_snapshot.h
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 60ff8b141e..6de567a79b 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -16,6 +16,7 @@ OBJS = \
clog.o \
commit_ts.o \
global_csn_log.o \
+ global_snapshot.o \
generic_xlog.o \
multixact.o \
parallel.o \
diff --git a/src/backend/access/transam/global_snapshot.c b/src/backend/access/transam/global_snapshot.c
new file mode 100644
index 0000000000..bac16828bb
--- /dev/null
+++ b/src/backend/access/transam/global_snapshot.c
@@ -0,0 +1,755 @@
+/*-------------------------------------------------------------------------
+ *
+ * global_snapshot.c
+ * Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/global_snapshot.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/global_csn_log.h"
+#include "access/global_snapshot.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/xact.h"
+#include "portability/instr_time.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "miscadmin.h"
+
+/* Raise a warning if imported global_csn exceeds ours by this value. */
+#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */
+
+/*
+ * GlobalSnapshotState
+ *
+ * Do not trust local clocks to be strictly monotonic: save the last acquired
+ * value so that later we can compare the next timestamp with it. Accessed through
+ * GlobalSnapshotGenerate() and GlobalSnapshotSync().
+ */
+typedef struct
+{
+ GlobalCSN last_global_csn;
+ volatile slock_t lock;
+} GlobalSnapshotState;
+
+static GlobalSnapshotState *gsState;
+
+
+/*
+ * GUC to delay advance of oldestXid for this amount of time. Also determines
+ * the size of the GlobalSnapshotXidMap circular buffer.
+ */
+int global_snapshot_defer_time;
+
+/*
+ * Enables this module.
+ */
+extern bool track_global_snapshots;
+
+/*
+ * GlobalSnapshotXidMap
+ *
+ * To be able to install global snapshot that points to past we need to keep
+ * old versions of tuples and therefore delay advance of oldestXid. Here we
+ * keep track of correspondence between snapshot's global_csn and oldestXid
+ * that was set at the time when the snapshot was taken. Much like the
+ * snapshot too old's OldSnapshotControlData does, but with finer granularity
+ * to seconds.
+ *
+ * Different strategies can be employed to hold oldestXid (e.g. we can track
+ * oldest global_csn-based snapshot among cluster nodes and map it to oldestXid
+ * on each node) but the one implemented here tries to avoid cross-node
+ * communications which are tricky in case of postgres_fdw.
+ *
+ * On each snapshot acquisition GlobalSnapshotMapXmin() is called and stores
+ * correspondence between current global_csn and oldestXmin in a sparse way:
+ * global_csn is rounded to seconds (and here we use the fact that global_csn
+ * is just a timestamp) and oldestXmin is stored in the circular buffer where
+ * rounded global_csn acts as an offset from current circular buffer head.
+ * Size of the circular buffer is controlled by global_snapshot_defer_time GUC.
+ *
+ * When global snapshot arrives from different node we check that its
+ * global_csn is still in our map, otherwise we'll error out with "snapshot too
+ * old" message. If global_csn is successfully mapped to oldestXid we move
+ * backend's pgxact->xmin to proc->originalXmin and fill pgxact->xmin to
+ * mapped oldestXid. That way GetOldestXmin() can take into account backends
+ * with imported global snapshot and old tuple versions will be preserved.
+ *
+ * Also while calculating oldestXmin for our map in presence of imported
+ * global snapshots we should use proc->originalXmin instead of pgxact->xmin
+ * that was set during import. Otherwise, we can create a feedback loop:
+ * xmins of imported global snapshots were calculated using our map and new
+ * entries in the map would be calculated based on those xmins, so there is
+ * a risk of getting stuck forever with one non-increasing oldestXmin. All other
+ * callers of GetOldestXmin() are using pgxact->xmin so the old tuple versions
+ * are preserved.
+ */
+typedef struct GlobalSnapshotXidMap
+{
+ int head; /* offset of current freshest value */
+ int size; /* total size of circular buffer */
+ GlobalCSN_atomic last_csn_seconds; /* last rounded global_csn that changed
+ * xmin_by_second[] */
+ TransactionId *xmin_by_second; /* circular buffer of oldestXmin's */
+}
+GlobalSnapshotXidMap;
+
+static GlobalSnapshotXidMap *gsXidMap;
+
+
+/* Estimate shared memory space needed */
+Size
+GlobalSnapshotShmemSize(void)
+{
+ Size size = 0;
+
+ if (track_global_snapshots || global_snapshot_defer_time > 0)
+ {
+ size += MAXALIGN(sizeof(GlobalSnapshotState));
+ }
+
+ if (global_snapshot_defer_time > 0)
+ {
+ size += sizeof(GlobalSnapshotXidMap);
+ size += global_snapshot_defer_time*sizeof(TransactionId);
+ size = MAXALIGN(size);
+ }
+
+ return size;
+}
+
+/* Init shared memory structures */
+void
+GlobalSnapshotShmemInit()
+{
+ bool found;
+
+ if (track_global_snapshots || global_snapshot_defer_time > 0)
+ {
+ gsState = ShmemInitStruct("gsState",
+ sizeof(GlobalSnapshotState),
+ &found);
+ if (!found)
+ {
+ gsState->last_global_csn = 0;
+ SpinLockInit(&gsState->lock);
+ }
+ }
+
+ if (global_snapshot_defer_time > 0)
+ {
+ gsXidMap = ShmemInitStruct("gsXidMap",
+ sizeof(GlobalSnapshotXidMap),
+ &found);
+ if (!found)
+ {
+ int i;
+
+ pg_atomic_init_u64(&gsXidMap->last_csn_seconds, 0);
+ gsXidMap->head = 0;
+ gsXidMap->size = global_snapshot_defer_time;
+ gsXidMap->xmin_by_second =
+ ShmemAlloc(sizeof(TransactionId)*gsXidMap->size);
+
+ for (i = 0; i < gsXidMap->size; i++)
+ gsXidMap->xmin_by_second[i] = InvalidTransactionId;
+ }
+ }
+}
+
+/*
+ * GlobalSnapshotStartup
+ *
+ * Set gsXidMap entries to oldestActiveXID during startup.
+ */
+void
+GlobalSnapshotStartup(TransactionId oldestActiveXID)
+{
+ /*
+ * Run only if we have initialized shared memory and gsXidMap
+ * is enabled.
+ */
+ if (IsNormalProcessingMode() &&
+ track_global_snapshots && global_snapshot_defer_time > 0)
+ {
+ int i;
+
+ Assert(TransactionIdIsValid(oldestActiveXID));
+ for (i = 0; i < gsXidMap->size; i++)
+ gsXidMap->xmin_by_second[i] = oldestActiveXID;
+ ProcArraySetGlobalSnapshotXmin(oldestActiveXID);
+ }
+}
+
+/*
+ * GlobalSnapshotMapXmin
+ *
+ * Maintain circular buffer of oldestXmins for several seconds in past. This
+ * buffer allows to shift oldestXmin in the past when backend is importing
+ * global transaction. Otherwise old versions of tuples that were needed for
+ * this transaction can be recycled by other processes (vacuum, HOT, etc).
+ *
+ * Locking here is not trivial. Called upon each snapshot creation after
+ * ProcArrayLock is released. Such usage creates several race conditions. It
+ * is possible that backend who got global_csn called GlobalSnapshotMapXmin()
+ * only after other backends managed to get snapshot and complete
+ * GlobalSnapshotMapXmin() call, or even committed. This is safe because
+ *
+ * * We already hold our xmin in MyPgXact, so our snapshot will not be
+ * harmed even though ProcArrayLock is released.
+ *
+ * * snapshot_global_csn is always pessimistically rounded up to the next
+ * second.
+ *
+ * * For performance reasons, xmin value for particular second is filled
+ * only once. Because of that instead of writing to buffer just our
+ * xmin (which is enough for our snapshot), we bump oldestXmin there --
+ * it mitigates the possibility of damaging someone else's snapshot by
+ * writing to the buffer too advanced value in case of slowness of
+ * another backend who generated csn earlier, but didn't manage to
+ * insert it before us.
+ *
+ * * if GlobalSnapshotMapXmin() finds a gap of several seconds between
+ * current call and latest completed call then it should fill that gap
+ * with latest known values instead of new one. Otherwise it is
+ * possible (however highly unlikely) that this gap also happened
+ * between taking snapshot and call to GlobalSnapshotMapXmin() for some
+ * backend. And we are at risk of filling the circular buffer with
+ * oldestXmin's that are bigger than they actually were.
+ */
+void
+GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn)
+{
+ int offset, gap, i;
+ GlobalCSN csn_seconds;
+ GlobalCSN last_csn_seconds;
+ volatile TransactionId oldest_deferred_xmin;
+ TransactionId current_oldest_xmin, previous_oldest_xmin;
+
+ /* Callers should check config values */
+ Assert(global_snapshot_defer_time > 0);
+ Assert(gsXidMap != NULL);
+
+ /*
+ * Round up global_csn to the next second -- pessimistically and safely.
+ */
+ csn_seconds = (snapshot_global_csn / NSECS_PER_SEC + 1);
+
+ /*
+ * Fast-path check. Avoid taking exclusive GlobalSnapshotXidMapLock lock
+ * if oldestXid was already written to xmin_by_second[] for this rounded
+ * global_csn.
+ */
+ if (pg_atomic_read_u64(&gsXidMap->last_csn_seconds) >= csn_seconds)
+ return;
+
+ /* Ok, we have new entry (or entries) */
+ LWLockAcquire(GlobalSnapshotXidMapLock, LW_EXCLUSIVE);
+
+ /* Re-check last_csn_seconds under lock */
+ last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds);
+ if (last_csn_seconds >= csn_seconds)
+ {
+ LWLockRelease(GlobalSnapshotXidMapLock);
+ return;
+ }
+ pg_atomic_write_u64(&gsXidMap->last_csn_seconds, csn_seconds);
+
+ /*
+ * Count oldest_xmin.
+ *
+ * It was possible to calculate oldest_xmin during corresponding snapshot
+ * creation, but GetSnapshotData() intentionally reads only PgXact, but not
+ * PgProc. And we need info about originalXmin (see comment to gsXidMap)
+ * which is stored in PgProc because of threats in comments around PgXact
+ * about extending it with new fields. So just calculate oldest_xmin again,
+ * that anyway happens quite rarely.
+ */
+ current_oldest_xmin = GetOldestXmin(NULL, PROCARRAY_NON_IMPORTED_XMIN);
+
+ previous_oldest_xmin = gsXidMap->xmin_by_second[gsXidMap->head];
+
+ Assert(TransactionIdIsNormal(current_oldest_xmin));
+ Assert(TransactionIdIsNormal(previous_oldest_xmin) || !track_global_snapshots);
+
+ gap = csn_seconds - last_csn_seconds;
+ offset = csn_seconds % gsXidMap->size;
+
+ /* Sanity check before we update head and gap */
+ Assert( gap >= 1 );
+ Assert( (gsXidMap->head + gap) % gsXidMap->size == offset );
+
+ gap = gap > gsXidMap->size ? gsXidMap->size : gap;
+ gsXidMap->head = offset;
+
+ /* Fill new entry with current_oldest_xmin */
+ gsXidMap->xmin_by_second[offset] = current_oldest_xmin;
+
+ /*
+ * If we have gap then fill it with previous_oldest_xmin for reasons
+ * outlined in comment above this function.
+ */
+ for (i = 1; i < gap; i++)
+ {
+ offset = (offset + gsXidMap->size - 1) % gsXidMap->size;
+ gsXidMap->xmin_by_second[offset] = previous_oldest_xmin;
+ }
+
+ oldest_deferred_xmin =
+ gsXidMap->xmin_by_second[ (gsXidMap->head + 1) % gsXidMap->size ];
+
+ LWLockRelease(GlobalSnapshotXidMapLock);
+
+ /*
+ * Advance procArray->global_snapshot_xmin after we released
+ * GlobalSnapshotXidMapLock. Since we gather not xmin but oldestXmin, it
+ * never goes backwards regardless of how slow we can do that.
+ */
+ Assert(TransactionIdFollowsOrEquals(oldest_deferred_xmin,
+ ProcArrayGetGlobalSnapshotXmin()));
+ ProcArraySetGlobalSnapshotXmin(oldest_deferred_xmin);
+}
+
+
+/*
+ * GlobalSnapshotToXmin
+ *
+ * Get oldestXmin that took place when snapshot_global_csn was taken.
+ */
+TransactionId
+GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn)
+{
+ TransactionId xmin;
+ GlobalCSN csn_seconds;
+ volatile GlobalCSN last_csn_seconds;
+
+ /* Callers should check config values */
+ Assert(global_snapshot_defer_time > 0);
+ Assert(gsXidMap != NULL);
+
+ /* Round down to get conservative estimates */
+ csn_seconds = (snapshot_global_csn / NSECS_PER_SEC);
+
+ LWLockAcquire(GlobalSnapshotXidMapLock, LW_SHARED);
+ last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds);
+ if (csn_seconds > last_csn_seconds)
+ {
+ /* we don't have entry for this global_csn yet, return latest known */
+ xmin = gsXidMap->xmin_by_second[gsXidMap->head];
+ }
+ else if (last_csn_seconds - csn_seconds < gsXidMap->size)
+ {
+ /* we are good, retrieve value from our map */
+ Assert(last_csn_seconds % gsXidMap->size == gsXidMap->head);
+ xmin = gsXidMap->xmin_by_second[csn_seconds % gsXidMap->size];
+ }
+ else
+ {
+ /* requested global_csn is too old, let caller know */
+ xmin = InvalidTransactionId;
+ }
+ LWLockRelease(GlobalSnapshotXidMapLock);
+
+ return xmin;
+}
+
+/*
+ * GlobalSnapshotGenerate
+ *
+ * Generate GlobalCSN which is actually a local time. Also we are forcing
+ * this time to be always increasing. Since now it is not uncommon to have
+ * millions of read transactions per second we are trying to use nanoseconds
+ * if such time resolution is available.
+ */
+GlobalCSN
+GlobalSnapshotGenerate(bool locked)
+{
+ instr_time current_time;
+ GlobalCSN global_csn;
+
+ Assert(track_global_snapshots || global_snapshot_defer_time > 0);
+
+ /*
+ * TODO: create some macro that add small random shift to current time.
+ */
+ INSTR_TIME_SET_CURRENT(current_time);
+ global_csn = (GlobalCSN) INSTR_TIME_GET_NANOSEC(current_time);
+
+ /* TODO: change to atomics? */
+ if (!locked)
+ SpinLockAcquire(&gsState->lock);
+
+ if (global_csn <= gsState->last_global_csn)
+ global_csn = ++gsState->last_global_csn;
+ else
+ gsState->last_global_csn = global_csn;
+
+ if (!locked)
+ SpinLockRelease(&gsState->lock);
+
+ return global_csn;
+}
+
+/*
+ * GlobalSnapshotSync
+ *
+ * Due to time desynchronization on different nodes we can receive global_csn
+ * which is greater than global_csn on this node. To preserve proper isolation
+ * this node needs to wait when such global_csn comes on local clock.
+ *
+ * This should happen relatively rarely if nodes are running NTP/PTP/etc.
+ * Complain if wait time is more than SNAP_DESYNC_COMPLAIN.
+ */
+void
+GlobalSnapshotSync(GlobalCSN remote_gcsn)
+{
+ GlobalCSN local_gcsn;
+ GlobalCSN delta;
+
+ Assert(track_global_snapshots);
+
+ for(;;)
+ {
+ SpinLockAcquire(&gsState->lock);
+ if (gsState->last_global_csn > remote_gcsn)
+ {
+ /* Everything is fine */
+ SpinLockRelease(&gsState->lock);
+ return;
+ }
+ else if ((local_gcsn = GlobalSnapshotGenerate(true)) >= remote_gcsn)
+ {
+ /*
+ * Everything is fine too, but last_global_csn wasn't updated for
+ * some time.
+ */
+ SpinLockRelease(&gsState->lock);
+ return;
+ }
+ SpinLockRelease(&gsState->lock);
+
+ /* Okay we need to sleep now */
+ delta = remote_gcsn - local_gcsn;
+ if (delta > SNAP_DESYNC_COMPLAIN)
+ ereport(WARNING,
+ (errmsg("remote global snapshot exceeds ours by more than a second"),
+ errhint("Consider running NTPd on servers participating in global transaction")));
+
+ /* TODO: report this sleeptime somewhere? */
+ pg_usleep((long) (delta/NSECS_PER_USEC));
+
+ /*
+ * Loop that checks to ensure that we actually slept for specified
+ * amount of time.
+ */
+ }
+
+ Assert(false); /* Should not happend */
+ return;
+}
+
+/*
+ * TransactionIdGetGlobalCSN
+ *
+ * Get GlobalCSN for specified TransactionId taking care about special xids,
+ * xids beyond TransactionXmin and InDoubt states.
+ */
+GlobalCSN
+TransactionIdGetGlobalCSN(TransactionId xid)
+{
+ GlobalCSN global_csn;
+
+ Assert(track_global_snapshots);
+
+ /* Handle permanent TransactionId's for which we don't have mapping */
+ if (!TransactionIdIsNormal(xid))
+ {
+ if (xid == InvalidTransactionId)
+ return AbortedGlobalCSN;
+ if (xid == FrozenTransactionId || xid == BootstrapTransactionId)
+ return FrozenGlobalCSN;
+ Assert(false); /* Should not happend */
+ }
+
+ /*
+ * For xids which are less than TransactionXmin GlobalCSNLog can be already
+ * trimmed but we know that such transaction is definitely not concurrently
+ * running according to any snapshot including timetravel ones. Callers
+ * should check TransactionDidCommit after.
+ */
+ if (TransactionIdPrecedes(xid, TransactionXmin))
+ return FrozenGlobalCSN;
+
+ /* Read GlobalCSN from SLRU */
+ global_csn = GlobalCSNLogGetCSN(xid);
+
+ /*
+ * If we faced InDoubt state then transaction is being committed and we
+ * should wait until GlobalCSN will be assigned so that visibility check
+ * could decide whether tuple is in snapshot. See also comments in
+ * GlobalSnapshotPrecommit().
+ */
+ if (GlobalCSNIsInDoubt(global_csn))
+ {
+ XactLockTableWait(xid, NULL, NULL, XLTW_None);
+ global_csn = GlobalCSNLogGetCSN(xid);
+ Assert(GlobalCSNIsNormal(global_csn) ||
+ GlobalCSNIsAborted(global_csn));
+ }
+
+ Assert(GlobalCSNIsNormal(global_csn) ||
+ GlobalCSNIsInProgress(global_csn) ||
+ GlobalCSNIsAborted(global_csn));
+
+ return global_csn;
+}
+
+/*
+ * XidInvisibleInGlobalSnapshot
+ *
+ * Version of XidInMVCCSnapshot for global transactions. For non-imported
+ * global snapshots this should give same results as XidInLocalMVCCSnapshot
+ * (except that aborts will be shown as invisible without going to clog) and to
+ * ensure such behaviour XidInMVCCSnapshot is coated with asserts that checks
+ * identicalness of XidInvisibleInGlobalSnapshot/XidInLocalMVCCSnapshot in
+ * case of ordinary snapshot.
+ */
+bool
+XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot)
+{
+ GlobalCSN csn;
+
+ Assert(track_global_snapshots);
+
+ csn = TransactionIdGetGlobalCSN(xid);
+
+ if (GlobalCSNIsNormal(csn))
+ {
+ if (csn < snapshot->global_csn)
+ return false;
+ else
+ return true;
+ }
+ else if (GlobalCSNIsFrozen(csn))
+ {
+ /* It is bootstrap or frozen transaction */
+ return false;
+ }
+ else
+ {
+ /* It is aborted or in-progress */
+ Assert(GlobalCSNIsAborted(csn) || GlobalCSNIsInProgress(csn));
+ if (GlobalCSNIsAborted(csn))
+ Assert(TransactionIdDidAbort(xid));
+ return true;
+ }
+}
+
+
+/*****************************************************************************
+ * Functions to handle distributed commit on transaction coordinator:
+ * GlobalSnapshotPrepareCurrent() / GlobalSnapshotAssignCsnCurrent().
+ * Corresponding functions for remote nodes are defined in twophase.c:
+ * pg_global_snapshot_prepare/pg_global_snapshot_assign.
+ *****************************************************************************/
+
+
+/*
+ * GlobalSnapshotPrepareCurrent
+ *
+ * Set InDoubt state for currently active transaction and return commit's
+ * global snapshot.
+ */
+GlobalCSN
+GlobalSnapshotPrepareCurrent()
+{
+ TransactionId xid = GetCurrentTransactionIdIfAny();
+
+ if (!track_global_snapshots)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not prepare transaction for global commit"),
+ errhint("Make sure the configuration parameter \"%s\" is enabled.",
+ "track_global_snapshots")));
+
+ if (TransactionIdIsValid(xid))
+ {
+ TransactionId *subxids;
+ int nsubxids = xactGetCommittedChildren(&subxids);
+ GlobalCSNLogSetCSN(xid, nsubxids,
+ subxids, InDoubtGlobalCSN);
+ }
+
+ /* Nothing to write if we don't have xid */
+
+ return GlobalSnapshotGenerate(false);
+}
+
+/*
+ * GlobalSnapshotAssignCsnCurrent
+ *
+ * Assign GlobalCSN for currently active transaction. GlobalCSN is supposedly
+ * maximal among the values returned by GlobalSnapshotPrepareCurrent and
+ * pg_global_snapshot_prepare.
+ */
+void
+GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn)
+{
+ if (!track_global_snapshots)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not prepare transaction for global commit"),
+ errhint("Make sure the configuration parameter \"%s\" is enabled.",
+ "track_global_snapshots")));
+
+ if (!GlobalCSNIsNormal(global_csn))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("pg_global_snapshot_assign expects normal global_csn")));
+
+ /* Skip empty transactions */
+ if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+ return;
+
+ /* Set global_csn and defuse ProcArrayEndTransaction from assigning one */
+ pg_atomic_write_u64(&MyProc->assignedGlobalCsn, global_csn);
+}
+
+
+/*****************************************************************************
+ * Functions to handle global and local transactions commit.
+ *
+ * For local transactions GlobalSnapshotPrecommit sets InDoubt state before
+ * ProcArrayEndTransaction is called and transaction data potentially becomes
+ * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in
+ * twophase case) then acquires global_csn under ProcArray lock and stores it
+ * in proc->assignedGlobalCsn. It's important that global_csn for commit is
+ * generated under ProcArray lock, otherwise global and local snapshots won't
+ * be equivalent. Consequent call to GlobalSnapshotCommit will write
+ * proc->assignedGlobalCsn to GlobalCSNLog.
+ *
+ * The same rules apply to global transactions, except that global_csn is already
+ * assigned by GlobalSnapshotAssignCsnCurrent/pg_global_snapshot_assign and
+ * GlobalSnapshotPrecommit is basically no-op.
+ *
+ * GlobalSnapshotAbort is slightly different comparing to commit because abort
+ * can skip InDoubt phase and can be called for transaction subtree.
+ *****************************************************************************/
+
+
+/*
+ * GlobalSnapshotAbort
+ *
+ * Abort transaction in GlobalCsnLog. We can skip InDoubt state for aborts
+ * since no concurrent transactions allowed to see aborted data anyway.
+ */
+void
+GlobalSnapshotAbort(PGPROC *proc, TransactionId xid,
+ int nsubxids, TransactionId *subxids)
+{
+ if (!track_global_snapshots)
+ return;
+
+ GlobalCSNLogSetCSN(xid, nsubxids, subxids, AbortedGlobalCSN);
+
+ /*
+ * Clean assignedGlobalCsn anyway, as it was possibly set in
+ * GlobalSnapshotAssignCsnCurrent.
+ */
+ pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN);
+}
+
+/*
+ * GlobalSnapshotPrecommit
+ *
+ * Set InDoubt status for local transaction that we are going to commit.
+ * This step is needed to achieve consistency between local snapshots and
+ * global csn-based snapshots. We don't hold ProcArray lock while writing
+ * csn for transaction in SLRU but instead we set InDoubt status before
+ * transaction is deleted from ProcArray so the readers who will read csn
+ * in the gap between ProcArray removal and GlobalCSN assignment can wait
+ * until GlobalCSN is finally assigned. See also TransactionIdGetGlobalCSN().
+ *
+ * For global transaction this does nothing as InDoubt state was written
+ * earlier.
+ *
+ * This should be called only from parallel group leader before backend is
+ * deleted from ProcArray.
+ */
+void
+GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid,
+ int nsubxids, TransactionId *subxids)
+{
+ GlobalCSN oldAssignedGlobalCsn = InProgressGlobalCSN;
+ bool in_progress;
+
+ if (!track_global_snapshots)
+ return;
+
+ /* Set InDoubt status if it is local transaction */
+ in_progress = pg_atomic_compare_exchange_u64(&proc->assignedGlobalCsn,
+ &oldAssignedGlobalCsn,
+ InDoubtGlobalCSN);
+ if (in_progress)
+ {
+ Assert(GlobalCSNIsInProgress(oldAssignedGlobalCsn));
+ GlobalCSNLogSetCSN(xid, nsubxids,
+ subxids, InDoubtGlobalCSN);
+ }
+ else
+ {
+ /* Otherwise we should have valid GlobalCSN by this time */
+ Assert(GlobalCSNIsNormal(oldAssignedGlobalCsn));
+ /* Also global transaction should already be in InDoubt state */
+ Assert(GlobalCSNIsInDoubt(GlobalCSNLogGetCSN(xid)));
+ }
+}
+
+/*
+ * GlobalSnapshotCommit
+ *
+ * Write GlobalCSN that was acquired earlier to GlobalCsnLog. Should be
+ * preceded by GlobalSnapshotPrecommit() so readers can wait until we finally
+ * finished writing to SLRU.
+ *
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks, so that TransactionIdGetGlobalCSN can wait on this
+ * lock for GlobalCSN.
+ */
+void
+GlobalSnapshotCommit(PGPROC *proc, TransactionId xid,
+ int nsubxids, TransactionId *subxids)
+{
+ volatile GlobalCSN assigned_global_csn;
+
+ if (!track_global_snapshots)
+ return;
+
+ if (!TransactionIdIsValid(xid))
+ {
+ assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn);
+ Assert(GlobalCSNIsInProgress(assigned_global_csn));
+ return;
+ }
+
+ /* Finally write resulting GlobalCSN in SLRU */
+ assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn);
+ Assert(GlobalCSNIsNormal(assigned_global_csn));
+ GlobalCSNLogSetCSN(xid, nsubxids,
+ subxids, assigned_global_csn);
+
+ /* Reset for next transaction */
+ pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN);
+}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 9a69fc1e09..c89d1005c6 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,7 @@
#include <unistd.h>
#include "access/commit_ts.h"
+#include "access/global_snapshot.h"
#include "access/global_csn_log.h"
#include "access/htup_details.h"
#include "access/subtrans.h"
@@ -1480,8 +1481,34 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
hdr->nabortrels, abortrels,
gid);
+ /*
+ * GlobalSnapshot callbacks that should be called right before we are
+ * going to become visible. Details in comments to these functions.
+ */
+ if (isCommit)
+ GlobalSnapshotPrecommit(proc, xid, hdr->nsubxacts, children);
+ else
+ GlobalSnapshotAbort(proc, xid, hdr->nsubxacts, children);
+
+
ProcArrayRemove(proc, latestXid);
+ /*
+ * Stamp our transaction with GlobalCSN in GlobalCsnLog.
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks, since TransactionIdGetGlobalCSN relies on
+ * XactLockTableWait to await global_csn.
+ */
+ if (isCommit)
+ {
+ GlobalSnapshotCommit(proc, xid, hdr->nsubxacts, children);
+ }
+ else
+ {
+ Assert(GlobalCSNIsInProgress(
+ pg_atomic_read_u64(&proc->assignedGlobalCsn)));
+ }
+
/*
* In case we fail while running the callbacks, mark the gxact invalid so
* no one else will try to commit/rollback, and so it will be recycled if
@@ -2442,3 +2469,132 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
RemoveTwoPhaseFile(xid, giveWarning);
RemoveGXact(gxact);
}
+
+/*
+ * GlobalSnapshotPrepareTwophase
+ *
+ * Set InDoubt state for the prepared transaction identified by gid and
+ * return commit's global snapshot.
+ *
+ * This function is a counterpart of GlobalSnapshotPrepareCurrent() for
+ * twophase transactions.
+ */
+static GlobalCSN
+GlobalSnapshotPrepareTwophase(const char *gid)
+{
+ GlobalTransaction gxact;
+ PGXACT *pgxact;
+ char *buf;
+ TransactionId xid;
+ xl_xact_parsed_prepare parsed;
+
+ if (!track_global_snapshots)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not prepare transaction for global commit"),
+ errhint("Make sure the configuration parameter \"%s\" is enabled.",
+ "track_global_snapshots")));
+
+ /*
+ * Validate the GID, and lock the GXACT to ensure that two backends do not
+ * try to access the same GID at once.
+ */
+ gxact = LockGXact(gid, GetUserId());
+ pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+ xid = pgxact->xid;
+
+ if (gxact->ondisk)
+ buf = ReadTwoPhaseFile(xid, true);
+ else
+ XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+
+ ParsePrepareRecord(0, (xl_xact_prepare *) buf, &parsed);
+
+ GlobalCSNLogSetCSN(xid, parsed.nsubxacts,
+ parsed.subxacts, InDoubtGlobalCSN);
+
+ /* Unlock our GXACT */
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+ gxact->locking_backend = InvalidBackendId;
+ LWLockRelease(TwoPhaseStateLock);
+
+ pfree(buf);
+
+ return GlobalSnapshotGenerate(false);
+}
+
+/*
+ * SQL interface to GlobalSnapshotPrepareTwophase()
+ *
+ * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT
+ */
+Datum
+pg_global_snapshot_prepare(PG_FUNCTION_ARGS)
+{
+ const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ GlobalCSN global_csn;
+
+ global_csn = GlobalSnapshotPrepareTwophase(gid);
+
+ PG_RETURN_INT64(global_csn);
+}
+
+
+/*
+ * GlobalSnapshotAssignCsnTwoPhase
+ *
+ * Assign GlobalCSN for the prepared transaction identified by gid. GlobalCSN
+ * is supposedly maximal among the values returned by
+ * GlobalSnapshotPrepareCurrent and pg_global_snapshot_prepare.
+ *
+ * This function is a counterpart of GlobalSnapshotAssignCsnCurrent() for
+ * twophase transactions.
+ */
+static void
+GlobalSnapshotAssignCsnTwoPhase(const char *gid, GlobalCSN global_csn)
+{
+ GlobalTransaction gxact;
+ PGPROC *proc;
+
+ if (!track_global_snapshots)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not prepare transaction for global commit"),
+ errhint("Make sure the configuration parameter \"%s\" is enabled.",
+ "track_global_snapshots")));
+
+ if (!GlobalCSNIsNormal(global_csn))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("pg_global_snapshot_assign expects normal global_csn")));
+
+ /*
+ * Validate the GID, and lock the GXACT to ensure that two backends do not
+ * try to access the same GID at once.
+ */
+ gxact = LockGXact(gid, GetUserId());
+ proc = &ProcGlobal->allProcs[gxact->pgprocno];
+
+ /* Set global_csn and defuse ProcArrayRemove from assigning one. */
+ pg_atomic_write_u64(&proc->assignedGlobalCsn, global_csn);
+
+ /* Unlock our GXACT */
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+ gxact->locking_backend = InvalidBackendId;
+ LWLockRelease(TwoPhaseStateLock);
+}
+
+/*
+ * SQL interface to GlobalSnapshotAssignCsnTwoPhase()
+ *
+ * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'global_csn'
+ */
+Datum
+pg_global_snapshot_assign(PG_FUNCTION_ARGS)
+{
+ const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ GlobalCSN global_csn = PG_GETARG_INT64(1);
+
+ GlobalSnapshotAssignCsnTwoPhase(gid, global_csn);
+ PG_RETURN_VOID();
+}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index cd30b62d36..042239ec0e 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -21,6 +21,7 @@
#include <unistd.h>
#include "access/commit_ts.h"
+#include "access/global_snapshot.h"
#include "access/multixact.h"
#include "access/parallel.h"
#include "access/subtrans.h"
@@ -1433,6 +1434,14 @@ RecordTransactionCommit(void)
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
+
+ /*
+ * Mark our transaction as InDoubt in GlobalCsnLog and get ready for
+ * commit.
+ */
+ if (markXidCommitted)
+ GlobalSnapshotPrecommit(MyProc, xid, nchildren, children);
+
cleanup:
/* Clean up local data */
if (rels)
@@ -1694,6 +1703,11 @@ RecordTransactionAbort(bool isSubXact)
*/
TransactionIdAbortTree(xid, nchildren, children);
+ /*
+ * Mark our transaction as Aborted in GlobalCsnLog.
+ */
+ GlobalSnapshotAbort(MyProc, xid, nchildren, children);
+
END_CRIT_SECTION();
/* Compute latestXid while we have the child XIDs handy */
@@ -2183,6 +2197,21 @@ CommitTransaction(void)
*/
ProcArrayEndTransaction(MyProc, latestXid);
+ /*
+ * Stamp our transaction with GlobalCSN in GlobalCsnLog.
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks.
+ */
+ if (!is_parallel_worker)
+ {
+ TransactionId xid = GetTopTransactionIdIfAny();
+ TransactionId *subxids;
+ int nsubxids;
+
+ nsubxids = xactGetCommittedChildren(&subxids);
+ GlobalSnapshotCommit(MyProc, xid, nsubxids, subxids);
+ }
+
/*
* This is all post-commit cleanup. Note that if an error is raised here,
* it's too late to abort the transaction. This should be just
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4ffe4aad03..aa91526468 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7063,6 +7063,7 @@ StartupXLOG(void)
StartupCLOG();
StartupGlobalCSNLog(oldestActiveXID);
StartupSUBTRANS(oldestActiveXID);
+ GlobalSnapshotStartup(oldestActiveXID);
/*
* If we're beginning at a shutdown checkpoint, we know that
@@ -7881,6 +7882,7 @@ StartupXLOG(void)
StartupCLOG();
StartupGlobalCSNLog(oldestActiveXID);
StartupSUBTRANS(oldestActiveXID);
+ GlobalSnapshotStartup(oldestActiveXID);
}
/*
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index dc2d2959c4..d1819dc2c8 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/nbtree.h"
#include "access/subtrans.h"
#include "access/twophase.h"
+#include "access/global_snapshot.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -145,6 +146,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, ApplyLauncherShmemSize());
+ size = add_size(size, GlobalSnapshotShmemSize());
size = add_size(size, SnapMgrShmemSize());
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
@@ -266,6 +268,7 @@ CreateSharedMemoryAndSemaphores(void)
BTreeShmemInit();
SyncScanShmemInit();
AsyncShmemInit();
+ GlobalSnapshotShmemInit();
#ifdef EXEC_BACKEND
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 486da77f68..90c0e90b46 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -47,6 +47,7 @@
#include "access/clog.h"
#include "access/global_csn_log.h"
+#include "access/global_snapshot.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
@@ -95,6 +96,8 @@ typedef struct ProcArrayStruct
TransactionId replication_slot_xmin;
/* oldest catalog xmin of any replication slot */
TransactionId replication_slot_catalog_xmin;
+ /* xmin of oldest active global snapshot */
+ TransactionId global_snapshot_xmin;
/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
int pgprocnos[FLEXIBLE_ARRAY_MEMBER];
@@ -250,6 +253,7 @@ CreateSharedProcArray(void)
procArray->lastOverflowedXid = InvalidTransactionId;
procArray->replication_slot_xmin = InvalidTransactionId;
procArray->replication_slot_catalog_xmin = InvalidTransactionId;
+ procArray->global_snapshot_xmin = InvalidTransactionId;
}
allProcs = ProcGlobal->allProcs;
@@ -353,6 +357,17 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
latestXid))
ShmemVariableCache->latestCompletedXid = latestXid;
+
+ /*
+ * Assign global csn while holding ProcArrayLock for non-global
+ * COMMIT PREPARED. After the lock is released, the subsequent
+ * GlobalSnapshotCommit() will write this value to GlobalCsnLog.
+ *
+ * In case of global commit proc->assignedGlobalCsn is already set
+ * by prior AssignGlobalCsn().
+ */
+ if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn)))
+ pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false));
}
else
{
@@ -433,6 +448,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
proc->lxid = InvalidLocalTransactionId;
pgxact->xmin = InvalidTransactionId;
+ proc->originalXmin = InvalidTransactionId;
+
/* must be cleared with xid/xmin: */
pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
proc->delayChkpt = false; /* be sure this is cleared in abort */
@@ -455,6 +472,8 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
pgxact->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId;
pgxact->xmin = InvalidTransactionId;
+ proc->originalXmin = InvalidTransactionId;
+
/* must be cleared with xid/xmin: */
pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
proc->delayChkpt = false; /* be sure this is cleared in abort */
@@ -468,6 +487,20 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
latestXid))
ShmemVariableCache->latestCompletedXid = latestXid;
+
+ /*
+ * Assign global csn while holding ProcArrayLock for non-global
+ * COMMIT. After the lock is released, the subsequent GlobalSnapshotCommit()
+ * will write this value to GlobalCsnLog.
+ *
+ * In case of global commit MyProc->assignedGlobalCsn is already set
+ * by prior AssignGlobalCsn().
+ *
+ * TODO: in case of group commit we can generate one GlobalSnapshot for
+ * the whole group to save time on timestamp acquisition.
+ */
+ if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn)))
+ pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false));
}
/*
@@ -611,6 +644,7 @@ ProcArrayClearTransaction(PGPROC *proc)
pgxact->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId;
pgxact->xmin = InvalidTransactionId;
+ proc->originalXmin = InvalidTransactionId;
proc->recoveryConflictPending = false;
/* redundant, but just in case */
@@ -1313,6 +1347,7 @@ GetOldestXmin(Relation rel, int flags)
TransactionId replication_slot_xmin = InvalidTransactionId;
TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+ TransactionId global_snapshot_xmin = InvalidTransactionId;
/*
* If we're not computing a relation specific limit, or if a shared
@@ -1349,8 +1384,9 @@ GetOldestXmin(Relation rel, int flags)
proc->databaseId == MyDatabaseId ||
proc->databaseId == 0) /* always include WalSender */
{
- /* Fetch xid just once - see GetNewTransactionId */
+ /* Fetch both xids just once - see GetNewTransactionId */
TransactionId xid = UINT32_ACCESS_ONCE(pgxact->xid);
+ TransactionId original_xmin = UINT32_ACCESS_ONCE(proc->originalXmin);
/* First consider the transaction's own Xid, if any */
if (TransactionIdIsNormal(xid) &&
@@ -1363,8 +1399,17 @@ GetOldestXmin(Relation rel, int flags)
* We must check both Xid and Xmin because a transaction might
* have an Xmin but not (yet) an Xid; conversely, if it has an
* Xid, that could determine some not-yet-set Xmin.
+ *
+ * In case of oldestXmin calculation for GlobalSnapshotMapXmin()
+ * pgxact->xmin should be changed to proc->originalXmin. Details
+ * in comments to GlobalSnapshotMapXmin.
*/
- xid = UINT32_ACCESS_ONCE(pgxact->xmin);
+ if ((flags & PROCARRAY_NON_IMPORTED_XMIN) &&
+ TransactionIdIsValid(original_xmin))
+ xid = original_xmin;
+ else
+ xid = UINT32_ACCESS_ONCE(pgxact->xmin);
+
if (TransactionIdIsNormal(xid) &&
TransactionIdPrecedes(xid, result))
result = xid;
@@ -1378,6 +1423,7 @@ GetOldestXmin(Relation rel, int flags)
*/
replication_slot_xmin = procArray->replication_slot_xmin;
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+ global_snapshot_xmin = ProcArrayGetGlobalSnapshotXmin();
if (RecoveryInProgress())
{
@@ -1419,6 +1465,11 @@ GetOldestXmin(Relation rel, int flags)
result = FirstNormalTransactionId;
}
+ if (!(flags & PROCARRAY_NON_IMPORTED_XMIN) &&
+ TransactionIdIsValid(global_snapshot_xmin) &&
+ NormalTransactionIdPrecedes(global_snapshot_xmin, result))
+ result = global_snapshot_xmin;
+
/*
* Check whether there are replication slots requiring an older xmin.
*/
@@ -1513,8 +1564,10 @@ GetSnapshotData(Snapshot snapshot)
int count = 0;
int subcount = 0;
bool suboverflowed = false;
+ GlobalCSN global_csn = FrozenGlobalCSN;
TransactionId replication_slot_xmin = InvalidTransactionId;
TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+ TransactionId global_snapshot_xmin = InvalidTransactionId;
Assert(snapshot != NULL);
@@ -1706,10 +1759,18 @@ GetSnapshotData(Snapshot snapshot)
*/
replication_slot_xmin = procArray->replication_slot_xmin;
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+ global_snapshot_xmin = ProcArrayGetGlobalSnapshotXmin();
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
+ /*
+ * Take GlobalCSN under ProcArrayLock so the local/global snapshot stays
+ * synchronized.
+ */
+ if (track_global_snapshots)
+ global_csn = GlobalSnapshotGenerate(false);
+
LWLockRelease(ProcArrayLock);
/*
@@ -1725,6 +1786,10 @@ GetSnapshotData(Snapshot snapshot)
if (!TransactionIdIsNormal(RecentGlobalXmin))
RecentGlobalXmin = FirstNormalTransactionId;
+ if (/*track_global_snapshots && */TransactionIdIsValid(global_snapshot_xmin) &&
+ TransactionIdPrecedes(global_snapshot_xmin, RecentGlobalXmin))
+ RecentGlobalXmin = global_snapshot_xmin;
+
/* Check whether there's a replication slot requiring an older xmin. */
if (TransactionIdIsValid(replication_slot_xmin) &&
NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
@@ -1780,6 +1845,11 @@ GetSnapshotData(Snapshot snapshot)
MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
}
+ snapshot->imported_global_csn = false;
+ snapshot->global_csn = global_csn;
+ if (global_snapshot_defer_time > 0 && IsUnderPostmaster)
+ GlobalSnapshotMapXmin(snapshot->global_csn);
+
return snapshot;
}
@@ -3127,6 +3197,24 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
LWLockRelease(ProcArrayLock);
}
+/*
+ * ProcArraySetGlobalSnapshotXmin -- set xmin of oldest active global snapshot
+ */
+void
+ProcArraySetGlobalSnapshotXmin(TransactionId xmin)
+{
+ /* No lock is taken here: we rely on atomic fetch/store of xid */
+ procArray->global_snapshot_xmin = xmin;
+}
+
+/*
+ * ProcArrayGetGlobalSnapshotXmin -- get xmin of oldest active global snapshot
+ */
+TransactionId
+ProcArrayGetGlobalSnapshotXmin(void)
+{
+ return procArray->global_snapshot_xmin;
+}
#define XidCacheRemove(i) \
do { \
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index aa904b1f17..45d5b8e6ed 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -51,3 +51,4 @@ OldSnapshotTimeMapLock 42
LogicalRepWorkerLock 43
XactTruncationLock 44
GlobalCSNLogControlLock 45
+GlobalSnapshotXidMapLock 46
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index f5eef6fa4e..cad38c18a6 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -37,6 +37,7 @@
#include "access/transam.h"
#include "access/twophase.h"
+#include "access/global_snapshot.h"
#include "access/xact.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -441,6 +442,9 @@ InitProcess(void)
MyProc->clogGroupMemberLsn = InvalidXLogRecPtr;
Assert(pg_atomic_read_u32(&MyProc->clogGroupNext) == INVALID_PGPROCNO);
+ MyProc->originalXmin = InvalidTransactionId;
+ pg_atomic_init_u64(&MyProc->assignedGlobalCsn, InProgressGlobalCSN);
+
/*
* Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
* on it. That allows us to repoint the process latch, which so far
@@ -584,6 +588,7 @@ InitAuxiliaryProcess(void)
MyProc->lwWaitMode = 0;
MyProc->waitLock = NULL;
MyProc->waitProcLock = NULL;
+ MyProc->originalXmin = InvalidTransactionId;
#ifdef USE_ASSERT_CHECKING
{
int i;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4910e4fc66..79d7123f9f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -28,6 +28,7 @@
#include "access/commit_ts.h"
#include "access/gin.h"
+#include "access/global_snapshot.h"
#include "access/rmgr.h"
#include "access/tableam.h"
#include "access/transam.h"
@@ -1178,7 +1179,7 @@ static struct config_bool ConfigureNamesBool[] =
gettext_noop("Used to achieve REPEATEBLE READ isolation level for postgres_fdw transactions.")
},
&track_global_snapshots,
- true, /* XXX: set true to simplify tesing. XXX2: Seems that RESOURCES_MEM isn't the best catagory */
+ false, /* XXX: Seems that RESOURCES_MEM isn't the best catagory */
NULL, NULL, NULL
},
{
@@ -2467,6 +2468,16 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"global_snapshot_defer_time", PGC_POSTMASTER, REPLICATION_MASTER,
+ gettext_noop("Minimal age of records which allowed to be vacuumed, in seconds."),
+ NULL
+ },
+ &global_snapshot_defer_time,
+ 5, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
/*
* See also CheckRequiredParameterValues() if this parameter changes
*/
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ac02bd0c00..cbd6de119a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -306,6 +306,8 @@
# and comma-separated list of application_name
# from standby(s); '*' = all
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
+#global_snapshot_defer_time = 5 # minimal age of records which are allowed
+ # to be vacuumed, in seconds
# - Standby Servers -
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 1c063c592c..3d925a7866 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -48,6 +48,7 @@
#include <sys/stat.h>
#include <unistd.h>
+#include "access/global_snapshot.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
@@ -247,6 +248,8 @@ typedef struct SerializedSnapshotData
CommandId curcid;
TimestampTz whenTaken;
XLogRecPtr lsn;
+ GlobalCSN global_csn;
+ bool imported_global_csn;
} SerializedSnapshotData;
Size
@@ -1024,7 +1027,9 @@ SnapshotResetXmin(void)
pairingheap_first(&RegisteredSnapshots));
if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
+ {
MyPgXact->xmin = minSnapshot->xmin;
+ }
}
/*
@@ -2115,6 +2120,8 @@ SerializeSnapshot(Snapshot snapshot, char *start_address)
serialized_snapshot.curcid = snapshot->curcid;
serialized_snapshot.whenTaken = snapshot->whenTaken;
serialized_snapshot.lsn = snapshot->lsn;
+ serialized_snapshot.global_csn = snapshot->global_csn;
+ serialized_snapshot.imported_global_csn = snapshot->imported_global_csn;
/*
* Ignore the SubXID array if it has overflowed, unless the snapshot was
@@ -2189,6 +2196,8 @@ RestoreSnapshot(char *start_address)
snapshot->curcid = serialized_snapshot.curcid;
snapshot->whenTaken = serialized_snapshot.whenTaken;
snapshot->lsn = serialized_snapshot.lsn;
+ snapshot->global_csn = serialized_snapshot.global_csn;
+ snapshot->imported_global_csn = serialized_snapshot.imported_global_csn;
/* Copy XIDs, if present. */
if (serialized_snapshot.xcnt > 0)
@@ -2228,8 +2237,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
}
/*
- * XidInMVCCSnapshot
- * Is the given XID still-in-progress according to the snapshot?
+ * XidInLocalMVCCSnapshot
+ * Is the given XID still-in-progress according to the local snapshot?
*
* Note: GetSnapshotData never stores either top xid or subxids of our own
* backend into a snapshot, so these xids will not be reported as "running"
@@ -2237,8 +2246,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
* TransactionIdIsCurrentTransactionId first, except when it's known the
* XID could not be ours anyway.
*/
-bool
-XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+static bool
+XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot)
{
uint32 i;
@@ -2348,3 +2357,153 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
return false;
}
+
+/*
+ * XidInMVCCSnapshot
+ *
+ * Check whether this xid is in the snapshot, taking into account the fact
+ * that the snapshot can be global. When track_global_snapshots is switched
+ * off the result is simply that of XidInLocalMVCCSnapshot().
+ */
+bool
+XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+ bool in_snapshot;
+
+ if (snapshot->imported_global_csn)
+ {
+ Assert(track_global_snapshots);
+ /* No point in using snapshot info except CSN */
+ return XidInvisibleInGlobalSnapshot(xid, snapshot);
+ }
+
+ in_snapshot = XidInLocalMVCCSnapshot(xid, snapshot);
+
+ if (!track_global_snapshots)
+ {
+ Assert(GlobalCSNIsFrozen(snapshot->global_csn));
+ return in_snapshot;
+ }
+
+ if (in_snapshot)
+ {
+ /*
+ * This xid may already be in unknown state and in that case
+ * we must wait and recheck.
+ *
+ * TODO: this check can be skipped if we know for sure that there were
+ * no global transactions when this snapshot was taken. That requires
+ * some changes to mechanisms of global snapshots export/import (if
+ * backend set xmin then we should have a-priori knowledge that this
+ * transaction is going to be global or local -- right now this is not
+ * enforced). Leave that for future and don't complicate this patch.
+ */
+ return XidInvisibleInGlobalSnapshot(xid, snapshot);
+ }
+ else
+ {
+#ifdef USE_ASSERT_CHECKING
+ /* Global snapshot may disagree with local one only for aborted xids */
+ if (XidInvisibleInGlobalSnapshot(xid, snapshot))
+ {
+ GlobalCSN gcsn = TransactionIdGetGlobalCSN(xid);
+ Assert(GlobalCSNIsAborted(gcsn));
+ }
+#endif
+ return false;
+ }
+}
+
+/*
+ * ExportGlobalSnapshot
+ *
+ * Return CurrentSnapshot's global_csn so that the caller can expand this
+ * transaction to other nodes. Raises an ERROR if track_global_snapshots is off.
+ *
+ * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and
+ * add some additional checks that transaction has not yet acquired an xid,
+ * but for current iteration of this patch I don't want to hack on parser.
+ */
+GlobalCSN
+ExportGlobalSnapshot(void)
+{
+ if (!track_global_snapshots)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not export global snapshot"),
+ errhint("Make sure the configuration parameter \"%s\" is enabled.",
+ "track_global_snapshots")));
+
+ return CurrentSnapshot->global_csn;
+}
+
+/* SQL accessor to ExportGlobalSnapshot(); returns the exported global_csn */
+Datum
+pg_global_snapshot_export(PG_FUNCTION_ARGS)
+{
+ GlobalCSN global_csn = ExportGlobalSnapshot();
+ PG_RETURN_UINT64(global_csn);
+}
+
+/*
+ * ImportGlobalSnapshot
+ *
+ * Import global_csn and retract this backend's xmin to the value that was
+ * current when we had such global_csn.
+ *
+ * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and
+ * add some additional checks that transaction has not yet acquired an xid,
+ * but for current iteration of this patch I don't want to hack on parser.
+ */
+void
+ImportGlobalSnapshot(GlobalCSN snap_global_csn)
+{
+ volatile TransactionId xmin;
+
+ if (!track_global_snapshots)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not import global snapshot"),
+ errhint("Make sure the configuration parameter \"%s\" is enabled.",
+ "track_global_snapshots")));
+
+ if (global_snapshot_defer_time <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not import global snapshot"),
+ errhint("Make sure the configuration parameter \"%s\" is positive.",
+ "global_snapshot_defer_time")));
+
+ /*
+ * Call GlobalSnapshotToXmin under ProcArrayLock so that the resulting
+ * xmin cannot be evicted from the map before we have set it as our
+ * backend's xmin.
+ */
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+ xmin = GlobalSnapshotToXmin(snap_global_csn);
+ if (!TransactionIdIsValid(xmin))
+ {
+ LWLockRelease(ProcArrayLock);
+ elog(ERROR, "GlobalSnapshotToXmin: global snapshot too old");
+ }
+ MyProc->originalXmin = MyPgXact->xmin; /* remember pre-import xmin */
+ MyPgXact->xmin = TransactionXmin = xmin;
+ LWLockRelease(ProcArrayLock);
+
+ CurrentSnapshot->xmin = xmin; /* defuse SnapshotResetXmin() */
+ CurrentSnapshot->global_csn = snap_global_csn;
+ CurrentSnapshot->imported_global_csn = true;
+ GlobalSnapshotSync(snap_global_csn);
+
+ Assert(TransactionIdPrecedesOrEquals(RecentGlobalXmin, xmin));
+ Assert(TransactionIdPrecedesOrEquals(RecentGlobalDataXmin, xmin));
+}
+
+/* SQL accessor to ImportGlobalSnapshot(); argument 0 is the global_csn */
+Datum
+pg_global_snapshot_import(PG_FUNCTION_ARGS)
+{
+ GlobalCSN global_csn = PG_GETARG_UINT64(0);
+ ImportGlobalSnapshot(global_csn);
+ PG_RETURN_VOID();
+}
diff --git a/src/include/access/global_snapshot.h b/src/include/access/global_snapshot.h
new file mode 100644
index 0000000000..246b180cfd
--- /dev/null
+++ b/src/include/access/global_snapshot.h
@@ -0,0 +1,72 @@
+/*-------------------------------------------------------------------------
+ *
+ * global_snapshot.h
+ * Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/global_snapshot.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef GLOBAL_SNAPSHOT_H
+#define GLOBAL_SNAPSHOT_H
+
+#include "port/atomics.h"
+#include "storage/lock.h"
+#include "utils/snapshot.h"
+#include "utils/guc.h"
+
+/*
+ * snapshot.h is used in frontend code so atomic variant of GlobalCSN type
+ * is defined here.
+ */
+typedef pg_atomic_uint64 GlobalCSN_atomic;
+
+#define InProgressGlobalCSN UINT64CONST(0x0)
+#define AbortedGlobalCSN UINT64CONST(0x1)
+#define FrozenGlobalCSN UINT64CONST(0x2)
+#define InDoubtGlobalCSN UINT64CONST(0x3)
+#define FirstNormalGlobalCSN UINT64CONST(0x4)
+
+#define GlobalCSNIsInProgress(csn) ((csn) == InProgressGlobalCSN)
+#define GlobalCSNIsAborted(csn) ((csn) == AbortedGlobalCSN)
+#define GlobalCSNIsFrozen(csn) ((csn) == FrozenGlobalCSN)
+#define GlobalCSNIsInDoubt(csn) ((csn) == InDoubtGlobalCSN)
+#define GlobalCSNIsNormal(csn) ((csn) >= FirstNormalGlobalCSN)
+
+
+extern int global_snapshot_defer_time;
+
+
+extern Size GlobalSnapshotShmemSize(void);
+extern void GlobalSnapshotShmemInit(void);
+extern void GlobalSnapshotStartup(TransactionId oldestActiveXID);
+
+extern void GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn);
+extern TransactionId GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn);
+
+extern GlobalCSN GlobalSnapshotGenerate(bool locked);
+
+extern bool XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot);
+
+extern void GlobalSnapshotSync(GlobalCSN remote_gcsn);
+
+extern GlobalCSN TransactionIdGetGlobalCSN(TransactionId xid);
+
+extern GlobalCSN GlobalSnapshotPrepareGlobal(const char *gid);
+extern void GlobalSnapshotAssignCsnGlobal(const char *gid,
+ GlobalCSN global_csn);
+
+extern GlobalCSN GlobalSnapshotPrepareCurrent(void);
+extern void GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn);
+
+extern void GlobalSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids,
+ TransactionId *subxids);
+extern void GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids,
+ TransactionId *subxids);
+extern void GlobalSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids,
+ TransactionId *subxids);
+
+#endif /* GLOBAL_SNAPSHOT_H */
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 2ca71c3445..b4899f3754 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,6 +18,7 @@
#include "access/xlogdefs.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
+#include "utils/snapshot.h"
/*
* GlobalTransactionData is defined in twophase.c; other places have no
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 61f2c2f5b4..c76da68a0a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10936,4 +10936,17 @@
proname => 'is_normalized', prorettype => 'bool', proargtypes => 'text text',
prosrc => 'unicode_is_normalized' },
+# global transaction handling
+{ oid => '4388', descr => 'export global transaction snapshot',
+ proname => 'pg_global_snapshot_export', provolatile => 'v', proparallel => 'u',
+ prorettype => 'int8', proargtypes => '', prosrc => 'pg_global_snapshot_export' },
+{ oid => '4389', descr => 'import global transaction snapshot',
+ proname => 'pg_global_snapshot_import', provolatile => 'v', proparallel => 'u',
+ prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_global_snapshot_import' },
+{ oid => '4390', descr => 'prepare distributed transaction for commit, get global_csn',
+ proname => 'pg_global_snapshot_prepare', provolatile => 'v', proparallel => 'u',
+ prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_global_snapshot_prepare' },
+{ oid => '4391', descr => 'assign global_csn to distributed transaction',
+ proname => 'pg_global_snapshot_assign', provolatile => 'v', proparallel => 'u',
+ prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_global_snapshot_assign' },
]
diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h
index 6be6d35d1e..583b1beea5 100644
--- a/src/include/datatype/timestamp.h
+++ b/src/include/datatype/timestamp.h
@@ -93,6 +93,9 @@ typedef struct
#define USECS_PER_MINUTE INT64CONST(60000000)
#define USECS_PER_SEC INT64CONST(1000000)
+#define NSECS_PER_SEC INT64CONST(1000000000)
+#define NSECS_PER_USEC INT64CONST(1000)
+
/*
* We allow numeric timezone offsets up to 15:59:59 either way from Greenwich.
* Currently, the record holders for wackiest offsets in actual use are zones
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index d349510b7c..5cdf2e17cb 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -280,6 +280,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum);
#define PG_GETARG_FLOAT4(n) DatumGetFloat4(PG_GETARG_DATUM(n))
#define PG_GETARG_FLOAT8(n) DatumGetFloat8(PG_GETARG_DATUM(n))
#define PG_GETARG_INT64(n) DatumGetInt64(PG_GETARG_DATUM(n))
+#define PG_GETARG_UINT64(n) DatumGetUInt64(PG_GETARG_DATUM(n))
/* use this if you want the raw, possibly-toasted input datum: */
#define PG_GETARG_RAW_VARLENA_P(n) ((struct varlena *) PG_GETARG_POINTER(n))
/* use this if you want the input datum de-toasted: */
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index d6459327cc..4ac23da654 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -141,6 +141,9 @@ typedef struct timespec instr_time;
#define INSTR_TIME_GET_MICROSEC(t) \
(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) ((t).tv_nsec / 1000))
+#define INSTR_TIME_GET_NANOSEC(t) \
+ (((uint64) (t).tv_sec * (uint64) 1000000000) + (uint64) ((t).tv_nsec))
+
#else /* !HAVE_CLOCK_GETTIME */
/* Use gettimeofday() */
@@ -205,6 +208,10 @@ typedef struct timeval instr_time;
#define INSTR_TIME_GET_MICROSEC(t) \
(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) (t).tv_usec)
+#define INSTR_TIME_GET_NANOSEC(t) \
+ (((uint64) (t).tv_sec * (uint64) 1000000000) + \
+ (uint64) (t).tv_usec * (uint64) 1000)
+
#endif /* HAVE_CLOCK_GETTIME */
#else /* WIN32 */
@@ -237,6 +244,9 @@ typedef LARGE_INTEGER instr_time;
#define INSTR_TIME_GET_MICROSEC(t) \
((uint64) (((double) (t).QuadPart * 1000000.0) / GetTimerFrequency()))
+#define INSTR_TIME_GET_NANOSEC(t) \
+ ((uint64) (((double) (t).QuadPart * 1000000000.0) / GetTimerFrequency()))
+
static inline double
GetTimerFrequency(void)
{
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 1ee9000b2b..aeaeb021ef 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -15,8 +15,10 @@
#define _PROC_H_
#include "access/clog.h"
+#include "access/global_snapshot.h"
#include "access/xlogdefs.h"
#include "lib/ilist.h"
+#include "utils/snapshot.h"
#include "storage/latch.h"
#include "storage/lock.h"
#include "storage/pg_sema.h"
@@ -57,6 +59,7 @@ struct XidCache
#define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical
* decoding outside xact */
#define PROC_RESERVED 0x20 /* reserved for procarray */
+#define PROC_RESERVED2 0x40 /* reserved for procarray */
/* flags reset at EOXact */
#define PROC_VACUUM_STATE_MASK \
@@ -203,6 +206,18 @@ struct PGPROC
PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */
dlist_head lockGroupMembers; /* list of members, if I'm a leader */
dlist_node lockGroupLink; /* my member link, if I'm a member */
+
+ /*
+ * assignedGlobalCsn holds GlobalCSN for this transaction. It is generated
+ * under a ProcArray lock and later is written to a GlobalCSNLog. This
+ * variable is defined as atomic only for the case of group commit; in all
+ * other scenarios only the backend responsible for this proc entry works
+ * with this variable.
+ */
+ GlobalCSN_atomic assignedGlobalCsn;
+
+ /* Original xmin of this backend before global snapshot was imported */
+ TransactionId originalXmin;
};
/* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index a5c7d0c064..452ae5d547 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -36,6 +36,10 @@
#define PROCARRAY_SLOTS_XMIN 0x20 /* replication slot xmin,
* catalog_xmin */
+
+#define PROCARRAY_NON_IMPORTED_XMIN 0x40 /* use originalXmin instead
+ * of xmin to properly
+ * maintain gsXidMap */
/*
* Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching
* PGXACT->vacuumFlags. Other flags are used for different purposes and
@@ -125,4 +129,8 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
TransactionId *catalog_xmin);
+extern void ProcArraySetGlobalSnapshotXmin(TransactionId xmin);
+
+extern TransactionId ProcArrayGetGlobalSnapshotXmin(void);
+
#endif /* PROCARRAY_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index b28d13ce84..f4768bc6d4 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -127,6 +127,9 @@ extern void AtSubCommit_Snapshot(int level);
extern void AtSubAbort_Snapshot(int level);
extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin);
+extern GlobalCSN ExportGlobalSnapshot(void);
+extern void ImportGlobalSnapshot(GlobalCSN snap_global_csn);
+
extern void ImportSnapshot(const char *idstr);
extern bool XactHasExportedSnapshots(void);
extern void DeleteAllExportedSnapshotFiles(void);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 57d2dfaa67..71c92c69f4 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -204,6 +204,14 @@ typedef struct SnapshotData
TimestampTz whenTaken; /* timestamp when snapshot was taken */
XLogRecPtr lsn; /* position in the WAL stream when taken */
+
+ /*
+ * GlobalCSN for cross-node snapshot isolation support.
+ * Will be used only if track_global_snapshots is enabled.
+ */
+ GlobalCSN global_csn;
+ /* Did we have our own global_csn or imported one from different node */
+ bool imported_global_csn;
} SnapshotData;
#endif /* SNAPSHOT_H */
--
2.25.1
>From 64a39dc351dfd3d624ceca4332b70df3f819bad4 Mon Sep 17 00:00:00 2001
From: Andrey Lepikhov <a.lepik...@postgrespro.ru>
Date: Tue, 9 Jun 2020 15:04:44 +0500
Subject: [PATCH 3/3] postgres_fdw-support-for-global-snapshots
---
contrib/postgres_fdw/Makefile | 9 +
contrib/postgres_fdw/connection.c | 290 ++++++++++++++++--
contrib/postgres_fdw/postgres_fdw.c | 12 +
contrib/postgres_fdw/postgres_fdw.h | 2 +
.../postgres_fdw/t/001_bank_coordinator.pl | 264 ++++++++++++++++
.../postgres_fdw/t/002_bank_participant.pl | 240 +++++++++++++++
src/test/perl/PostgresNode.pm | 35 +++
7 files changed, 826 insertions(+), 26 deletions(-)
create mode 100644 contrib/postgres_fdw/t/001_bank_coordinator.pl
create mode 100644 contrib/postgres_fdw/t/002_bank_participant.pl
diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index ee8a80a392..07091f630e 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -29,3 +29,12 @@ top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
+
+# Global makefile will do temp-install for 'check'. Since REGRESS is defined,
+# PGXS (included from contrib-global.mk or directly) will care to add
+# postgres_fdw to it as EXTRA_INSTALL and build pg_regress. It will also
+# actually run pg_regress, so the only thing left is tap tests.
+check: tapcheck
+
+tapcheck: temp-install
+ $(prove_check)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 52d1fe3563..6745b2ae02 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -12,8 +12,10 @@
*/
#include "postgres.h"
+#include "access/global_snapshot.h"
#include "access/htup_details.h"
#include "access/xact.h"
+#include "access/xlog.h" /* GetSystemIdentifier() */
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "mb/pg_wchar.h"
@@ -25,6 +27,8 @@
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
+#include "utils/snapmgr.h"
+#include "utils/snapshot.h"
#include "utils/syscache.h"
/*
@@ -65,6 +69,21 @@ typedef struct ConnCacheEntry
*/
static HTAB *ConnectionHash = NULL;
+/*
+ * FdwTransactionState
+ *
+ * Holds number of open remote transactions and shared state
+ * needed for all connection entries.
+ */
+typedef struct FdwTransactionState
+{
+ char *gid;
+ int nparticipants;
+ GlobalCSN global_csn;
+ bool two_phase_commit;
+} FdwTransactionState;
+static FdwTransactionState *fdwTransState;
+
/* for assigning cursor numbers and prepared statement numbers */
static unsigned int cursor_number = 0;
static unsigned int prep_stmt_number = 0;
@@ -72,6 +91,9 @@ static unsigned int prep_stmt_number = 0;
/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false;
+/* counter of prepared tx made by this backend */
+static int two_phase_xact_count = 0;
+
/* prototypes of private functions */
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry);
@@ -80,6 +102,7 @@ static void configure_remote_session(PGconn *conn);
static void do_sql_command(PGconn *conn, const char *sql);
static void begin_remote_xact(ConnCacheEntry *entry);
static void pgfdw_xact_callback(XactEvent event, void *arg);
+static void deallocate_prepared_stmts(ConnCacheEntry *entry);
static void pgfdw_subxact_callback(SubXactEvent event,
SubTransactionId mySubid,
SubTransactionId parentSubid,
@@ -136,6 +159,15 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
pgfdw_inval_callback, (Datum) 0);
}
+ /* allocate FdwTransactionState */
+ if (fdwTransState == NULL)
+ {
+ MemoryContext oldcxt;
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+ fdwTransState = palloc0(sizeof(FdwTransactionState));
+ MemoryContextSwitchTo(oldcxt);
+ }
+
/* Set flag that we did GetConnection during the current transaction */
xact_got_connection = true;
@@ -447,7 +479,8 @@ configure_remote_session(PGconn *conn)
}
/*
- * Convenience subroutine to issue a non-data-returning SQL command to remote
+ * Convenience subroutine to issue a non-data-returning SQL command or
+ * statement to remote node.
*/
static void
do_sql_command(PGconn *conn, const char *sql)
@@ -457,7 +490,8 @@ do_sql_command(PGconn *conn, const char *sql)
if (!PQsendQuery(conn, sql))
pgfdw_report_error(ERROR, NULL, conn, false, sql);
res = pgfdw_get_result(conn, sql);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
}
@@ -485,6 +519,10 @@ begin_remote_xact(ConnCacheEntry *entry)
elog(DEBUG3, "starting remote transaction on connection %p",
entry->conn);
+ if (UseGlobalSnapshots && (!IsolationUsesXactSnapshot() ||
+ IsolationIsSerializable()))
+ elog(ERROR, "Global snapshots support only REPEATABLE READ");
+
if (IsolationIsSerializable())
sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
else
@@ -493,6 +531,23 @@ begin_remote_xact(ConnCacheEntry *entry)
do_sql_command(entry->conn, sql);
entry->xact_depth = 1;
entry->changing_xact_state = false;
+
+ if (UseGlobalSnapshots)
+ {
+ char import_sql[128];
+
+ /* Export our snapshot */
+ if (fdwTransState->global_csn == 0)
+ fdwTransState->global_csn = ExportGlobalSnapshot();
+
+ snprintf(import_sql, sizeof(import_sql),
+ "SELECT pg_global_snapshot_import("UINT64_FORMAT")",
+ fdwTransState->global_csn);
+
+ do_sql_command(entry->conn, import_sql);
+ }
+
+ fdwTransState->nparticipants += 1;
}
/*
@@ -700,6 +755,94 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
PG_END_TRY();
}
+/*
+ * Callback type for BroadcastStmt.
+ *
+ * Called once for every connection whose result has the expected status.
+ * 'arg' is the opaque pointer that was handed to BroadcastStmt.  Returning
+ * false marks the statement as failed on that connection.
+ */
+typedef bool (*BroadcastCmdResHandler) (PGresult *result, void *arg);
+
+/*
+ * BroadcastStmt
+ *		Send 'sql' to every ConnectionHash entry that has an open remote
+ *		transaction, then collect the replies.
+ *
+ * The statement is first sent to all connections without waiting, and only
+ * then are the results read, so the remote nodes work in parallel.
+ *
+ * A reply counts as successful when its status equals 'expectedStatus' and,
+ * if 'handler' is not NULL, handler(result, arg) returns true.
+ *
+ * Failures are reported at WARNING level and make the function return
+ * false; the caller decides how to react (e.g. roll back prepared
+ * transactions and raise an ERROR).  Returns true only if every
+ * participating connection succeeded.
+ */
+static bool
+BroadcastStmt(char const * sql, unsigned expectedStatus,
+			  BroadcastCmdResHandler handler, void *arg)
+{
+	HASH_SEQ_STATUS scan;
+	ConnCacheEntry *entry;
+	bool		allOk = true;
+
+	/* Phase 1: send the statement everywhere without waiting for replies */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		pgfdw_reject_incomplete_xact_state_change(entry);
+
+		if (entry->xact_depth > 0 && entry->conn != NULL)
+		{
+			if (!PQsendQuery(entry->conn, sql))
+			{
+				PGresult   *res = PQgetResult(entry->conn);
+
+				elog(WARNING, "Failed to send command %s", sql);
+
+				/*
+				 * Pass clear = false: we release 'res' ourselves exactly
+				 * once below.  (Letting pgfdw_report_error clear it and then
+				 * calling PQclear again would free it twice.)
+				 */
+				pgfdw_report_error(WARNING, res, entry->conn, false, sql);
+				PQclear(res);
+				allOk = false;
+			}
+		}
+	}
+
+	/* Phase 2: collect one reply from every participating connection */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		if (entry->xact_depth > 0 && entry->conn != NULL)
+		{
+			PGresult   *result = PQgetResult(entry->conn);
+			PGresult   *extra;
+
+			if (PQresultStatus(result) != expectedStatus ||
+				(handler && !handler(result, arg)))
+			{
+				elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus);
+
+				/*
+				 * Report at WARNING so that we keep collecting replies from
+				 * the other connections and can return false to the caller.
+				 * 'result' is cleared once, below.
+				 */
+				pgfdw_report_error(WARNING, result, entry->conn, false, sql);
+				allOk = false;
+			}
+			PQclear(result);
+
+			/*
+			 * Consume everything up to the terminating NULL so the
+			 * connection is ready for the next command, freeing any
+			 * unexpected extra results.
+			 */
+			while ((extra = PQgetResult(entry->conn)) != NULL)
+				PQclear(extra);
+		}
+	}
+
+	return allOk;
+}
+
+/*
+ * BroadcastCmd
+ *		Broadcast a command for which every connection must answer with
+ *		PGRES_COMMAND_OK.  No per-result handler is installed.
+ */
+static bool
+BroadcastCmd(char const *sql)
+{
+	bool		ok;
+
+	ok = BroadcastStmt(sql, PGRES_COMMAND_OK, NULL, NULL);
+
+	return ok;
+}
+
+/*
+ * BroadcastFunc
+ *		Broadcast a statement for which every connection must answer with
+ *		PGRES_TUPLES_OK.  The returned rows themselves are not inspected.
+ */
+static bool
+BroadcastFunc(char const *sql)
+{
+	bool		ok;
+
+	ok = BroadcastStmt(sql, PGRES_TUPLES_OK, NULL, NULL);
+
+	return ok;
+}
+
+/*
+ * MaxCsnCB
+ *		BroadcastStmt callback that tracks the maximal CSN seen so far.
+ *
+ * 'arg' points to a GlobalCSN accumulator.  The first column of the first
+ * row of 'result' is parsed as an unsigned 64-bit integer; if it is larger
+ * than the accumulator, the accumulator is updated.
+ *
+ * Returns false if the result has no rows or columns, or if the value is
+ * NULL, empty, or not parseable; true otherwise.
+ */
+static bool
+MaxCsnCB(PGresult *result, void *arg)
+{
+	GlobalCSN  *max_csn = (GlobalCSN *) arg;
+	GlobalCSN	csn = 0;
+	char	   *resp;
+
+	/*
+	 * Check the result shape explicitly instead of relying on PQgetvalue's
+	 * behavior for an out-of-range row.
+	 */
+	if (PQntuples(result) < 1 || PQnfields(result) < 1 ||
+		PQgetisnull(result, 0, 0))
+		return false;
+
+	resp = PQgetvalue(result, 0, 0);
+
+	if (resp == NULL || (*resp) == '\0' ||
+		sscanf(resp, UINT64_FORMAT, &csn) != 1)
+		return false;
+
+	if (*max_csn < csn)
+		*max_csn = csn;
+
+	return true;
+}
+
/*
* pgfdw_xact_callback --- cleanup at main-transaction end.
*/
@@ -713,6 +856,86 @@ pgfdw_xact_callback(XactEvent event, void *arg)
if (!xact_got_connection)
return;
+	/* Handle possible two-phase commit */
+	if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT)
+	{
+		bool		include_local_tx = false;
+
+		/* The local node is a participant too if it has a valid XID */
+		if (TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+		{
+			include_local_tx = true;
+			fdwTransState->nparticipants += 1;
+		}
+
+		/* Switch to 2PC mode if there was more than one participant */
+		if (UseGlobalSnapshots && fdwTransState->nparticipants > 1)
+			fdwTransState->two_phase_commit = true;
+
+		if (fdwTransState->two_phase_commit)
+		{
+			GlobalCSN	max_csn = InProgressGlobalCSN,
+						my_csn = InProgressGlobalCSN;
+			bool		res;
+			char	   *sql;
+
+			/*
+			 * Build the gid from: timestamp, system identifier, backend pid,
+			 * local xid, per-backend 2PC counter and number of participants.
+			 * The casts must match the format specifiers exactly.
+			 */
+			fdwTransState->gid = psprintf("pgfdw:%lld:%llu:%d:%u:%d:%d",
+										  (long long) GetCurrentTimestamp(),
+										  (unsigned long long) GetSystemIdentifier(),
+										  MyProcPid,
+										  GetCurrentTransactionIdIfAny(),
+										  ++two_phase_xact_count,
+										  fdwTransState->nparticipants);
+
+			/* Broadcast PREPARE */
+			sql = psprintf("PREPARE TRANSACTION '%s'", fdwTransState->gid);
+			res = BroadcastCmd(sql);
+			if (!res)
+				goto error;
+
+			/* Broadcast pg_global_snapshot_prepare() */
+			if (include_local_tx)
+				my_csn = GlobalSnapshotPrepareCurrent();
+
+			sql = psprintf("SELECT pg_global_snapshot_prepare('%s')",
+						   fdwTransState->gid);
+			res = BroadcastStmt(sql, PGRES_TUPLES_OK, MaxCsnCB, &max_csn);
+			if (!res)
+				goto error;
+
+			/* select maximal global csn */
+			if (include_local_tx && my_csn > max_csn)
+				max_csn = my_csn;
+
+			/* Broadcast pg_global_snapshot_assign() */
+			if (include_local_tx)
+				GlobalSnapshotAssignCsnCurrent(max_csn);
+			sql = psprintf("SELECT pg_global_snapshot_assign('%s',"UINT64_FORMAT")",
+						   fdwTransState->gid, max_csn);
+			res = BroadcastFunc(sql);
+
+error:
+			if (!res)
+			{
+				/*
+				 * Undo whatever got prepared remotely.  The SQL command for
+				 * this is ROLLBACK PREPARED; there is no ABORT PREPARED.
+				 */
+				sql = psprintf("ROLLBACK PREPARED '%s'", fdwTransState->gid);
+				BroadcastCmd(sql);
+				elog(ERROR, "Failed to PREPARE transaction on remote node");
+			}
+
+			/*
+			 * Do not fall through.  The subsequent COMMIT event will clean
+			 * things up.
+			 */
+			return;
+		}
+	}
+
+	/* COMMIT the prepared remote transactions if we were doing 2PC */
+	if (fdwTransState->two_phase_commit &&
+		(event == XACT_EVENT_PARALLEL_COMMIT || event == XACT_EVENT_COMMIT))
+	{
+		BroadcastCmd(psprintf("COMMIT PREPARED '%s'", fdwTransState->gid));
+	}
+
/*
* Scan all connection cache entries to find open remote transactions, and
* close them.
@@ -720,8 +943,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
- PGresult *res;
-
/* Ignore cache entry if no open connection right now */
if (entry->conn == NULL)
continue;
@@ -738,6 +959,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_COMMIT:
+ Assert(!fdwTransState->two_phase_commit);
/*
* If abort cleanup previously failed for this connection,
@@ -750,28 +972,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
do_sql_command(entry->conn, "COMMIT TRANSACTION");
entry->changing_xact_state = false;
- /*
- * If there were any errors in subtransactions, and we
- * made prepared statements, do a DEALLOCATE ALL to make
- * sure we get rid of all prepared statements. This is
- * annoying and not terribly bulletproof, but it's
- * probably not worth trying harder.
- *
- * DEALLOCATE ALL only exists in 8.3 and later, so this
- * constrains how old a server postgres_fdw can
- * communicate with. We intentionally ignore errors in
- * the DEALLOCATE, so that we can hobble along to some
- * extent with older servers (leaking prepared statements
- * as we go; but we don't really support update operations
- * pre-8.3 anyway).
- */
- if (entry->have_prep_stmt && entry->have_error)
- {
- res = PQexec(entry->conn, "DEALLOCATE ALL");
- PQclear(res);
- }
- entry->have_prep_stmt = false;
- entry->have_error = false;
+ deallocate_prepared_stmts(entry);
break;
case XACT_EVENT_PRE_PREPARE:
@@ -790,6 +991,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
break;
case XACT_EVENT_PARALLEL_COMMIT:
case XACT_EVENT_COMMIT:
+ if (fdwTransState->two_phase_commit)
+ deallocate_prepared_stmts(entry);
+ else /* Pre-commit should have closed the open transaction */
+ elog(ERROR, "missed cleaning up connection during pre-commit");
+ break;
case XACT_EVENT_PREPARE:
/* Pre-commit should have closed the open transaction */
elog(ERROR, "missed cleaning up connection during pre-commit");
@@ -885,6 +1091,38 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* Also reset cursor numbering for next transaction */
cursor_number = 0;
+
+ /* Reset fdwTransState */
+ memset(fdwTransState, '\0', sizeof(FdwTransactionState));
+}
+
+/*
+ * deallocate_prepared_stmts
+ *		Clean up remote prepared statements after a transaction, and reset
+ *		the per-connection bookkeeping flags.
+ *
+ * If there were any errors in subtransactions, and we made prepared
+ * statements, issue DEALLOCATE ALL to make sure we get rid of all of them.
+ * This is annoying and not terribly bulletproof, but it's probably not
+ * worth trying harder.
+ *
+ * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old a
+ * server postgres_fdw can communicate with.  Errors from the DEALLOCATE are
+ * intentionally ignored, so that we can hobble along to some extent with
+ * older servers (leaking prepared statements as we go; but we don't really
+ * support update operations pre-8.3 anyway).
+ */
+static void
+deallocate_prepared_stmts(ConnCacheEntry *entry)
+{
+	bool		need_dealloc = entry->have_prep_stmt && entry->have_error;
+
+	/* Both flags are cleared regardless of whether we deallocate */
+	entry->have_prep_stmt = false;
+	entry->have_error = false;
+
+	/* The result is discarded on purpose; see header comment */
+	if (need_dealloc)
+		PQclear(PQexec(entry->conn, "DEALLOCATE ALL"));
}
/*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9fc53cad68..03c5b0093a 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -301,6 +301,9 @@ typedef struct
List *already_used; /* expressions already dealt with */
} ec_member_foreign_arg;
+bool UseGlobalSnapshots;
+void _PG_init(void);
+
/*
* SQL functions
*/
@@ -6584,3 +6587,12 @@ find_em_expr_for_input_target(PlannerInfo *root,
elog(ERROR, "could not find pathkey item to sort");
return NULL; /* keep compiler quiet */
}
+
+/*
+ * Module load callback: registers the postgres_fdw.use_global_snapshots
+ * boolean setting, backed by UseGlobalSnapshots and defaulting to false.
+ */
+void
+_PG_init(void)
+{
+	DefineCustomBoolVariable("postgres_fdw.use_global_snapshots",
+							 "Use global snapshots for FDW transactions",
+							 NULL,	/* long description */
+							 &UseGlobalSnapshots,
+							 false, /* boot value */
+							 PGC_USERSET,
+							 0,		/* flags */
+							 NULL,	/* check hook */
+							 NULL,	/* assign hook */
+							 NULL); /* show hook */
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..9d3ea077a1 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -208,4 +208,6 @@ extern const char *get_jointype_name(JoinType jointype);
extern bool is_builtin(Oid objectId);
extern bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo);
+extern bool UseGlobalSnapshots;
+
#endif /* POSTGRES_FDW_H */
diff --git a/contrib/postgres_fdw/t/001_bank_coordinator.pl b/contrib/postgres_fdw/t/001_bank_coordinator.pl
new file mode 100644
index 0000000000..1e31f33349
--- /dev/null
+++ b/contrib/postgres_fdw/t/001_bank_coordinator.pl
@@ -0,0 +1,264 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+my $master = get_new_node("master");
+$master->init;
+$master->append_conf('postgresql.conf', qq(
+ max_prepared_transactions = 30
+ log_checkpoints = true
+ postgres_fdw.use_global_snapshots = on
+ track_global_snapshots = on
+ default_transaction_isolation = 'REPEATABLE READ'
+));
+$master->start;
+
+my $shard1 = get_new_node("shard1");
+$shard1->init;
+$shard1->append_conf('postgresql.conf', qq(
+ max_prepared_transactions = 30
+ global_snapshot_defer_time = 15
+ track_global_snapshots = on
+));
+$shard1->start;
+
+my $shard2 = get_new_node("shard2");
+$shard2->init;
+$shard2->append_conf('postgresql.conf', qq(
+ max_prepared_transactions = 30
+ global_snapshot_defer_time = 15
+ track_global_snapshots = on
+));
+$shard2->start;
+
+###############################################################################
+# Prepare nodes
+###############################################################################
+
+$master->safe_psql('postgres', qq[
+ CREATE EXTENSION postgres_fdw;
+ CREATE TABLE accounts(id integer primary key, amount integer);
+ CREATE TABLE global_transactions(tx_time timestamp);
+]);
+
+foreach my $node ($shard1, $shard2)
+{
+ my $port = $node->port;
+ my $host = $node->host;
+
+ $node->safe_psql('postgres',
+ "CREATE TABLE accounts(id integer primary key, amount integer)");
+
+ $master->safe_psql('postgres', qq[
+ CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw options(dbname 'postgres', host '$host', port '$port');
+ CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts) server shard_$port options(table_name 'accounts');
+ CREATE USER MAPPING for CURRENT_USER SERVER shard_$port;
+ ])
+}
+
+$shard1->safe_psql('postgres', qq[
+ insert into accounts select 2*id-1, 0 from generate_series(1, 10010) as id;
+ CREATE TABLE local_transactions(tx_time timestamp);
+]);
+
+$shard2->safe_psql('postgres', qq[
+ insert into accounts select 2*id, 0 from generate_series(1, 10010) as id;
+ CREATE TABLE local_transactions(tx_time timestamp);
+]);
+
+diag("master: @{[$master->connstr('postgres')]}");
+diag("shard1: @{[$shard1->connstr('postgres')]}");
+diag("shard2: @{[$shard2->connstr('postgres')]}");
+
+###############################################################################
+# pgbench scripts
+###############################################################################
+
+my $bank = File::Temp->new();
+append_to_file($bank, q{
+ \set id random(1, 20000)
+ BEGIN;
+ WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *)
+ INSERT into global_transactions SELECT now() FROM upd;
+ UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
+ COMMIT;
+});
+
+my $bank1 = File::Temp->new();
+append_to_file($bank1, q{
+ \set id random(1, 10000)
+ BEGIN;
+ WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = (2*:id + 1) RETURNING *)
+ INSERT into local_transactions SELECT now() FROM upd;
+ UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 3);
+ COMMIT;
+});
+
+my $bank2 = File::Temp->new();
+append_to_file($bank2, q{
+ \set id random(1, 10000)
+
+ BEGIN;
+ WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = 2*:id RETURNING *)
+ INSERT into local_transactions SELECT now() FROM upd;
+ UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 2);
+ COMMIT;
+});
+
+###############################################################################
+# Helpers
+###############################################################################
+
+# Count the rows of $table on $node, empty the table, log the count via
+# diag() and return it.
+sub count_and_delete_rows
+{
+	my ($node, $table) = @_;
+
+	my $count = $node->safe_psql('postgres',"select count(*) from $table");
+
+	$node->safe_psql('postgres',"delete from $table");
+	diag($node->name, ": completed $count transactions");
+
+	return $count;
+}
+
+###############################################################################
+# Concurrent global transactions
+###############################################################################
+
+my ($err, $rc);
+my $started;
+my $seconds = 30;
+my $selects;
+my $total = '0';
+my $oldtotal = '0';
+my $isolation_errors = 0;
+
+
+my $pgb_handle;
+
+$pgb_handle = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+while (time() - $started < $seconds)
+{
+ $total = $master->safe_psql('postgres', "select sum(amount) from accounts");
+ if ( ($total ne $oldtotal) and ($total ne '') )
+ {
+ $isolation_errors++;
+ $oldtotal = $total;
+ diag("Isolation error. Total = $total");
+ }
+ if ($total ne '') { $selects++; }
+}
+
+$master->pgbench_await($pgb_handle);
+
+# Sanity check: the test is meaningless unless both the polling loop and
+# pgbench actually completed some work.
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction');
+
+###############################################################################
+# Concurrent global and local transactions
+###############################################################################
+
+my ($pgb_handle1, $pgb_handle2, $pgb_handle3);
+
+# global txses
+$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+# concurrent local
+$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' );
+$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' );
+
+$started = time();
+$selects = 0;
+$oldtotal = 0;
+while (time() - $started < $seconds)
+{
+ $total = $master->safe_psql('postgres', "select sum(amount) from accounts");
+ if ( ($total ne $oldtotal) and ($total ne '') )
+ {
+ $isolation_errors++;
+ $oldtotal = $total;
+ diag("Isolation error. Total = $total");
+ }
+ if ($total ne '') { $selects++; }
+}
+
+diag("selects = $selects");
+$master->pgbench_await($pgb_handle1);
+$shard1->pgbench_await($pgb_handle2);
+$shard2->pgbench_await($pgb_handle3);
+
+# Sanity check: every pgbench instance and the polling loop must have done
+# some work, otherwise the isolation check below proves nothing.
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard1, 'local_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'local_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global and local transactions');
+
+
+###############################################################################
+# Snapshot stability
+###############################################################################
+
+my ($hashes, $hash1, $hash2);
+my $stability_errors = 0;
+
+# global txses
+$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+# concurrent local
+$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' );
+$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' );
+
+$selects = 0;
+$started = time();
+while (time() - $started < $seconds)
+{
+ foreach my $node ($master, $shard1, $shard2)
+ {
+ ($hash1, $_, $hash2) = split "\n", $node->safe_psql('postgres', qq[
+ begin isolation level repeatable read;
+ select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
+ select pg_sleep(3);
+ select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
+ commit;
+ ]);
+
+ if ($hash1 ne $hash2)
+ {
+ diag("oops");
+ $stability_errors++;
+ }
+ elsif ($hash1 eq '' or $hash2 eq '')
+ {
+ die;
+ }
+ else
+ {
+ $selects++;
+ }
+ }
+}
+
+$master->pgbench_await($pgb_handle1);
+$shard1->pgbench_await($pgb_handle2);
+$shard2->pgbench_await($pgb_handle3);
+
+# Sanity check: every pgbench instance and the polling loop must have done
+# some work, otherwise the stability check below proves nothing.
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard1, 'local_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'local_transactions') > 0);
+
+is($stability_errors, 0, 'snapshot is stable during concurrent global and local transactions');
+
+$master->stop;
+$shard1->stop;
+$shard2->stop;
diff --git a/contrib/postgres_fdw/t/002_bank_participant.pl b/contrib/postgres_fdw/t/002_bank_participant.pl
new file mode 100644
index 0000000000..04a2f1ba85
--- /dev/null
+++ b/contrib/postgres_fdw/t/002_bank_participant.pl
@@ -0,0 +1,240 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+my $shard1 = get_new_node("shard1");
+$shard1->init;
+$shard1->append_conf('postgresql.conf', qq(
+ max_prepared_transactions = 30
+ postgres_fdw.use_global_snapshots = on
+ global_snapshot_defer_time = 15
+ track_global_snapshots = on
+ default_transaction_isolation = 'REPEATABLE READ'
+));
+$shard1->start;
+
+my $shard2 = get_new_node("shard2");
+$shard2->init;
+$shard2->append_conf('postgresql.conf', qq(
+ max_prepared_transactions = 30
+ postgres_fdw.use_global_snapshots = on
+ global_snapshot_defer_time = 15
+ track_global_snapshots = on
+ default_transaction_isolation = 'REPEATABLE READ'
+));
+$shard2->start;
+
+###############################################################################
+# Prepare nodes
+###############################################################################
+
+my @shards = ($shard1, $shard2);
+
+foreach my $node (@shards)
+{
+ $node->safe_psql('postgres', qq[
+ CREATE EXTENSION postgres_fdw;
+ CREATE TABLE accounts(id integer primary key, amount integer);
+ CREATE TABLE accounts_local() inherits(accounts);
+ CREATE TABLE global_transactions(tx_time timestamp);
+ CREATE TABLE local_transactions(tx_time timestamp);
+ ]);
+
+ foreach my $neighbor (@shards)
+ {
+ next if ($neighbor eq $node);
+
+ my $port = $neighbor->port;
+ my $host = $neighbor->host;
+
+ $node->safe_psql('postgres', qq[
+ CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw
+ options(dbname 'postgres', host '$host', port '$port');
+ CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts)
+ server shard_$port options(table_name 'accounts_local');
+ CREATE USER MAPPING for CURRENT_USER SERVER shard_$port;
+ ]);
+ }
+}
+
+# Load odd ids on shard1 and even ids on shard2.  Use safe_psql so that a
+# failed load aborts the test instead of being silently ignored.
+$shard1->safe_psql('postgres', "insert into accounts_local select 2*id-1, 0 from generate_series(1, 10010) as id;");
+$shard2->safe_psql('postgres', "insert into accounts_local select 2*id, 0 from generate_series(1, 10010) as id;");
+
+###############################################################################
+# pgbench scripts
+###############################################################################
+
+my $bank = File::Temp->new();
+append_to_file($bank, q{
+ \set id random(1, 20000)
+ BEGIN;
+ WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *)
+ INSERT into global_transactions SELECT now() FROM upd;
+ UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
+ COMMIT;
+});
+
+###############################################################################
+# Helpers
+###############################################################################
+
+# Count the rows of $table on $node, empty the table, log the count via
+# diag() and return it.
+sub count_and_delete_rows
+{
+	my ($node, $table) = @_;
+
+	my $count = $node->safe_psql('postgres',"select count(*) from $table");
+
+	$node->safe_psql('postgres',"delete from $table");
+	diag($node->name, ": completed $count transactions");
+
+	return $count;
+}
+
+###############################################################################
+# Concurrent global transactions
+###############################################################################
+
+my ($err, $rc);
+my $started;
+my $seconds = 30;
+my $selects;
+my $total = '0';
+my $oldtotal = '0';
+my $isolation_errors = 0;
+my $i;
+
+
+my ($pgb_handle1, $pgb_handle2);
+
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+# While pgbench is running, repeatedly read the total from every shard.
+# Transfers preserve the sum, so any change in the observed total is an
+# isolation error.
+while (time() - $started < $seconds)
+{
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction');
+
+###############################################################################
+# And do the same after soft restart
+###############################################################################
+
+# Restart both shards and wait until each one answers queries again.
+$shard1->restart;
+$shard2->restart;
+$shard1->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard1 to become online";
+$shard2->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard2 to become online";
+
+$seconds = 15;
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+
+# While pgbench is running, repeatedly read the total from every shard.
+# Transfers preserve the sum, so any change in the observed total is an
+# isolation error.
+while (time() - $started < $seconds)
+{
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction after restart');
+
+###############################################################################
+# And do the same after hard restart
+###############################################################################
+
+# Tear both shards down, start them again and wait until each one answers
+# queries.
+$shard1->teardown_node;
+$shard2->teardown_node;
+$shard1->start;
+$shard2->start;
+$shard1->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard1 to become online";
+$shard2->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard2 to become online";
+
+
+$seconds = 15;
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+
+# While pgbench is running, repeatedly read the total from every shard.
+# Transfers preserve the sum, so any change in the observed total is an
+# isolation error.
+while (time() - $started < $seconds)
+{
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction after hard restart');
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 1407359aef..247a21155f 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -2129,6 +2129,41 @@ sub pg_recvlogical_upto
}
}
+# Run pgbench against this node with the given command-line arguments and
+# wait for it to finish.  Returns whatever pgbench_await returns.
+#
+# Declared without a prototype: the sub takes arguments, and all of them are
+# forwarded unchanged to pgbench_async.
+sub pgbench
+{
+	my ($self, @args) = @_;
+	my $pgbench_handle = $self->pgbench_async(@args);
+	return $self->pgbench_await($pgbench_handle);
+}
+
+# Start pgbench against this node in the background and return the IPC::Run
+# handle.  @args are appended to the pgbench command line after the -h/-p
+# options that point it at this node.  Use pgbench_await to wait for it.
+sub pgbench_async
+{
+	my ($self, @args) = @_;
+
+	my ($in, $out);
+	$in = '';
+	$out = '';
+
+	my @pgbench_command = (
+		'pgbench',
+		-h => $self->host,
+		-p => $self->port,
+		@args
+	);
+
+	# NOTE(review): $in and $out are passed by value, not as \$in / \$out, so
+	# pgbench's output is presumably not captured here -- confirm whether
+	# that is intended before relying on it.
+	my $handle = IPC::Run::start(\@pgbench_command, $in, $out);
+	return $handle;
+}
+
+# Wait for a pgbench run started by pgbench_async and return the result of
+# IPC::Run::finish so that the caller can inspect it.
+sub pgbench_await
+{
+	my ($self, $pgbench_handle) = @_;
+
+	# During the run some pgbench threads can exit (for example due to a
+	# serialization error).  That will set a non-zero return code.
+	# So don't check the return code here and leave it to the caller.
+	my $rc = IPC::Run::finish($pgbench_handle);
+	return $rc;
+}
+
=pod
=back
--
2.25.1