Next version of the CSN implementation for snapshots, which provides proper
snapshot isolation for cross-instance distributed transactions.
--
regards,
Andrey Lepikhov
Postgres Professional
>From bbb7dd1d7621c091f11e697d3d894fe7a36918a6 Mon Sep 17 00:00:00 2001
From: Andrey Lepikhov <a.lepik...@postgrespro.ru>
Date: Wed, 17 Nov 2021 11:13:37 +0500
Subject: [PATCH] Add Commit Sequence Number (CSN) machinery into MVCC
implementation for timestamp-based resolution of visibility conflicts.
This makes it possible to achieve proper snapshot isolation semantics for
distributed transactions involving more than one Postgres instance.
Authors: K.Knizhnik, S.Kelvich, A.Sher, A.Lepikhov, M.Usama.
Discussion:
(2020/05/21 -)
https://www.postgresql.org/message-id/flat/CA%2Bfd4k6HE8xLGEvqWzABEg8kkju5MxU%2Bif7bf-md0_2pjzXp9Q%40mail.gmail.com#ed1359340871688bed2e643921f73365
(2018/05/01 - 2019/04/21)
https://www.postgresql.org/message-id/flat/21BC916B-80A1-43BF-8650-3363CCDAE09C%40postgrespro.ru
---
doc/src/sgml/config.sgml | 50 +-
src/backend/access/rmgrdesc/Makefile | 1 +
src/backend/access/rmgrdesc/csnlogdesc.c | 95 +++
src/backend/access/rmgrdesc/xlogdesc.c | 6 +-
src/backend/access/transam/Makefile | 2 +
src/backend/access/transam/csn_log.c | 748 ++++++++++++++++++
src/backend/access/transam/csn_snapshot.c | 687 ++++++++++++++++
src/backend/access/transam/rmgr.c | 1 +
src/backend/access/transam/twophase.c | 154 ++++
src/backend/access/transam/varsup.c | 2 +
src/backend/access/transam/xact.c | 32 +
src/backend/access/transam/xlog.c | 23 +-
src/backend/access/transam/xloginsert.c | 2 +
src/backend/commands/vacuum.c | 3 +-
src/backend/storage/ipc/ipci.c | 6 +
src/backend/storage/ipc/procarray.c | 85 ++
src/backend/storage/lmgr/lwlock.c | 2 +
src/backend/storage/lmgr/lwlocknames.txt | 2 +
src/backend/storage/lmgr/proc.c | 6 +
src/backend/storage/sync/sync.c | 5 +
src/backend/utils/misc/guc.c | 37 +
src/backend/utils/probes.d | 2 +
src/backend/utils/time/snapmgr.c | 149 +++-
src/bin/initdb/initdb.c | 3 +-
src/bin/pg_controldata/pg_controldata.c | 2 +
src/bin/pg_upgrade/pg_upgrade.c | 5 +
src/bin/pg_upgrade/pg_upgrade.h | 2 +
src/bin/pg_waldump/rmgrdesc.c | 1 +
src/include/access/csn_log.h | 98 +++
src/include/access/csn_snapshot.h | 54 ++
src/include/access/rmgrlist.h | 1 +
src/include/access/xlog_internal.h | 2 +
src/include/catalog/pg_control.h | 1 +
src/include/catalog/pg_proc.dat | 17 +
src/include/datatype/timestamp.h | 3 +
src/include/fmgr.h | 1 +
src/include/portability/instr_time.h | 10 +
src/include/storage/lwlock.h | 1 +
src/include/storage/proc.h | 14 +
src/include/storage/procarray.h | 7 +
src/include/storage/sync.h | 1 +
src/include/utils/snapmgr.h | 7 +-
src/include/utils/snapshot.h | 11 +
src/test/modules/Makefile | 1 +
src/test/modules/csnsnapshot/Makefile | 25 +
.../modules/csnsnapshot/csn_snapshot.conf | 1 +
.../csnsnapshot/expected/csnsnapshot.out | 1 +
src/test/modules/csnsnapshot/t/001_base.pl | 103 +++
src/test/modules/csnsnapshot/t/002_standby.pl | 66 ++
.../modules/csnsnapshot/t/003_time_skew.pl | 214 +++++
.../csnsnapshot/t/004_read_committed.pl | 97 +++
.../csnsnapshot/t/005_basic_visibility.pl | 181 +++++
src/test/modules/snapshot_too_old/sto.conf | 1 +
src/test/regress/expected/sysviews.out | 4 +-
54 files changed, 3024 insertions(+), 11 deletions(-)
create mode 100644 src/backend/access/rmgrdesc/csnlogdesc.c
create mode 100644 src/backend/access/transam/csn_log.c
create mode 100644 src/backend/access/transam/csn_snapshot.c
create mode 100644 src/include/access/csn_log.h
create mode 100644 src/include/access/csn_snapshot.h
create mode 100644 src/test/modules/csnsnapshot/Makefile
create mode 100644 src/test/modules/csnsnapshot/csn_snapshot.conf
create mode 100644 src/test/modules/csnsnapshot/expected/csnsnapshot.out
create mode 100644 src/test/modules/csnsnapshot/t/001_base.pl
create mode 100644 src/test/modules/csnsnapshot/t/002_standby.pl
create mode 100644 src/test/modules/csnsnapshot/t/003_time_skew.pl
create mode 100644 src/test/modules/csnsnapshot/t/004_read_committed.pl
create mode 100644 src/test/modules/csnsnapshot/t/005_basic_visibility.pl
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3f806740d5..f4f6c83fd0 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -9682,8 +9682,56 @@ dynamic_library_path = 'C:\tools\postgresql;H:\my_project\lib;$libdir'
</varlistentry>
</variablelist>
- </sect1>
+ <sect2 id="runtime-config-CSN-based-snapshot">
+ <title>CSN-Based Snapshots</title>
+ <para>
+ By default, snapshots in <productname>PostgreSQL</productname> are based on
+ transaction IDs (XIDs), which are used to determine the status of a
+ transaction and to perform visibility calculations.
+ </para>
+
+ <para>
+ <productname>PostgreSQL</productname> also provides machinery based on
+ CSNs (Commit Sequence Numbers) as an additional tool for visibility
+ calculations. It can be used for distributed transactions, where the XID of
+ a local transaction cannot correctly determine the order of the distributed one.
+ </para>
+
+ <variablelist>
+ <varlistentry id="guc-enable-csn-snapshot" xreflabel="enable_csn_snapshot">
+ <term><varname>enable_csn_snapshot</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>enable_csn_snapshot</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+
+ <para>
+ Enables or disables CSN tracking for snapshots.
+ </para>
+
+ <para>
+ <productname>PostgreSQL</productname> uses a physical clock timestamp as
+ the CSN, so enabling CSN-based snapshots can be useful for implementing
+ cross-instance snapshots and visibility of distributed transactions.
+ </para>
+
+ <para>
+ When enabled, <productname>PostgreSQL</productname> creates the
+ <filename>pg_csn</filename> directory under <envar>PGDATA</envar> to keep
+ track of the mapping between XIDs and CSNs.
+ </para>
+
+ <para>
+ The default value is on.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </sect2>
+ </sect1>
<sect1 id="runtime-config-compatible">
<title>Version and Platform Compatibility</title>
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd86..15fc36f7b4 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -11,6 +11,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
brindesc.o \
clogdesc.o \
committsdesc.o \
+ csnlogdesc.o \
dbasedesc.o \
genericdesc.o \
diff --git a/src/backend/access/rmgrdesc/csnlogdesc.c b/src/backend/access/rmgrdesc/csnlogdesc.c
new file mode 100644
index 0000000000..f8c644e906
--- /dev/null
+++ b/src/backend/access/rmgrdesc/csnlogdesc.c
@@ -0,0 +1,95 @@
+/*-------------------------------------------------------------------------
+ *
+ * csnlogdesc.c
+ * rmgr descriptor routines for access/transam/csn_log.c
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/rmgrdesc/csnlogdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/csn_log.h"
+
+
+/*
+ * csnlog_desc
+ *
+ * Append a human-readable description of a CSN log WAL record to buf.
+ * ZEROPAGE and TRUNCATE records carry a single page number, ASSIGNMENT a
+ * CSN, and SETCSN an xl_csn_set header followed by the subtransaction xids
+ * (their count is derived from the record's data length).
+ */
+void
+csnlog_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *data = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_CSN_ZEROPAGE:
+		case XLOG_CSN_TRUNCATE:
+			{
+				int			pageno;
+
+				memcpy(&pageno, data, sizeof(int));
+				appendStringInfo(buf, "pageno %d", pageno);
+				break;
+			}
+		case XLOG_CSN_ASSIGNMENT:
+			{
+				CSN			csn;
+
+				memcpy(&csn, data, sizeof(CSN));
+				appendStringInfo(buf, "assign " INT64_FORMAT, csn);
+				break;
+			}
+		case XLOG_CSN_SETCSN:
+			{
+				xl_csn_set *xlrec = (xl_csn_set *) data;
+				int			nsubxids;
+				int			n;
+
+				appendStringInfo(buf, "set " INT64_FORMAT " for: %u",
+								 xlrec->csn,
+								 xlrec->xtop);
+				nsubxids = ((XLogRecGetDataLen(record) - MinSizeOfCSNSet) /
+							sizeof(TransactionId));
+
+				/* Copy each subxid out individually; record data may be unaligned. */
+				for (n = 0; n < nsubxids; n++)
+				{
+					TransactionId subxid;
+
+					memcpy(&subxid,
+						   data + MinSizeOfCSNSet + n * sizeof(TransactionId),
+						   sizeof(TransactionId));
+					appendStringInfo(buf, ", %u", subxid);
+				}
+				break;
+			}
+	}
+}
+
+/*
+ * csnlog_identify
+ *
+ * Map the rmgr-specific info bits of a CSN log record to its name, or NULL
+ * for an unknown record type.
+ */
+const char *
+csnlog_identify(uint8 info)
+{
+	uint8		op = info & ~XLR_INFO_MASK;
+
+	if (op == XLOG_CSN_ASSIGNMENT)
+		return "ASSIGNMENT";
+	if (op == XLOG_CSN_SETCSN)
+		return "SETCSN";
+	if (op == XLOG_CSN_ZEROPAGE)
+		return "ZEROPAGE";
+	if (op == XLOG_CSN_TRUNCATE)
+		return "TRUNCATE";
+
+	return NULL;
+}
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 5bf2346dd9..ea433046cf 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -113,7 +113,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
appendStringInfo(buf, "max_connections=%d max_worker_processes=%d "
"max_wal_senders=%d max_prepared_xacts=%d "
"max_locks_per_xact=%d wal_level=%s "
- "wal_log_hints=%s track_commit_timestamp=%s",
+ "wal_log_hints=%s track_commit_timestamp=%s "
+ "enable_csn_snapshot=%s",
xlrec.MaxConnections,
xlrec.max_worker_processes,
xlrec.max_wal_senders,
@@ -121,7 +122,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
xlrec.max_locks_per_xact,
wal_level_str,
xlrec.wal_log_hints ? "on" : "off",
- xlrec.track_commit_timestamp ? "on" : "off");
+ xlrec.track_commit_timestamp ? "on" : "off",
+ xlrec.enable_csn_snapshot ? "on" : "off");
}
else if (info == XLOG_FPW_CHANGE)
{
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de72..fc0321ee6b 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -15,6 +15,8 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
clog.o \
commit_ts.o \
+ csn_log.o \
+ csn_snapshot.o \
generic_xlog.o \
multixact.o \
parallel.o \
diff --git a/src/backend/access/transam/csn_log.c b/src/backend/access/transam/csn_log.c
new file mode 100644
index 0000000000..33517271ed
--- /dev/null
+++ b/src/backend/access/transam/csn_log.c
@@ -0,0 +1,748 @@
+/*-----------------------------------------------------------------------------
+ *
+ * csn_log.c
+ * Track commit sequence numbers of finished transactions
+ *
+ * This module provides an SLRU that stores the CSN of each transaction. The
+ * mapping needs to be kept only for xids greater than oldestXid, but that
+ * can require arbitrarily large amounts of memory in case of long-lived
+ * transactions. Because of the same lifetime and persistence requirements,
+ * this module is quite similar to subtrans.c.
+ *
+ * Switching a database from CSN-based snapshots to xid-based snapshots is
+ * harmless. Switching from xid-based to CSN-based snapshots, however,
+ * requires choosing the first xid from which CSN-based checks are valid.
+ * That xid cannot be oldestActiveXID because of prepared transactions.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/csn_log.c
+ *
+ *-----------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/csn_log.h"
+#include "access/csn_snapshot.h"
+#include "access/slru.h"
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "portability/instr_time.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/snapmgr.h"
+
+/*
+ * Shared state of the CSN log.
+ *
+ * Whether CSN snapshots are active is judged by csnSnapshotActive (see
+ * get_csnlog_status()) rather than by the enable_csn_snapshot GUC; this
+ * design is similar to the one used for track_commit_timestamp. The reason:
+ * when the primary changes enable_csn_snapshot across a restart, a standby
+ * learns about it only by replaying the WAL record for the parameter
+ * change, and it is difficult to notify every backend of that. A flag in
+ * shared memory is visible to all of them; it is read without a lock.
+ *
+ * oldestXmin       - first xid with a meaningful CSN on the oldest existing
+ *                    page of the CSN log (pg_atomic_uint32, no lock needed)
+ * last_max_csn     - the largest CSN generated so far
+ * last_csn_log_wal - upper bound of CSNs reserved in WAL: GenerateCSN()
+ *                    sets it to csn + CSN_ASSIGN_TIME_INTERVAL and writes
+ *                    a new XLOG_CSN_ASSIGNMENT record once a generated CSN
+ *                    exceeds it
+ * lock             - spinlock taken around last_max_csn / last_csn_log_wal
+ *                    in GenerateCSN() and GetLastGeneratedCSN()
+ */
+typedef struct CSNShared
+{
+ bool csnSnapshotActive;
+ pg_atomic_uint32 oldestXmin;
+ CSN last_max_csn;
+ CSN last_csn_log_wal;
+ volatile slock_t lock;
+} CSNShared;
+
+/* Points into shared memory; set up by CSNLogShmemInit(). */
+CSNShared *csnShared;
+
+/*
+ * Defines for CSNLog page sizes. A page is the same BLCKSZ as is used
+ * everywhere else in Postgres.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * CSNLog page numbering also wraps around at
+ * 0xFFFFFFFF/CSN_LOG_XACTS_PER_PAGE, and CSNLog segment numbering at
+ * 0xFFFFFFFF/CSN_LOG_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need not take
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateCSNLog (see CSNLogPagePrecedes).
+ */
+
+/* We store the commit CSN for each xid */
+#define CSN_LOG_XACTS_PER_PAGE (BLCKSZ / sizeof(CSN))
+
+/* CSNLog page holding an xid's entry, and the entry's index on that page */
+#define TransactionIdToPage(xid) ((xid) / (TransactionId) CSN_LOG_XACTS_PER_PAGE)
+#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CSN_LOG_XACTS_PER_PAGE)
+
+/*
+ * Link to shared-memory data structures for CSNLog control
+ */
+static SlruCtlData CSNLogCtlData;
+#define CsnlogCtl (&CSNLogCtlData)
+
+static int ZeroCSNLogPage(int pageno, bool write_xlog);
+static void ZeroTruncateCSNLogPage(int pageno, bool write_xlog);
+static bool CSNLogPagePrecedes(int page1, int page2);
+static void CSNLogSetPageStatus(TransactionId xid, int nsubxids,
+ TransactionId *subxids,
+ CSN csn, int pageno);
+static void CSNLogSetCSNInSlot(TransactionId xid, CSN csn, int slotno);
+
+static void WriteCSNXlogRec(TransactionId xid, int nsubxids,
+ TransactionId *subxids, CSN csn);
+static void WriteZeroCSNPageXlogRec(int pageno);
+static void WriteTruncateCSNXlogRec(int pageno);
+static void set_oldest_xmin(TransactionId xid);
+
+
+/*
+ * Number of shared CSNLog buffers: NBuffers / 512, clamped to [4, 32].
+ */
+static Size
+CSNLogShmemBuffers(void)
+{
+	int			nbuffers = NBuffers / 512;
+
+	nbuffers = Max(nbuffers, 4);
+	nbuffers = Min(nbuffers, 32);
+
+	return (Size) nbuffers;
+}
+
+/*
+ * Amount of shared memory the CSNLog SLRU needs (no LSN groups are kept).
+ */
+Size
+CSNLogShmemSize(void)
+{
+	Size		nslots = CSNLogShmemBuffers();
+
+	return SimpleLruShmemSize(nslots, 0);
+}
+
+/*
+ * Set up the CSNLog SLRU and the CSNShared control struct in shared memory.
+ * The control struct is only initialized by the process that creates it.
+ */
+void
+CSNLogShmemInit(void)
+{
+	bool		already_exists;
+
+	CsnlogCtl->PagePrecedes = CSNLogPagePrecedes;
+	SimpleLruInit(CsnlogCtl, "CSNLog Ctl", CSNLogShmemBuffers(), 0,
+				  CSNLogSLRULock, "pg_csn", LWTRANCHE_CSN_LOG_BUFFERS,
+				  SYNC_HANDLER_CSN);
+
+	csnShared = ShmemInitStruct("CSNlog shared", sizeof(CSNShared),
+								&already_exists);
+	if (already_exists)
+		return;
+
+	csnShared->csnSnapshotActive = false;
+	pg_atomic_init_u32(&csnShared->oldestXmin, InvalidTransactionId);
+	csnShared->last_max_csn = InvalidCSN;
+	csnShared->last_csn_log_wal = InvalidCSN;
+	SpinLockInit(&csnShared->lock);
+}
+
+/*
+ * CSNLogSetCSN
+ *
+ * Store csn as the commit sequence number of xid and of every xid in
+ * subxids[0 .. nsubxids-1] (its subtransactions; nsubxids may be zero).
+ * xid is usually a top-level transaction, but it can also be a
+ * subtransaction when an abort is recorded; csn should be AbortedCSN for
+ * aborts.
+ *
+ * When write_xlog is set, WriteCSNXlogRec() is called first. Entries are
+ * then written page by page: each CSNLogSetPageStatus() call covers one run
+ * of consecutive subxids that share a CSNLog page, and only the first call
+ * (the parent's page) also stores the parent xid itself.
+ */
+void
+CSNLogSetCSN(TransactionId xid, int nsubxids, TransactionId *subxids, CSN csn,
+			 bool write_xlog)
+{
+	int			pageno;
+	int			next = 0;
+
+	Assert(TransactionIdIsValid(xid));
+
+	if (write_xlog)
+		WriteCSNXlogRec(xid, nsubxids, subxids, csn);
+
+	/* The first batch targets the page holding the parent xid. */
+	pageno = TransactionIdToPage(xid);
+
+	for (;;)
+	{
+		int			first = next;
+
+		/* Gather the run of consecutive subxids that live on this page. */
+		while (next < nsubxids && TransactionIdToPage(subxids[next]) == pageno)
+			next++;
+
+		CSNLogSetPageStatus(xid, next - first, subxids + first, csn, pageno);
+
+		if (next >= nsubxids)
+			break;
+
+		/* The parent has been handled; later pages only carry subxids. */
+		xid = InvalidTransactionId;
+		pageno = TransactionIdToPage(subxids[next]);
+	}
+}
+
+/*
+ * Store csn for the given subxids and, if valid, for xid, all of which must
+ * live on CSNLog page pageno. Atomic only with respect to this page; the
+ * page is marked dirty under CSNLogSLRULock.
+ *
+ * Otherwise the API is the same as TransactionIdSetTreeStatus().
+ */
+static void
+CSNLogSetPageStatus(TransactionId xid, int nsubxids, TransactionId *subxids,
+					CSN csn, int pageno)
+{
+	int			slotno;
+	int			n;
+
+	LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE);
+	slotno = SimpleLruReadPage(CsnlogCtl, pageno, true, xid);
+
+	/* Subtransactions go in first ... */
+	for (n = 0; n < nsubxids; n++)
+	{
+		TransactionId subxid = subxids[n];
+
+		Assert(CsnlogCtl->shared->page_number[slotno] == TransactionIdToPage(subxid));
+		CSNLogSetCSNInSlot(subxid, csn, slotno);
+	}
+
+	/* ... and the parent (absent on continuation pages) last. */
+	if (TransactionIdIsValid(xid))
+		CSNLogSetCSNInSlot(xid, csn, slotno);
+
+	CsnlogCtl->shared->page_dirty[slotno] = true;
+	LWLockRelease(CSNLogSLRULock);
+}
+
+/*
+ * Overwrite the CSN entry of one xid inside the page buffer in slotno.
+ * Caller must hold CSNLogSLRULock; the page is not marked dirty here.
+ */
+static void
+CSNLogSetCSNInSlot(TransactionId xid, CSN csn, int slotno)
+{
+	CSN		   *entries;
+
+	Assert(LWLockHeldByMe(CSNLogSLRULock));
+
+	entries = (CSN *) CsnlogCtl->shared->page_buffer[slotno];
+	entries[TransactionIdToPgIndex(xid)] = csn;
+}
+
+/*
+ * Read the CSN recorded for xid in the CSN log.
+ *
+ * NB: this is a low-level routine and is NOT the preferred entry point
+ * for most uses; TransactionIdGetCSN() in csn_snapshot.c is the
+ * intended caller.
+ */
+CSN
+CSNLogGetCSNByXid(TransactionId xid)
+{
+	const CSN  *entries;
+	CSN			result;
+	int			slotno;
+
+	/* SimpleLruReadPage_ReadOnly returns with CSNLogSLRULock held. */
+	slotno = SimpleLruReadPage_ReadOnly(CsnlogCtl, TransactionIdToPage(xid), xid);
+	entries = (const CSN *) CsnlogCtl->shared->page_buffer[slotno];
+	result = entries[TransactionIdToPgIndex(xid)];
+	LWLockRelease(CSNLogSLRULock);
+
+	return result;
+}
+
+/*
+ * Set up a zero-filled CSNLog page in shared memory (it is not written out)
+ * and return its slot number, optionally WAL-logging a ZEROPAGE record.
+ *
+ * CSNLogSLRULock must be held on entry and is still held on exit.
+ */
+static int
+ZeroCSNLogPage(int pageno, bool write_xlog)
+{
+	int			slotno;
+
+	Assert(LWLockHeldByMe(CSNLogSLRULock));
+
+	if (write_xlog)
+		WriteZeroCSNPageXlogRec(pageno);
+
+	slotno = SimpleLruZeroPage(CsnlogCtl, pageno);
+	return slotno;
+}
+
+/*
+ * Truncate the CSNLog SLRU at pageno via SimpleLruTruncate(), optionally
+ * WAL-logging a TRUNCATE record beforehand.
+ */
+static void
+ZeroTruncateCSNLogPage(int pageno, bool write_xlog)
+{
+	if (write_xlog)
+		WriteTruncateCSNXlogRec(pageno);
+
+	SimpleLruTruncate(CsnlogCtl, pageno);
+}
+
+/*
+ * ActivateCSNlog
+ *
+ * Turn on CSN tracking. Makes sure the CSNLog page holding nextXid exists,
+ * determines the oldest xid whose CSN the log can answer for, publishes it
+ * with set_oldest_xmin() and finally sets csnShared->csnSnapshotActive.
+ * Does nothing if tracking is already active.
+ *
+ * NOTE(review): csnSnapshotActive is tested and set without a lock, which
+ * presumably relies on a single caller (startup / parameter change) -- confirm.
+ */
+void
+ActivateCSNlog(void)
+{
+	int			pageno;
+	TransactionId nextXid = InvalidTransactionId;
+	TransactionId oldest_xid = InvalidTransactionId;
+
+	if (csnShared->csnSnapshotActive)
+		return;
+
+	nextXid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
+	pageno = TransactionIdToPage(nextXid);
+
+	LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE);
+
+	/*
+	 * Create the page holding nextXid, if necessary. If it does not exist on
+	 * disk, it becomes the first page of a freshly started CSN log.
+	 */
+	if (!SimpleLruDoesPhysicalPageExist(CsnlogCtl, pageno))
+	{
+		int			slotno;
+		TransactionId curxid = nextXid;
+
+		slotno = ZeroCSNLogPage(pageno, false);
+
+		elog(LOG, "Create SLRU page=%d, slotno=%d for xid %u on a CSN log activation",
+			 pageno, slotno, nextXid);
+
+		/*
+		 * nextXid isn't the first xid on the page. Mark all preceding slots on
+		 * this page with UnclearCSN (presumably xids assigned while CSN
+		 * tracking was off); nextXid then is the oldest xid in the CSN log.
+		 */
+		if (TransactionIdToPgIndex(nextXid) > 0)
+		{
+			/* Cleaning procedure. Can be optimized. */
+			do
+			{
+				curxid--;
+				CSNLogSetCSNInSlot(curxid, UnclearCSN, slotno);
+			} while (TransactionIdToPgIndex(curxid) > 0);
+
+			elog(LOG,
+				 "Set UnclearCSN values for %u xids in the range [%u,%u]",
+				 nextXid - curxid, curxid, nextXid - 1);
+
+			/* Oldest XID found on this page */
+			oldest_xid = nextXid;
+		}
+
+		/*
+		 * Write the page only after filling it. CSNLogSetCSNInSlot() does not
+		 * mark the page dirty, so writing it first would leave the UnclearCSN
+		 * markers only in the buffer, to be lost on eviction or restart.
+		 */
+		SimpleLruWritePage(CsnlogCtl, slotno);
+	}
+	LWLockRelease(CSNLogSLRULock);
+
+	/*
+	 * Oldest xid still unknown: either the page already existed (the log
+	 * survived, e.g., a restart) or nextXid is the first xid of a new page.
+	 * Walk back to the earliest physically existing page and take its first
+	 * xid whose CSN is not UnclearCSN.
+	 */
+	if (!TransactionIdIsValid(oldest_xid))
+	{
+		TransactionId curxid;
+
+		elog(LOG, "Search for the oldest xid across previous pages");
+
+		/* Need to scan previous pages for an oldest xid. */
+		while (pageno > 0 && SimpleLruDoesPhysicalPageExist(CsnlogCtl, pageno - 1))
+			pageno--;
+
+		/* look up for the first clear xid value. */
+		curxid = pageno * (TransactionId) CSN_LOG_XACTS_PER_PAGE;
+		while (CSNLogGetCSNByXid(curxid) == UnclearCSN)
+			curxid++;
+		oldest_xid = curxid;
+	}
+
+	set_oldest_xmin(oldest_xid);
+	csnShared->csnSnapshotActive = true;
+}
+
+/*
+ * Report whether CSN tracking is active (shared flag, read without a lock).
+ */
+bool
+get_csnlog_status(void)
+{
+	bool		active = csnShared->csnSnapshotActive;
+
+	return active;
+}
+
+/*
+ * Turn CSN tracking off: clear the shared flag and oldest xmin first, then
+ * delete every pg_csn segment on disk under CSNLogSLRULock.
+ */
+void
+DeactivateCSNlog(void)
+{
+	csnShared->csnSnapshotActive = false;
+	set_oldest_xmin(InvalidTransactionId);
+
+	LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE);
+	(void) SlruScanDirectory(CsnlogCtl, SlruScanDirCbDeleteAll, NULL);
+	LWLockRelease(CSNLogSLRULock);
+
+	elog(LOG, "CSN log has deactivated");
+}
+
+/*
+ * Startup hook: simply activates CSN tracking (see ActivateCSNlog()).
+ */
+void
+StartupCSN(void)
+{
+	ActivateCSNlog();
+}
+
+/*
+ * Bring the CSN log in line with enable_csn_snapshot.
+ *
+ * With the feature enabled the module is activated; this is needed on both
+ * primary and standby, since activation depends on the control file contents
+ * at the beginning of recovery or on a replayed XLOG_PARAMETER_CHANGE.
+ * With it disabled the module is turned off for good, which also removes
+ * any leftover data.
+ */
+void
+CompleteCSNInitialization(void)
+{
+	if (enable_csn_snapshot)
+		ActivateCSNlog();
+	else
+		DeactivateCSNlog();
+}
+
+/*
+ * React to a change of enable_csn_snapshot: activate or deactivate the CSN
+ * log when the shared state disagrees with newvalue. oldvalue is unused;
+ * the shared flag is treated as the source of truth.
+ */
+void
+CSNlogParameterChange(bool newvalue, bool oldvalue)
+{
+	bool		active = csnShared->csnSnapshotActive;
+
+	if (newvalue && !active)
+		ActivateCSNlog();
+	else if (!newvalue && active)
+		DeactivateCSNlog();
+}
+
+/*
+ * Checkpoint hook (shutdown or on-the-fly): flush dirty CSNLog pages to disk
+ * when CSN tracking is active.
+ *
+ * This is not needed for correctness; it just improves the odds that the
+ * checkpoint process, rather than backends, writes the dirty pages.
+ */
+void
+CheckPointCSNLog(void)
+{
+	if (get_csnlog_status())
+	{
+		TRACE_POSTGRESQL_CSNLOG_CHECKPOINT_START(true);
+		SimpleLruWriteAll(CsnlogCtl, true);
+		TRACE_POSTGRESQL_CSNLOG_CHECKPOINT_DONE(true);
+	}
+}
+
+/*
+ * Make sure the CSNLog has a page for a newly allocated XID.
+ *
+ * Only the first XID of a page needs work: the page is zeroed (and, outside
+ * of recovery, a ZEROPAGE record is written). Right after wraparound the
+ * first XID of page zero is FirstNormalTransactionId.
+ *
+ * NB: this is called while holding XidGenLock, so it should be fast; no I/O
+ * happens unless a dirty page must be written out to make room in shared
+ * memory.
+ */
+void
+ExtendCSNLog(TransactionId newestXact)
+{
+	bool		first_on_page;
+
+	if (!get_csnlog_status())
+		return;
+
+	first_on_page = TransactionIdToPgIndex(newestXact) == 0 ||
+		TransactionIdEquals(newestXact, FirstNormalTransactionId);
+	if (!first_on_page)
+		return;
+
+	LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE);
+	ZeroCSNLogPage(TransactionIdToPage(newestXact), !InRecovery);
+	LWLockRelease(CSNLogSLRULock);
+}
+
+/*
+ * Remove all CSNLog segments before the one holding the passed
+ * transaction ID.
+ *
+ * This is normally called during checkpoint, with oldestXact being the
+ * oldest TransactionXmin of any running transaction. Before segments are
+ * removed, csnShared->oldestXmin is advanced to the first xid of the cutoff
+ * page. Nothing is done during recovery or when CSN tracking is inactive.
+ */
+void
+TruncateCSNLog(TransactionId oldestXact)
+{
+	int			cutoffPage;
+	TransactionId oldestXmin;
+
+	/* Can't truncate during recovery because WAL messages aren't allowed */
+	if (RecoveryInProgress() || !get_csnlog_status())
+		return;
+
+	/*
+	 * The cutoff point is the start of the segment containing oldestXact. We
+	 * pass the *page* containing oldestXact to SimpleLruTruncate. We step
+	 * back one transaction to avoid passing a cutoff page that hasn't been
+	 * created yet in the rare case that oldestXact would be the first item on
+	 * a page and oldestXact == next XID. In that case, if we didn't subtract
+	 * one, we'd trigger SimpleLruTruncate's wraparound detection.
+	 */
+	TransactionIdRetreat(oldestXact);
+	cutoffPage = TransactionIdToPage(oldestXact);
+
+	/*
+	 * Truncation is needed only if the oldest page still tracked precedes the
+	 * cutoff page. Compare with CSNLogPagePrecedes() so that XID wraparound
+	 * is handled the same way as in the SLRU code.
+	 */
+	oldestXmin = pg_atomic_read_u32(&csnShared->oldestXmin);
+	if (!CSNLogPagePrecedes(TransactionIdToPage(oldestXmin), cutoffPage))
+	{
+		/* oldestXact is on the same page as oldestXmin: nothing to cut. */
+		return;
+	}
+
+	/*
+	 * Shift oldestXmin to the start of the new first page. The first position
+	 * on the page can be used because that page follows oldestXmin's page, so
+	 * all of its transactions were created with CSN tracking enabled.
+	 *
+	 * oldestXmin is an atomic; csnShared->lock is neither needed nor held.
+	 */
+	pg_atomic_write_u32(&csnShared->oldestXmin,
+						oldestXact - TransactionIdToPgIndex(oldestXact));
+
+	ZeroTruncateCSNLogPage(cutoffPage, true);
+}
+
+/*
+ * Decide whether a CSNLog page number is "older" for truncation purposes.
+ * Same approach as CLOGPagePrecedes() / SubTransPagePrecedes().
+ *
+ * We need to use comparison of TransactionIds here in order to do the right
+ * thing with wraparound XID arithmetic. TransactionIdPrecedes() would get
+ * weird about permanent xact IDs, so both xids are offset such that xid1,
+ * xid2 and xid2 + CSN_LOG_XACTS_PER_PAGE - 1 are all normal XIDs (this
+ * matters for page 0 and the page preceding it).
+ *
+ * page1 only counts as preceding page2 if xid1 precedes both the first and
+ * the last xid of page2. This protects the page straddling the wraparound
+ * horizon (oldestXact - 2^31), part of which is still needed.
+ */
+static bool
+CSNLogPagePrecedes(int page1, int page2)
+{
+	TransactionId xid1;
+	TransactionId xid2;
+
+	xid1 = ((TransactionId) page1) * (TransactionId) CSN_LOG_XACTS_PER_PAGE;
+	xid1 += FirstNormalTransactionId + 1;
+	xid2 = ((TransactionId) page2) * (TransactionId) CSN_LOG_XACTS_PER_PAGE;
+	xid2 += FirstNormalTransactionId + 1;
+
+	return (TransactionIdPrecedes(xid1, xid2) &&
+			TransactionIdPrecedes(xid1,
+								  xid2 + (TransactionId) CSN_LOG_XACTS_PER_PAGE - 1));
+}
+
+/*
+ * WAL-log an XLOG_CSN_ASSIGNMENT record carrying csn; csnlog_redo() uses it
+ * to restore csnShared->last_max_csn. Callers must have enable_csn_wal on
+ * and csn must not exceed csnShared->last_csn_log_wal.
+ */
+void
+WriteAssignCSNXlogRec(CSN csn)
+{
+	Assert(enable_csn_wal);
+	Assert(csn <= csnShared->last_csn_log_wal);
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &csn, sizeof(csn));
+	XLogInsert(RM_CSNLOG_ID, XLOG_CSN_ASSIGNMENT);
+}
+
+/*
+ * WAL-log an XLOG_CSN_SETCSN record: the top-level xid, its nsubxids
+ * subtransaction xids and the csn assigned to all of them. Does nothing
+ * unless enable_csn_wal is set.
+ */
+static void
+WriteCSNXlogRec(TransactionId xid, int nsubxids,
+				TransactionId *subxids, CSN csn)
+{
+	xl_csn_set	xlrec;
+
+	if (!enable_csn_wal)
+		return;
+
+	xlrec.xtop = xid;
+	xlrec.nsubxacts = nsubxids;
+	xlrec.csn = csn;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, MinSizeOfCSNSet);
+
+	/*
+	 * Don't register an empty (and possibly NULL) chunk when there are no
+	 * subtransactions; it would only waste an rdata slot.
+	 */
+	if (nsubxids > 0)
+		XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
+
+	XLogInsert(RM_CSNLOG_ID, XLOG_CSN_SETCSN);
+}
+
+/*
+ * Write a ZEROPAGE xlog record for pageno (only when enable_csn_wal is on).
+ */
+static void
+WriteZeroCSNPageXlogRec(int pageno)
+{
+	if (enable_csn_wal)
+	{
+		XLogBeginInsert();
+		XLogRegisterData((char *) &pageno, sizeof(pageno));
+		(void) XLogInsert(RM_CSNLOG_ID, XLOG_CSN_ZEROPAGE);
+	}
+}
+
+/*
+ * Write a TRUNCATE xlog record for pageno (only when enable_csn_wal is on)
+ * and flush it to disk before returning.
+ */
+static void
+WriteTruncateCSNXlogRec(int pageno)
+{
+	XLogRecPtr	recptr;
+
+	if (!enable_csn_wal)
+		return;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) (&pageno), sizeof(int));
+	recptr = XLogInsert(RM_CSNLOG_ID, XLOG_CSN_TRUNCATE);
+
+	/*
+	 * The caller removes segment files right after this, which can't be
+	 * undone, so make the record durable first (as clog and multixact do).
+	 */
+	XLogFlush(recptr);
+}
+
+
+/*
+ * Redo routine for the CSN log resource manager.
+ *
+ * ASSIGNMENT raises csnShared->last_max_csn, SETCSN re-applies the CSNs of
+ * a transaction tree, ZEROPAGE zeroes and writes a page, and TRUNCATE cuts
+ * the SLRU at the given page. Unknown opcodes PANIC.
+ */
+void
+csnlog_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	/* Backup blocks are not used in csnlog records */
+	Assert(!XLogRecHasAnyBlockRefs(record));
+
+	if (info == XLOG_CSN_ASSIGNMENT)
+	{
+		CSN			csn;
+
+		memcpy(&csn, XLogRecGetData(record), sizeof(CSN));
+
+		/*
+		 * last_max_csn is otherwise only touched under csnShared->lock (see
+		 * GenerateCSN()), so take it here too. Never move the value
+		 * backwards: generated CSNs must stay strictly increasing.
+		 */
+		SpinLockAcquire(&csnShared->lock);
+		if (csn > csnShared->last_max_csn)
+			csnShared->last_max_csn = csn;
+		SpinLockRelease(&csnShared->lock);
+	}
+	else if (info == XLOG_CSN_SETCSN)
+	{
+		xl_csn_set *xlrec = (xl_csn_set *) XLogRecGetData(record);
+
+		CSNLogSetCSN(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub, xlrec->csn, false);
+	}
+	else if (info == XLOG_CSN_ZEROPAGE)
+	{
+		int			pageno;
+		int			slotno;
+
+		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+		LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE);
+		slotno = ZeroCSNLogPage(pageno, false);
+		SimpleLruWritePage(CsnlogCtl, slotno);
+		/* Check while still holding the lock, as clog_redo() does. */
+		Assert(!CsnlogCtl->shared->page_dirty[slotno]);
+		LWLockRelease(CSNLogSLRULock);
+	}
+	else if (info == XLOG_CSN_TRUNCATE)
+	{
+		int			pageno;
+
+		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+
+		/*
+		 * Set latest_page_number so that SimpleLruTruncate's sanity check
+		 * doesn't trip during replay.
+		 */
+		CsnlogCtl->shared->latest_page_number = pageno;
+		ZeroTruncateCSNLogPage(pageno, false);
+	}
+	else
+		elog(PANIC, "csnlog_redo: unknown op code %u", info);
+}
+
+/*
+ * Entry point for sync.c: sync the CSNLog segment file identified by ftag
+ * (delegates to SlruSyncFileTag()).
+ */
+int
+csnsyncfiletag(const FileTag *ftag, char *path)
+{
+	return SlruSyncFileTag(CsnlogCtl, ftag, path);
+}
+
+/*
+ * GenerateCSN
+ *
+ * Generate a CSN, which is actually the local time in nanoseconds, shifted
+ * by csn_time_shift seconds. It is raised to at least 'assign' when that is
+ * valid, and forced to be strictly greater than any CSN generated before.
+ * Nanoseconds are used since millions of read transactions per second are
+ * no longer uncommon.
+ *
+ * 'locked' means the caller already holds csnShared->lock.
+ *
+ * With enable_csn_wal on, once a CSN passes the reserved bound
+ * (last_csn_log_wal), CSNs up to csn + CSN_ASSIGN_TIME_INTERVAL are reserved
+ * by WAL-logging that new bound. Replay then restores last_max_csn above
+ * every CSN handed out before a crash or promotion.
+ */
+CSN
+GenerateCSN(bool locked, CSN assign)
+{
+	instr_time	current_time;
+	CSN			csn;
+	CSN			log_csn = InvalidCSN;
+
+	Assert(get_csnlog_status() || csn_snapshot_defer_time > 0);
+
+	/* TODO: create some macro that add small random shift to current time. */
+	INSTR_TIME_SET_CURRENT(current_time);
+	csn = (CSN) INSTR_TIME_GET_NANOSEC(current_time) + (int64) (csn_time_shift * 1E9);
+
+	if (assign != InvalidCSN && csn < assign)
+		csn = assign;
+
+	/* TODO: change to atomics? */
+	if (!locked)
+		SpinLockAcquire(&csnShared->lock);
+
+	if (csn <= csnShared->last_max_csn)
+		csn = csnShared->last_max_csn + 1;
+	csnShared->last_max_csn = csn;
+
+	if (enable_csn_wal && csn > csnShared->last_csn_log_wal)
+	{
+		/*
+		 * Reserve CSNs for the next CSN_ASSIGN_TIME_INTERVAL: remember the new
+		 * upper bound here and WAL-log it below.
+		 */
+		log_csn = CSNAddByNanosec(csn, CSN_ASSIGN_TIME_INTERVAL);
+		csnShared->last_csn_log_wal = log_csn;
+	}
+
+	if (!locked)
+		SpinLockRelease(&csnShared->lock);
+
+	/*
+	 * Log the reserved upper bound, not csn itself: logging csn would let
+	 * replay restore last_max_csn below CSNs already handed out.
+	 *
+	 * NOTE(review): when 'locked' is true the caller still holds the spinlock
+	 * across this XLogInsert(), which spinlock coding rules forbid -- confirm
+	 * callers.
+	 */
+	if (log_csn != InvalidCSN)
+		WriteAssignCSNXlogRec(log_csn);
+
+	return csn;
+}
+
+/*
+ * Return the largest CSN generated so far, read under csnShared->lock.
+ */
+CSN
+GetLastGeneratedCSN(void)
+{
+	CSN			last;
+
+	SpinLockAcquire(&csnShared->lock);
+	last = csnShared->last_max_csn;
+	SpinLockRelease(&csnShared->lock);
+
+	return last;
+}
+
+/*
+ * Replace csnShared->oldestXmin with xid, logging the old and new values
+ * (the LOG message is mostly for debug purposes).
+ */
+static void
+set_oldest_xmin(TransactionId xid)
+{
+	TransactionId prev = pg_atomic_read_u32(&csnShared->oldestXmin);
+
+	elog(LOG, "Oldest Xmin for CSN will be changed from %u to %u", prev, xid);
+	pg_atomic_write_u32(&csnShared->oldestXmin, xid);
+}
+
+/*
+ * Return the first xid with a meaningful CSN in the CSN log. Only valid
+ * while CSN tracking is active.
+ */
+TransactionId
+GetOldestXmin(void)
+{
+	TransactionId result;
+
+	Assert(get_csnlog_status());
+	result = pg_atomic_read_u32(&csnShared->oldestXmin);
+
+	return result;
+}
diff --git a/src/backend/access/transam/csn_snapshot.c b/src/backend/access/transam/csn_snapshot.c
new file mode 100644
index 0000000000..a381d219ea
--- /dev/null
+++ b/src/backend/access/transam/csn_snapshot.c
@@ -0,0 +1,687 @@
+/*-------------------------------------------------------------------------
+ *
+ * csn_snapshot.c
+ * Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/csn_snapshot.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/csn_snapshot.h"
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/xact.h"
+#include "portability/instr_time.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "miscadmin.h"
+
+/*
+ * Raise a warning if a remote snapshot_csn exceeds our local CSN by more
+ * than this value (see CSNSnapshotSync()).
+ */
+#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */
+
+/*
+ * Last value obtained from GetOldestXmin(); refreshed by TransactionIdGetCSN()
+ * on every call.  Xids in [TransactionXmin, xmin_for_csn) are reported as
+ * UnclearCSN there.
+ */
+static TransactionId xmin_for_csn = InvalidTransactionId;
+
+
+/*
+ * GUC to delay advance of oldestXid for this amount of time, in seconds (one
+ * CSNSnapshotXidMap slot per second).  Also determines the size of the
+ * CSNSnapshotXidMap circular buffer; a value <= 0 disables the map.
+ */
+int csn_snapshot_defer_time;
+
+/*
+ * Shift in seconds added to the local clock when CSNs are generated
+ * (GenerateCSN() multiplies it by 1E9 before adding it to the nanosecond
+ * clock reading).
+ */
+int csn_time_shift;
+
+/*
+ * CSNSnapshotXidMap
+ *
+ * To be able to install a csn snapshot that points to the past we need to
+ * keep old versions of tuples and therefore delay the advance of oldestXid.
+ * Here we keep track of the correspondence between a snapshot's snapshot_csn
+ * and the oldestXid that was in effect when the snapshot was taken.  This is
+ * much like the snapshot-too-old machinery's OldSnapshotControlData, but with
+ * a granularity of one second.
+ *
+ * Different strategies can be employed to hold oldestXid (e.g. we can track
+ * the oldest csn-based snapshot among cluster nodes and map it to oldestXid
+ * on each node).
+ *
+ * On each snapshot acquisition CSNSnapshotMapXmin() is called and stores the
+ * correspondence between the current snapshot_csn and oldestXmin in a sparse
+ * way: snapshot_csn is rounded to seconds (here we use the fact that
+ * snapshot_csn is just a timestamp in nanoseconds) and oldestXmin is stored
+ * in a circular buffer whose slot index is the rounded snapshot_csn modulo
+ * the buffer size.  The size of the circular buffer is controlled by the
+ * csn_snapshot_defer_time GUC.
+ *
+ * When a csn snapshot arrives we check (CSNSnapshotToXmin()) that its
+ * snapshot_csn is still in our map; otherwise the caller is expected to
+ * error out with a "snapshot too old" message.  If snapshot_csn is
+ * successfully mapped to oldestXid we move the backend's proc->xmin to
+ * proc->originalXmin and set proc->xmin to the mapped oldestXid.  That way
+ * the oldest-xmin computations take into account backends with an imported
+ * csn snapshot and old tuple versions will be preserved.
+ *
+ * Also, while calculating oldestXmin for our map in the presence of imported
+ * csn snapshots, we should use proc->originalXmin instead of the proc->xmin
+ * that was set during import.  Otherwise we can create a feedback loop: the
+ * xmins of imported csn snapshots were calculated using our map, new map
+ * entries would be calculated based on those xmins, and there is a risk of
+ * getting stuck forever with one non-increasing oldestXmin.  All other
+ * callers of GetOldestNonRemovableTransactionId() use proc->xmin, so the old
+ * tuple versions are preserved.
+ */
+typedef struct CSNSnapshotXidMap
+{
+ int head; /* offset of current freshest value */
+ int size; /* total size of circular buffer */
+ CSN_atomic last_csn_seconds; /* last rounded csn that changed
+ * xmin_by_second[] */
+ TransactionId *xmin_by_second; /* circular buffer of oldestXmin's,
+ * indexed by rounded csn % size */
+}
+CSNSnapshotXidMap;
+
+/* Shared-memory map; only allocated when csn_snapshot_defer_time > 0. */
+static CSNSnapshotXidMap *csnXidMap;
+
+
+/*
+ * CSNSnapshotShmemSize
+ *
+ * Estimate the shared memory needed for the CSNSnapshotXidMap: room for the
+ * header struct plus csn_snapshot_defer_time TransactionId slots.  Returns 0
+ * when the map is disabled (csn_snapshot_defer_time <= 0).
+ */
+Size
+CSNSnapshotShmemSize(void)
+{
+	Size		size = 0;
+
+	if (csn_snapshot_defer_time > 0)
+	{
+		/* Overflow-checked arithmetic, as in other *ShmemSize() functions. */
+		size = add_size(sizeof(CSNSnapshotXidMap),
+						mul_size(csn_snapshot_defer_time, sizeof(TransactionId)));
+		size = MAXALIGN(size);
+	}
+
+	return size;
+}
+
+/*
+ * CSNSnapshotShmemInit
+ *
+ * Attach to (and, on first call, initialize) the CSNSnapshotXidMap in shared
+ * memory.  Does nothing when csn_snapshot_defer_time <= 0.
+ */
+void
+CSNSnapshotShmemInit(void)
+{
+	bool		found;
+
+	if (csn_snapshot_defer_time <= 0)
+		return;
+
+	/*
+	 * Allocate the header and the circular buffer as one named chunk, so the
+	 * request matches CSNSnapshotShmemSize() and no separate anonymous
+	 * ShmemAlloc() is needed.  The buffer starts right after the header: the
+	 * struct size is a multiple of its alignment, which (given its int
+	 * members) is at least that of TransactionId.
+	 */
+	csnXidMap = ShmemInitStruct("csnXidMap",
+								CSNSnapshotShmemSize(),
+								&found);
+	if (!found)
+	{
+		int			i;
+
+		pg_atomic_init_u64(&csnXidMap->last_csn_seconds, 0);
+		csnXidMap->head = 0;
+		csnXidMap->size = csn_snapshot_defer_time;
+		csnXidMap->xmin_by_second = (TransactionId *) (csnXidMap + 1);
+
+		for (i = 0; i < csnXidMap->size; i++)
+			csnXidMap->xmin_by_second[i] = InvalidTransactionId;
+	}
+}
+
+/*
+ * CSNSnapshotStartup
+ *
+ * During startup, seed every csnXidMap slot with oldestActiveXID and publish
+ * that value through ProcArraySetCSNSnapshotXmin().
+ */
+void
+CSNSnapshotStartup(TransactionId oldestActiveXID)
+{
+	int			slot;
+
+	/*
+	 * Skip unless shared memory is initialized (normal processing mode) and
+	 * the csnXidMap is in use.
+	 */
+	if (!IsNormalProcessingMode() ||
+		!enable_csn_snapshot ||
+		csn_snapshot_defer_time <= 0)
+		return;
+
+	Assert(TransactionIdIsValid(oldestActiveXID));
+
+	for (slot = 0; slot < csnXidMap->size; slot++)
+		csnXidMap->xmin_by_second[slot] = oldestActiveXID;
+
+	ProcArraySetCSNSnapshotXmin(oldestActiveXID);
+
+	elog(LOG, "CSN map initialized with oldest active xid %u", oldestActiveXID);
+}
+
+/*
+ * CSNSnapshotMapXmin
+ *
+ * Maintain a circular buffer of oldestXmins for several seconds in the past.
+ * This buffer allows shifting oldestXmin into the past when a backend is
+ * importing a CSN snapshot.  Otherwise old versions of tuples that are needed
+ * by that transaction could be recycled by other processes (vacuum, HOT,
+ * etc).
+ *
+ * Locking here is not trivial.  This is called upon each snapshot creation
+ * after ProcArrayLock is released, which creates several race conditions: a
+ * backend that got its csn may call CSNSnapshotMapXmin() only after other
+ * backends managed to get a snapshot and complete their CSNSnapshotMapXmin()
+ * call, or even committed.  This is safe because
+ *
+ * * We already hold our xmin in MyProc, so our snapshot will not be
+ * harmed even though ProcArrayLock is released.
+ *
+ * * snapshot_csn is always pessimistically rounded up to the next
+ * second.
+ *
+ * * For performance reasons, the xmin value for a particular second is
+ * filled only once.  Because of that, instead of writing just our xmin
+ * (which is enough for our snapshot) to the buffer, we store oldestXmin
+ * there -- this mitigates the possibility of damaging someone else's
+ * snapshot by writing a too-advanced value to the buffer when another
+ * backend that generated its csn earlier is slow and didn't manage to
+ * insert it before us.
+ *
+ * * If CSNSnapshotMapXmin() finds a gap of several seconds between the
+ * current call and the latest completed call, it fills that gap with the
+ * latest known value instead of the new one.  Otherwise it is possible
+ * (however highly unlikely) that this gap also happened between taking a
+ * snapshot and calling CSNSnapshotMapXmin() for some backend, and we
+ * would risk filling the circular buffer with oldestXmins that are
+ * bigger than they actually were.
+ */
+void
+CSNSnapshotMapXmin(SnapshotCSN snapshot_csn)
+{
+ int offset, gap, i;
+ SnapshotCSN csn_seconds;
+ SnapshotCSN last_csn_seconds;
+ volatile TransactionId oldest_deferred_xmin;
+ TransactionId current_oldest_xmin, previous_oldest_xmin;
+ TransactionId ImportedXmin;
+
+ /* Callers should check config values */
+ Assert(csn_snapshot_defer_time > 0);
+ Assert(csnXidMap != NULL);
+ /*
+ * Round up snapshot_csn to the next second -- pessimistically and safely.
+ */
+ csn_seconds = (snapshot_csn / NSECS_PER_SEC + 1);
+
+ /*
+ * Fast-path check. Avoid taking exclusive CSNSnapshotXidMapLock lock
+ * if oldestXid was already written to xmin_by_second[] for this rounded
+ * snapshot_csn.
+ */
+ if (pg_atomic_read_u64(&csnXidMap->last_csn_seconds) >= csn_seconds)
+ return;
+
+ /* Ok, we have new entry (or entries) */
+ LWLockAcquire(CSNSnapshotXidMapLock, LW_EXCLUSIVE);
+
+ /* Re-check last_csn_seconds under lock */
+ last_csn_seconds = pg_atomic_read_u64(&csnXidMap->last_csn_seconds);
+ if (last_csn_seconds >= csn_seconds)
+ {
+ LWLockRelease(CSNSnapshotXidMapLock);
+ return;
+ }
+ pg_atomic_write_u64(&csnXidMap->last_csn_seconds, csn_seconds);
+
+ /*
+ * Compute oldest_xmin.
+ *
+ * It would be possible to compute oldest_xmin while creating the
+ * corresponding snapshot, but we need proc->originalXmin (see the comment
+ * on CSNSnapshotXidMap), which GetSnapshotData() does not read. So just
+ * calculate oldest_xmin again here; thanks to the fast path above this
+ * happens at most once per rounded second of snapshot_csn.
+ */
+
+ /*
+ * Temporarily replace our xmin with originalXmin, so that an xmin
+ * installed by a csn snapshot import does not feed back into the map.
+ * The procArray's csn_snapshot_xmin is relied upon to keep the border of
+ * rows that must not be vacuumed meanwhile.
+ * NOTE(review): MyProc->xmin is changed without ProcArrayLock, and is not
+ * restored if GetOldestNonRemovableTransactionId() throws; confirm this
+ * window is safe.
+ */
+ ImportedXmin = MyProc->xmin;
+ MyProc->xmin = MyProc->originalXmin;
+ current_oldest_xmin = GetOldestNonRemovableTransactionId(NULL);
+ MyProc->xmin = ImportedXmin;
+ Assert(TransactionIdIsNormal(current_oldest_xmin));
+
+ previous_oldest_xmin = csnXidMap->xmin_by_second[csnXidMap->head];
+ Assert(TransactionIdIsNormal(previous_oldest_xmin) || !enable_csn_snapshot);
+
+ /*
+ * NOTE(review): gap is an int holding a 64-bit difference; on the first
+ * call last_csn_seconds is 0, so gap equals csn_seconds itself. Confirm
+ * it (and head + gap below) cannot exceed INT_MAX for the clock used by
+ * GenerateCSN().
+ */
+ gap = csn_seconds - last_csn_seconds;
+ offset = csn_seconds % csnXidMap->size;
+
+ /* Sanity check before we update head and gap */
+ Assert( gap >= 1 );
+ Assert( (csnXidMap->head + gap) % csnXidMap->size == offset );
+
+ /* A gap longer than the buffer only needs to rewrite every slot once. */
+ gap = gap > csnXidMap->size ? csnXidMap->size : gap;
+ csnXidMap->head = offset;
+
+ /* Fill new entry with current_oldest_xmin */
+ csnXidMap->xmin_by_second[offset] = current_oldest_xmin;
+
+ /*
+ * If we have a gap then fill it (walking backwards from the new head)
+ * with previous_oldest_xmin for the reasons outlined in the comment
+ * above this function.
+ */
+ for (i = 1; i < gap; i++)
+ {
+ offset = (offset + csnXidMap->size - 1) % csnXidMap->size;
+ csnXidMap->xmin_by_second[offset] = previous_oldest_xmin;
+ }
+
+ /* The slot right after head is the oldest one still in the buffer. */
+ oldest_deferred_xmin =
+ csnXidMap->xmin_by_second[ (csnXidMap->head + 1) % csnXidMap->size ];
+
+ LWLockRelease(CSNSnapshotXidMapLock);
+
+ elog(DEBUG5, "Advance xmin for CSN. Oldest deferred xmin = %u",
+ oldest_deferred_xmin);
+
+ /*
+ * Advance procArray->csn_snapshot_xmin after we released
+ * CSNSnapshotXidMapLock. Since we gather not xmin but oldestXmin, it
+ * is expected never to go backwards regardless of how slow we are.
+ * NOTE(review): the check below is commented out; because the value is
+ * published after the lock is released, two concurrent callers may
+ * publish out of order -- confirm whether that is acceptable.
+ */
+ /*Assert(TransactionIdFollowsOrEquals(oldest_deferred_xmin,
+ ProcArrayGetCSNSnapshotXmin()));*/
+ ProcArraySetCSNSnapshotXmin(oldest_deferred_xmin);
+}
+
+
+/*
+ * CSNSnapshotToXmin
+ *
+ * Map snapshot_csn to the oldestXmin recorded in csnXidMap for its
+ * (rounded-down) second.  If that second has not been recorded yet, the
+ * freshest recorded value is returned; if it has already fallen out of the
+ * circular buffer, InvalidTransactionId is returned so the caller can react.
+ */
+TransactionId
+CSNSnapshotToXmin(SnapshotCSN snapshot_csn)
+{
+	TransactionId result;
+	volatile SnapshotCSN newest_seconds;
+	/* Round down to get a conservative estimate. */
+	SnapshotCSN requested_seconds = snapshot_csn / NSECS_PER_SEC;
+
+	/* Callers should check config values */
+	Assert(csn_snapshot_defer_time > 0);
+	Assert(csnXidMap != NULL);
+
+	LWLockAcquire(CSNSnapshotXidMapLock, LW_SHARED);
+	newest_seconds = pg_atomic_read_u64(&csnXidMap->last_csn_seconds);
+
+	if (requested_seconds > newest_seconds)
+	{
+		/* No entry for this second yet: fall back to the freshest one. */
+		result = csnXidMap->xmin_by_second[csnXidMap->head];
+	}
+	else if (newest_seconds - requested_seconds >= csnXidMap->size)
+	{
+		/* Requested snapshot_csn is too old. */
+		result = InvalidTransactionId;
+	}
+	else
+	{
+		/* Still inside the window kept by the circular buffer. */
+		Assert(newest_seconds % csnXidMap->size == csnXidMap->head);
+		result = csnXidMap->xmin_by_second[requested_seconds % csnXidMap->size];
+	}
+
+	LWLockRelease(CSNSnapshotXidMapLock);
+
+	return result;
+}
+
+/*
+ * CSNSnapshotPrepareCurrent
+ *
+ * Mark the currently active transaction (and its committed children) as
+ * InDoubt in the CSN log, then return a freshly generated CSN to be used as
+ * the commit's global snapshot.
+ */
+SnapshotCSN
+CSNSnapshotPrepareCurrent(void)
+{
+	TransactionId xid = GetCurrentTransactionIdIfAny();
+	TransactionId *subxids;
+	int			nsubxids;
+
+	if (!enable_csn_snapshot)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not prepare transaction for global commit"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "enable_csn_snapshot")));
+
+	/* A transaction without an xid has nothing to mark in the CSN log. */
+	if (TransactionIdIsValid(xid))
+	{
+		nsubxids = xactGetCommittedChildren(&subxids);
+		CSNLogSetCSN(xid, nsubxids, subxids, InDoubtCSN, true);
+	}
+
+	return GenerateCSN(false, InvalidCSN);
+}
+
+
+/*
+ * CSNSnapshotAssignCurrent
+ *
+ * Assign snapshot_csn to the currently active transaction.  snapshot_csn is
+ * supposedly the maximum of the values returned by CSNSnapshotPrepareCurrent
+ * and pg_csn_snapshot_prepare.
+ */
+void
+CSNSnapshotAssignCurrent(SnapshotCSN snapshot_csn)
+{
+	if (!enable_csn_snapshot)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not prepare transaction for global commit"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "enable_csn_snapshot")));
+
+	if (!CSNIsNormal(snapshot_csn))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pg_csn_snapshot_assign expects normal snapshot_csn")));
+	Assert(snapshot_csn != InvalidCSN);
+
+	/*
+	 * Only the side effect matters: afterwards csnShared->last_max_csn is at
+	 * least snapshot_csn.
+	 */
+	(void) GenerateCSN(false, snapshot_csn);
+
+	/* Record the csn; this keeps ProcArrayEndTransaction from assigning one. */
+	pg_atomic_write_u64(&MyProc->assignedCSN, snapshot_csn);
+}
+
+/*
+ * CSNSnapshotSync
+ *
+ * Due to clock desynchronization between nodes we can receive a remote_csn
+ * that is greater than the CSNs this node generates.  To preserve proper
+ * isolation this node must wait until its local clock reaches remote_csn.
+ *
+ * This should happen relatively rarely if nodes run NTP/PTP/etc.  Complain
+ * (once) if the required wait is more than SNAP_DESYNC_COMPLAIN.
+ */
+void
+CSNSnapshotSync(SnapshotCSN remote_csn)
+{
+	bool		complained = false;
+
+	Assert(enable_csn_snapshot);
+
+	for (;;)
+	{
+		SnapshotCSN local_csn;
+		SnapshotCSN delta;
+		SnapshotCSN sleep_ns;
+
+		CHECK_FOR_INTERRUPTS();
+
+		if (GetLastGeneratedCSN() > remote_csn)
+			return;
+
+		/*
+		 * We do not hold csnShared->lock here, so GenerateCSN() must take it
+		 * itself (locked = false); claiming otherwise would update
+		 * csnShared->last_max_csn without the spinlock.
+		 */
+		local_csn = GenerateCSN(false, InvalidCSN);
+
+		/*
+		 * Everything is fine too, but last_max_csn wasn't updated for some
+		 * time.
+		 */
+		if (local_csn >= remote_csn)
+			return;
+
+		/* Okay, we need to sleep now. */
+		delta = remote_csn - local_csn;
+		if (delta > SNAP_DESYNC_COMPLAIN && !complained)
+		{
+			ereport(WARNING,
+					(errmsg("remote global snapshot exceeds ours by more than a second"),
+					 errhint("Consider running NTPd on servers participating in global transaction")));
+			complained = true;
+		}
+
+		/*
+		 * Sleep at most one second at a time so that interrupts are serviced
+		 * even for a large skew; the loop then re-checks that we actually
+		 * waited long enough.  TODO: report this sleep time somewhere?
+		 */
+		sleep_ns = Min(delta, (SnapshotCSN) NSECS_PER_SEC);
+		pg_usleep((long) (sleep_ns / NSECS_PER_USEC));
+	}
+}
+
+/*
+ * TransactionIdGetCSN
+ *
+ * Get the CSN for the specified TransactionId, taking care of special xids,
+ * xids preceding TransactionXmin and InDoubt states.
+ */
+CSN
+TransactionIdGetCSN(TransactionId xid)
+{
+	CSN			csn;
+
+	/* Handle permanent TransactionIds for which we don't have a mapping. */
+	if (!TransactionIdIsNormal(xid))
+	{
+		if (xid == InvalidTransactionId)
+			return AbortedCSN;
+		if (xid == FrozenTransactionId || xid == BootstrapTransactionId)
+			return FrozenCSN;
+		Assert(false);			/* should not happen */
+	}
+
+	/*
+	 * If we just switched from an xid-based snapshot to a csn snapshot, we
+	 * must take into account the xid from which CSN-based checks start, in
+	 * case a prepared transaction holds TransactionXmin without having a CSN.
+	 */
+	xmin_for_csn = GetOldestXmin();
+
+	/*
+	 * Xids with 'xid >= TransactionXmin and xid < xmin_for_csn' get an
+	 * unclear csn, meaning the xid-snapshot result should be followed.
+	 */
+	if (!TransactionIdPrecedes(xid, TransactionXmin) &&
+		TransactionIdPrecedes(xid, xmin_for_csn))
+	{
+		/*
+		 * This path can be taken on every visibility check of such an xid
+		 * (via XidInCSNSnapshot), so report at a debug level instead of LOG
+		 * to avoid flooding the server log.
+		 */
+		elog(DEBUG2, "UnclearCSN was returned. xid=%u, TransactionXmin=%u, xmin_for_csn=%u",
+			 xid, TransactionXmin, xmin_for_csn);
+		return UnclearCSN;
+	}
+
+	/*
+	 * For xids which precede TransactionXmin the CSNLog may already be
+	 * trimmed, but we know such a transaction is definitely not concurrently
+	 * running according to any snapshot, including time-travel ones.
+	 * Callers should check TransactionDidCommit afterwards.
+	 */
+	if (TransactionIdPrecedes(xid, TransactionXmin))
+		return FrozenCSN;
+
+	/* Read CSN from SLRU */
+	csn = CSNLogGetCSNByXid(xid);
+
+	/*
+	 * If we faced the InDoubt state then the transaction is being committed
+	 * and we should wait until its CSN is assigned so that the visibility
+	 * check can decide whether the tuple is in the snapshot.  See also the
+	 * comments in CSNSnapshotPrecommit().
+	 */
+	if (CSNIsInDoubt(csn))
+	{
+		XactLockTableWait(SubTransGetTopmostTransaction(xid), NULL, NULL, XLTW_None);
+		csn = CSNLogGetCSNByXid(xid);
+		Assert(CSNIsNormal(csn) || CSNIsAborted(csn));
+	}
+
+	Assert(CSNIsNormal(csn) || CSNIsInProgress(csn) || CSNIsAborted(csn));
+	return csn;
+}
+
+/*
+ * XidInCSNSnapshot
+ *
+ * CSN-based counterpart of XidInMVCCSnapshot: returns true when xid must be
+ * treated as not yet committed for the given snapshot.  For non-imported
+ * csn snapshots this should give the same results as XidInLocalMVCCSnapshot
+ * (except that aborts are reported without going to clog); XidInMVCCSnapshot
+ * carries asserts checking that both functions agree for ordinary snapshots.
+ */
+bool
+XidInCSNSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	CSN			csn = TransactionIdGetCSN(xid);
+
+	/* Committed: in the snapshot iff it committed at or after snapshot_csn. */
+	if (CSNIsNormal(csn))
+		return csn >= snapshot->snapshot_csn;
+
+	/* Bootstrap or frozen transaction. */
+	if (CSNIsFrozen(csn))
+		return false;
+
+	/*
+	 * The csn cannot be determined because of a snapshot switch; the
+	 * xid-based result is followed instead.
+	 */
+	if (CSNIsUnclear(csn))
+		return true;
+
+	/* What remains is an aborted or in-progress transaction. */
+	Assert(CSNIsAborted(csn) || CSNIsInProgress(csn));
+	if (CSNIsAborted(csn))
+		Assert(TransactionIdDidAbort(xid));
+	return true;
+}
+
+
+/*****************************************************************************
+ * Functions to handle transactions commit.
+ *
+ * For local transactions CSNSnapshotPrecommit sets InDoubt state before
+ * ProcArrayEndTransaction is called and transaction data potentially becomes
+ * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in
+ * twophase case) then acquires csn under ProcArray lock and stores it
+ * in proc->assignedCSN. It's important that csn for commit is
+ * generated under ProcArray lock, otherwise snapshots won't
+ * be equivalent. Consequent call to CSNSnapshotCommit will write
+ * proc->assignedCSN to CSNLog.
+ *
+ *
+ * CSNSnapshotAbort is slightly different comparing to commit because abort
+ * can skip InDoubt phase and can be called for transaction subtree.
+ *****************************************************************************/
+
+
+/*
+ * CSNSnapshotAbort
+ *
+ * Mark xid and its subxids as aborted in the CSN log and reset the proc's
+ * assignedCSN.  Unlike commit, abort can skip the InDoubt state, since no
+ * concurrent transaction is allowed to see aborted data anyway.
+ */
+void
+CSNSnapshotAbort(PGPROC *proc, TransactionId xid,
+				 int nsubxids, TransactionId *subxids)
+{
+	if (get_csnlog_status())
+	{
+		CSNLogSetCSN(xid, nsubxids, subxids, AbortedCSN, true);
+
+		/*
+		 * Reset assignedCSN unconditionally: it may have been set by
+		 * CSNSnapshotAssignCurrent() or CSNSnapshotAssignTwoPhase().
+		 */
+		pg_atomic_write_u64(&proc->assignedCSN, InProgressCSN);
+	}
+}
+
+/*
+ * CSNSnapshotPrecommit
+ *
+ * Set the InDoubt status for a local transaction that we are going to commit.
+ * This step is needed for consistency between local snapshots and csn-based
+ * snapshots: the CSN is written to the SLRU without holding ProcArrayLock,
+ * so InDoubt is set before the transaction leaves the ProcArray, and readers
+ * who look up the csn in the gap between ProcArray removal and CSN
+ * assignment can wait until it is finally assigned (see
+ * TransactionIdGetCSN()).
+ *
+ * This should be called only from the parallel group leader, before the
+ * backend is deleted from the ProcArray.
+ */
+void
+CSNSnapshotPrecommit(PGPROC *proc, TransactionId xid,
+					 int nsubxids, TransactionId *subxids)
+{
+	CSN			expected = InProgressCSN;
+
+	if (!get_csnlog_status())
+		return;
+
+	/*
+	 * Atomically switch assignedCSN from InProgress to InDoubt.  If that
+	 * fails, a normal CSN was already assigned explicitly and the CSN log
+	 * must already show InDoubt for xid.
+	 */
+	if (!pg_atomic_compare_exchange_u64(&proc->assignedCSN, &expected,
+										InDoubtCSN))
+	{
+		Assert(CSNIsNormal(expected));
+		Assert(CSNIsInDoubt(CSNLogGetCSNByXid(xid)));
+		return;
+	}
+
+	Assert(CSNIsInProgress(expected));
+	CSNLogSetCSN(xid, nsubxids, subxids, InDoubtCSN, true);
+}
+
+/*
+ * CSNSnapshotCommit
+ *
+ * Write the CSN acquired earlier (proc->assignedCSN) to the CSN log and reset
+ * it.  Should be preceded by CSNSnapshotPrecommit() so readers can wait until
+ * we finish writing to the SLRU.
+ *
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks, so that TransactionIdGetCSN can wait on this lock for
+ * the CSN.
+ */
+void
+CSNSnapshotCommit(PGPROC *proc, TransactionId xid,
+				  int nsubxids, TransactionId *subxids)
+{
+	volatile CSN assignedCSN;
+
+	if (!get_csnlog_status())
+		return;
+
+	assignedCSN = pg_atomic_read_u64(&proc->assignedCSN);
+
+	/* A transaction without an xid has no CSN to record. */
+	if (!TransactionIdIsValid(xid))
+	{
+		Assert(CSNIsInProgress(assignedCSN));
+		return;
+	}
+
+	/* Finally write the resulting CSN to the SLRU ... */
+	Assert(CSNIsNormal(assignedCSN));
+	CSNLogSetCSN(xid, nsubxids, subxids, assignedCSN, true);
+
+	/* ... and reset it for the next transaction. */
+	pg_atomic_write_u64(&proc->assignedCSN, InProgressCSN);
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 58091f6b52..b86c172e46 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -10,6 +10,7 @@
#include "access/brin_xlog.h"
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/csn_log.h"
#include "access/generic_xlog.h"
#include "access/ginxlog.h"
#include "access/gistxlog.h"
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 28b153abc3..7bc6aae9a4 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,7 @@
#include <unistd.h>
#include "access/commit_ts.h"
+#include "access/csn_snapshot.h"
#include "access/htup_details.h"
#include "access/subtrans.h"
#include "access/transam.h"
@@ -1536,8 +1537,34 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
hdr->nabortrels, abortrels,
gid);
+ /*
+ * CSNSnapshot callbacks that should be called right before we are
+ * going to become visible. Details in comments to this functions.
+ */
+ if (isCommit)
+ CSNSnapshotPrecommit(proc, xid, hdr->nsubxacts, children);
+ else
+ CSNSnapshotAbort(proc, xid, hdr->nsubxacts, children);
+
+
ProcArrayRemove(proc, latestXid);
+ /*
+ * Stamp our transaction with CSN in CSNLog.
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks, since TransactionIdGetCSN relies on
+ * XactLockTableWait to await csn.
+ */
+ if (isCommit)
+ {
+ CSNSnapshotCommit(proc, xid, hdr->nsubxacts, children);
+ }
+ else
+ {
+ Assert(CSNIsInProgress(
+ pg_atomic_read_u64(&proc->assignedCSN)));
+ }
+
/*
* In case we fail while running the callbacks, mark the gxact invalid so
* no one else will try to commit/rollback, and so it will be recycled if
@@ -2583,3 +2610,130 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
LWLockRelease(TwoPhaseStateLock);
return found;
}
+
+/*
+ * CSNSnapshotPrepareTwophase
+ *
+ * Set the InDoubt state in the CSN log for the prepared transaction
+ * identified by gid (and its subtransactions), and return a freshly
+ * generated CSN to be used as the commit's global snapshot.
+ */
+static SnapshotCSN
+CSNSnapshotPrepareTwophase(const char *gid)
+{
+	GlobalTransaction gxact;
+	PGPROC	   *proc;
+	char	   *buf;
+	TransactionId xid;
+	xl_xact_parsed_prepare parsed;
+
+	if (!enable_csn_snapshot)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not prepare transaction for global commit"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "enable_csn_snapshot")));
+
+	/*
+	 * Validate the GID, and lock the GXACT to ensure that two backends do not
+	 * try to access the same GID at once.
+	 */
+	gxact = LockGXact(gid, GetUserId());
+	proc = &ProcGlobal->allProcs[gxact->pgprocno];
+	xid = proc->xid;
+
+	/*
+	 * The state file of a prepared transaction must exist, so do not pass
+	 * missing_ok: a NULL result would be dereferenced by ParsePrepareRecord()
+	 * and pfree() below.  This matches FinishPreparedTransaction().
+	 */
+	if (gxact->ondisk)
+		buf = ReadTwoPhaseFile(xid, false);
+	else
+		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+
+	ParsePrepareRecord(0, (xl_xact_prepare *) buf, &parsed);
+
+	CSNLogSetCSN(xid, parsed.nsubxacts,
+				 parsed.subxacts, InDoubtCSN, true);
+
+	/* Unlock our GXACT */
+	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+	gxact->locking_backend = InvalidBackendId;
+	LWLockRelease(TwoPhaseStateLock);
+
+	pfree(buf);
+	return GenerateCSN(false, InvalidCSN);
+}
+
+/*
+ * CSNSnapshotAssignTwoPhase
+ *
+ * Assign csn to the prepared transaction identified by gid.  csn is
+ * supposedly the maximum of the values returned by CSNSnapshotPrepareCurrent
+ * and pg_csn_snapshot_prepare.
+ *
+ * This is the twophase counterpart of CSNSnapshotAssignCurrent().
+ */
+static void
+CSNSnapshotAssignTwoPhase(const char *gid, SnapshotCSN csn)
+{
+	GlobalTransaction gxact;
+	PGPROC	   *target;
+
+	if (!enable_csn_snapshot)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not prepare transaction for global commit"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "enable_csn_snapshot")));
+
+	if (!CSNIsNormal(csn))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pg_csn_snapshot_assign expects normal snapshot_csn")));
+
+	/* Validate the GID and lock the GXACT against concurrent access. */
+	gxact = LockGXact(gid, GetUserId());
+	target = &ProcGlobal->allProcs[gxact->pgprocno];
+
+	Assert(csn != InvalidCSN);
+
+	/* Only the side effect matters: csnShared->last_max_csn becomes >= csn. */
+	(void) GenerateCSN(false, csn);
+
+	/* Store the csn; this keeps ProcArrayRemove from assigning one. */
+	pg_atomic_write_u64(&target->assignedCSN, csn);
+
+	/* Unlock our GXACT */
+	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+	gxact->locking_backend = InvalidBackendId;
+	LWLockRelease(TwoPhaseStateLock);
+}
+
+/*
+ * pg_csn_snapshot_prepare
+ *
+ * SQL interface to CSNSnapshotPrepareTwophase(): takes the gid of a prepared
+ * transaction and returns the generated CSN as bigint.
+ *
+ * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT
+ */
+Datum
+pg_csn_snapshot_prepare(PG_FUNCTION_ARGS)
+{
+	text	   *gid_text = PG_GETARG_TEXT_PP(0);
+
+	PG_RETURN_INT64(CSNSnapshotPrepareTwophase(text_to_cstring(gid_text)));
+}
+
+/*
+ * pg_csn_snapshot_assign
+ *
+ * SQL interface to CSNSnapshotAssignTwoPhase(): takes the gid of a prepared
+ * transaction and the CSN (bigint) to assign to it.
+ *
+ * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'csn'
+ */
+Datum
+pg_csn_snapshot_assign(PG_FUNCTION_ARGS)
+{
+	char	   *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	int64		csn_arg = PG_GETARG_INT64(1);
+
+	CSNSnapshotAssignTwoPhase(gid, (SnapshotCSN) csn_arg);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index a6e98e71bd..8e1d074806 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -15,6 +15,7 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/csn_log.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
@@ -175,6 +176,7 @@ GetNewTransactionId(bool isSubXact)
* Extend pg_subtrans and pg_commit_ts too.
*/
ExtendCLOG(xid);
+ ExtendCSNLog(xid);
ExtendCommitTs(xid);
ExtendSUBTRANS(xid);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8e35c432f5..e6baf880d9 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -21,6 +21,7 @@
#include <unistd.h>
#include "access/commit_ts.h"
+#include "access/csn_snapshot.h"
#include "access/multixact.h"
#include "access/parallel.h"
#include "access/subtrans.h"
@@ -1418,6 +1419,12 @@ RecordTransactionCommit(void)
TransactionTreeSetCommitTsData(xid, nchildren, children,
replorigin_session_origin_timestamp,
replorigin_session_origin);
+
+ /*
+ * Mark our transaction as InDoubt in CsnLog and get ready for
+ * commit.
+ */
+ CSNSnapshotPrecommit(MyProc, xid, nchildren, children);
}
/*
@@ -1772,6 +1779,9 @@ RecordTransactionAbort(bool isSubXact)
*/
TransactionIdAbortTree(xid, nchildren, children);
+ /* Mark our transaction as Aborted in CSN Log. */
+ CSNSnapshotAbort(MyProc, xid, nchildren, children);
+
END_CRIT_SECTION();
/* Compute latestXid while we have the child XIDs handy */
@@ -2114,6 +2124,13 @@ StartTransaction(void)
ShowTransactionState("StartTransaction");
}
+/*
+ * pg_current_csn
+ *
+ * SQL-callable: generate and return a fresh CSN as bigint.
+ * NOTE(review): GenerateCSN() asserts that the CSN log is enabled or
+ * csn_snapshot_defer_time > 0; consider an explicit error when neither holds.
+ */
+Datum
+pg_current_csn(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_INT64(GenerateCSN(false, InvalidCSN));
+}
/*
* CommitTransaction
@@ -2262,6 +2279,21 @@ CommitTransaction(void)
*/
ProcArrayEndTransaction(MyProc, latestXid);
+ /*
+ * Stamp our transaction with CSN in CsnLog.
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks.
+ */
+ if (!is_parallel_worker)
+ {
+ TransactionId xid = GetTopTransactionIdIfAny();
+ TransactionId *subxids;
+ int nsubxids;
+
+ nsubxids = xactGetCommittedChildren(&subxids);
+ CSNSnapshotCommit(MyProc, xid, nsubxids, subxids);
+ }
+
/*
* This is all post-commit cleanup. Note that if an error is raised here,
* it's too late to abort the transaction. This should be just
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1616448368..2a8de10038 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -24,6 +24,7 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/csn_log.h"
#include "access/heaptoast.h"
#include "access/multixact.h"
#include "access/rewriteheap.h"
@@ -4747,6 +4748,7 @@ InitControlFile(uint64 sysidentifier)
ControlFile->wal_level = wal_level;
ControlFile->wal_log_hints = wal_log_hints;
ControlFile->track_commit_timestamp = track_commit_timestamp;
+ ControlFile->enable_csn_snapshot = enable_csn_snapshot;
ControlFile->data_checksum_version = bootstrap_data_checksum_version;
}
@@ -7181,6 +7183,9 @@ StartupXLOG(void)
if (ControlFile->track_commit_timestamp)
StartupCommitTs();
+ if(ControlFile->enable_csn_snapshot)
+ StartupCSN();
+
/*
* Recover knowledge about replay progress of known replication partners.
*/
@@ -7448,6 +7453,8 @@ StartupXLOG(void)
*/
StartupSUBTRANS(oldestActiveXID);
+ CSNSnapshotStartup(oldestActiveXID);
+
/*
* If we're beginning at a shutdown checkpoint, we know that
* nothing was running on the primary at this point. So fake-up an
@@ -8117,7 +8124,10 @@ StartupXLOG(void)
* timestamps are started below, if necessary.)
*/
if (standbyState == STANDBY_DISABLED)
+ {
StartupSUBTRANS(oldestActiveXID);
+ CSNSnapshotStartup(oldestActiveXID);
+ }
/*
* Perform end of recovery actions for any SLRUs that need it.
@@ -8183,6 +8193,7 @@ StartupXLOG(void)
* commit timestamp.
*/
CompleteCommitTsInitialization();
+ CompleteCSNInitialization();
/*
* All done with end-of-recovery actions.
@@ -9616,6 +9627,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
CheckPointCLOG();
+ CheckPointCSNLog();
CheckPointCommitTs();
CheckPointSUBTRANS();
CheckPointMultiXact();
@@ -9894,7 +9906,10 @@ CreateRestartPoint(int flags)
* this because StartupSUBTRANS hasn't been called yet.
*/
if (EnableHotStandby)
+ {
TruncateSUBTRANS(GetOldestTransactionIdConsideredRunning());
+ TruncateCSNLog(GetOldestTransactionIdConsideredRunning());
+ }
/* Real work is done; log and update stats. */
LogCheckpointEnd(true);
@@ -10172,7 +10187,8 @@ XLogReportParameters(void)
max_wal_senders != ControlFile->max_wal_senders ||
max_prepared_xacts != ControlFile->max_prepared_xacts ||
max_locks_per_xact != ControlFile->max_locks_per_xact ||
- track_commit_timestamp != ControlFile->track_commit_timestamp)
+ track_commit_timestamp != ControlFile->track_commit_timestamp ||
+ enable_csn_snapshot != ControlFile->enable_csn_snapshot)
{
/*
* The change in number of backend slots doesn't need to be WAL-logged
@@ -10194,6 +10210,7 @@ XLogReportParameters(void)
xlrec.wal_level = wal_level;
xlrec.wal_log_hints = wal_log_hints;
xlrec.track_commit_timestamp = track_commit_timestamp;
+ xlrec.enable_csn_snapshot = enable_csn_snapshot;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, sizeof(xlrec));
@@ -10212,6 +10229,7 @@ XLogReportParameters(void)
ControlFile->wal_level = wal_level;
ControlFile->wal_log_hints = wal_log_hints;
ControlFile->track_commit_timestamp = track_commit_timestamp;
+ ControlFile->enable_csn_snapshot = enable_csn_snapshot;
UpdateControlFile();
LWLockRelease(ControlFileLock);
@@ -10665,6 +10683,9 @@ xlog_redo(XLogReaderState *record)
CommitTsParameterChange(xlrec.track_commit_timestamp,
ControlFile->track_commit_timestamp);
ControlFile->track_commit_timestamp = xlrec.track_commit_timestamp;
+ CSNlogParameterChange(xlrec.enable_csn_snapshot,
+ ControlFile->enable_csn_snapshot);
+ ControlFile->enable_csn_snapshot = xlrec.enable_csn_snapshot;
UpdateControlFile();
LWLockRelease(ControlFileLock);
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 689384a411..e6585a94ba 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -73,6 +73,8 @@ typedef struct
char compressed_page[COMPRESS_BUFSIZE];
} registered_buffer;
+bool enable_csn_wal = true;
+
static registered_buffer *registered_buffers;
static int max_registered_buffers; /* allocated size */
static int max_registered_block_id = 0; /* highest block_id + 1 currently
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 5c4bc15b44..e64ada86c7 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -53,7 +53,7 @@
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
-
+#include "access/csn_log.h"
/*
* GUC parameters
@@ -1760,6 +1760,7 @@ vac_truncate_clog(TransactionId frozenXID,
*/
TruncateCLOG(frozenXID, oldestxid_datoid);
TruncateCommitTs(frozenXID);
+ TruncateCSNLog(frozenXID);
TruncateMultiXact(minMulti, minmulti_datoid);
/*
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 9fa3e0631e..2a7e184da9 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -16,6 +16,8 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/csn_log.h"
+#include "access/csn_snapshot.h"
#include "access/heapam.h"
#include "access/multixact.h"
#include "access/nbtree.h"
@@ -120,6 +122,8 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, ProcGlobalShmemSize());
size = add_size(size, XLOGShmemSize());
size = add_size(size, CLOGShmemSize());
+ size = add_size(size, CSNLogShmemSize());
+ size = add_size(size, CSNSnapshotShmemSize());
size = add_size(size, CommitTsShmemSize());
size = add_size(size, SUBTRANSShmemSize());
size = add_size(size, TwoPhaseShmemSize());
@@ -242,6 +246,8 @@ CreateSharedMemoryAndSemaphores(void)
*/
XLOGShmemInit();
CLOGShmemInit();
+ CSNLogShmemInit();
+ CSNSnapshotShmemInit();
CommitTsShmemInit();
SUBTRANSShmemInit();
MultiXactShmemInit();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 892f0f6799..5bc7370c73 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -48,6 +48,7 @@
#include <signal.h>
#include "access/clog.h"
+#include "access/csn_snapshot.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
@@ -96,6 +97,8 @@ typedef struct ProcArrayStruct
TransactionId replication_slot_xmin;
/* oldest catalog xmin of any replication slot */
TransactionId replication_slot_catalog_xmin;
+ /* xmin of oldest active csn snapshot */
+ TransactionId csn_snapshot_xmin;
/* indexes into allProcs[], has PROCARRAY_MAXPROCS entries */
int pgprocnos[FLEXIBLE_ARRAY_MEMBER];
@@ -429,6 +432,7 @@ CreateSharedProcArray(void)
procArray->lastOverflowedXid = InvalidTransactionId;
procArray->replication_slot_xmin = InvalidTransactionId;
procArray->replication_slot_catalog_xmin = InvalidTransactionId;
+ procArray->csn_snapshot_xmin = InvalidTransactionId;
ShmemVariableCache->xactCompletionCount = 1;
}
@@ -577,6 +581,14 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
/* Advance global latestCompletedXid while holding the lock */
MaintainLatestCompletedXid(latestXid);
+ /*
+ * Assign the xid's csn while holding ProcArrayLock for a non-distributed
+ * COMMIT PREPARED. After the lock is released, the subsequent
+ * CSNSnapshotCommit() call will write this value to the CSN log.
+ */
+ if (CSNIsInDoubt(pg_atomic_read_u64(&proc->assignedCSN)))
+ pg_atomic_write_u64(&proc->assignedCSN, GenerateCSN(false, InvalidCSN));
+
/* Same with xactCompletionCount */
ShmemVariableCache->xactCompletionCount++;
@@ -691,6 +703,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
proc->xmin = InvalidTransactionId;
proc->delayChkpt = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false;
+ proc->originalXmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */
/* avoid unnecessarily dirtying shared cachelines */
@@ -730,6 +743,7 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
proc->xmin = InvalidTransactionId;
proc->delayChkpt = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false;
+ proc->originalXmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */
/* avoid unnecessarily dirtying shared cachelines */
@@ -753,6 +767,16 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
/* Also advance global latestCompletedXid while holding the lock */
MaintainLatestCompletedXid(latestXid);
+ /*
+ * Assign xid csn while holding ProcArrayLock for
+ * COMMIT.
+ *
+ * TODO: in case of group commit we can generate one CSNSnapshot for
+ * whole group to save time on timestamp acquisition.
+ */
+ if (CSNIsInDoubt(pg_atomic_read_u64(&proc->assignedCSN)))
+ pg_atomic_write_u64(&proc->assignedCSN, GenerateCSN(false, InvalidCSN));
+
/* Same with xactCompletionCount */
ShmemVariableCache->xactCompletionCount++;
}
@@ -912,6 +936,7 @@ ProcArrayClearTransaction(PGPROC *proc)
proc->lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
proc->recoveryConflictPending = false;
+ proc->originalXmin = InvalidTransactionId;
Assert(!(proc->statusFlags & PROC_VACUUM_STATE_MASK));
Assert(!proc->delayChkpt);
@@ -1204,6 +1229,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
{
ExtendSUBTRANS(latestObservedXid);
+ ExtendCSNLog(latestObservedXid);
TransactionIdAdvance(latestObservedXid);
}
TransactionIdRetreat(latestObservedXid); /* = running->nextXid - 1 */
@@ -1704,6 +1730,7 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
TransactionId kaxmin;
bool in_recovery = RecoveryInProgress();
TransactionId *other_xids = ProcGlobal->xids;
+ TransactionId csn_snapshot_xmin = InvalidTransactionId;
LWLockAcquire(ProcArrayLock, LW_SHARED);
@@ -1843,6 +1870,10 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
if (in_recovery)
kaxmin = KnownAssignedXidsGetOldestXmin();
+ /* Get value of xmin, delayed by a CSN snapshot settings. */
+ if (get_csnlog_status() && csn_snapshot_defer_time > 0 && IsUnderPostmaster)
+ csn_snapshot_xmin = ProcArrayGetCSNSnapshotXmin();
+
/*
* No other information from shared state is needed, release the lock
* immediately. The rest of the computations can be done without a lock.
@@ -1899,6 +1930,15 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
h->data_oldest_nonremovable =
TransactionIdOlder(h->data_oldest_nonremovable, h->slot_xmin);
+ /*
+ * Hold non-removable border because distributed transactions
+ * can wish to see old data.
+ */
+ h->shared_oldest_nonremovable =
+ TransactionIdOlder(h->shared_oldest_nonremovable, csn_snapshot_xmin);
+ h->data_oldest_nonremovable =
+ TransactionIdOlder(h->data_oldest_nonremovable, csn_snapshot_xmin);
+
/*
* The only difference between catalog / data horizons is that the slot's
* catalog xmin is applied to the catalog one (so catalogs can be accessed
@@ -2133,6 +2173,9 @@ GetSnapshotDataReuse(Snapshot snapshot)
if (curXactCompletionCount != snapshot->snapXactCompletionCount)
return false;
+ if (get_csnlog_status())
+ return false;
+
/*
* If the current xactCompletionCount is still the same as it was at the
* time the snapshot was built, we can be sure that rebuilding the
@@ -2212,6 +2255,8 @@ GetSnapshotData(Snapshot snapshot)
int count = 0;
int subcount = 0;
bool suboverflowed = false;
+ CSN csn = FrozenCSN;
+ TransactionId csn_snapshot_xmin = InvalidTransactionId;
FullTransactionId latest_completed;
TransactionId oldestxid;
int mypgxactoff;
@@ -2444,6 +2489,20 @@ GetSnapshotData(Snapshot snapshot)
if (!TransactionIdIsValid(MyProc->xmin))
MyProc->xmin = TransactionXmin = xmin;
+ /* Take CSN under ProcArrayLock so the snapshot stays synchronized. */
+ if (!snapshot->takenDuringRecovery && get_csnlog_status())
+ csn = GenerateCSN(false, InvalidCSN);
+
+ if (get_csnlog_status() && csn_snapshot_defer_time > 0 && IsUnderPostmaster)
+ {
+ CSNSnapshotMapXmin(snapshot->snapshot_csn);
+
+ /* Get value of xmin, delayed by a CSN snapshot settings. */
+ csn_snapshot_xmin = ProcArrayGetCSNSnapshotXmin();
+ /* Adjust an oldest xid value with a xmin, delayed by CSN options. */
+ oldestxid = TransactionIdOlder(oldestxid, csn_snapshot_xmin);
+ }
+
LWLockRelease(ProcArrayLock);
/* maintain state for GlobalVis* */
@@ -2469,6 +2528,10 @@ GetSnapshotData(Snapshot snapshot)
def_vis_xid_data =
TransactionIdOlder(def_vis_xid_data, replication_slot_xmin);
+ /* The csn-related settings can require an older xmin. */
+ def_vis_xid_data =
+ TransactionIdOlder(def_vis_xid_data, csn_snapshot_xmin);
+
/*
* Rows in non-shared, non-catalog tables possibly could be vacuumed
* if older than this xid.
@@ -2549,6 +2612,8 @@ GetSnapshotData(Snapshot snapshot)
snapshot->active_count = 0;
snapshot->regd_count = 0;
snapshot->copied = false;
+ snapshot->imported_csn = false;
+ snapshot->snapshot_csn = csn;
GetSnapshotDataInitOldSnapshot(snapshot);
@@ -3901,6 +3966,25 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
LWLockRelease(ProcArrayLock);
}
+/*
+ * ProcArraySetCSNSnapshotXmin
+ *
+ * Publish, in shared memory, the xmin that CSN snapshots need to keep.
+ * ComputeXidHorizons() and GetSnapshotData() read it back through
+ * ProcArrayGetCSNSnapshotXmin() to hold back the removal horizon.
+ */
+void
+ProcArraySetCSNSnapshotXmin(TransactionId xmin)
+{
+	ProcArrayStruct *arrayP = procArray;
+
+	/* No lock is taken: this relies on TransactionId stores being atomic. */
+	arrayP->csn_snapshot_xmin = xmin;
+}
+
+/*
+ * ProcArrayGetCSNSnapshotXmin
+ *
+ * Return the xmin last published by ProcArraySetCSNSnapshotXmin(), or
+ * InvalidTransactionId if nothing has been set since CreateSharedProcArray().
+ */
+TransactionId
+ProcArrayGetCSNSnapshotXmin(void)
+{
+	ProcArrayStruct *arrayP = procArray;
+
+	/* Lock-free read; pairs with the plain store in the setter. */
+	return arrayP->csn_snapshot_xmin;
+}
+
/*
* XidCacheRemoveRunningXids
*
@@ -4383,6 +4467,7 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
while (TransactionIdPrecedes(next_expected_xid, xid))
{
TransactionIdAdvance(next_expected_xid);
+ ExtendCSNLog(next_expected_xid);
ExtendSUBTRANS(next_expected_xid);
}
Assert(next_expected_xid == xid);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 862097352b..1f78161d9a 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -135,6 +135,8 @@ static const char *const BuiltinTrancheNames[] = {
"CommitTSBuffer",
/* LWTRANCHE_SUBTRANS_BUFFER: */
"SubtransBuffer",
+ /* LWTRANCHE_CSN_LOG_BUFFERS */
+ "CSNLogBuffer",
/* LWTRANCHE_MULTIXACTOFFSET_BUFFER: */
"MultiXactOffsetBuffer",
/* LWTRANCHE_MULTIXACTMEMBER_BUFFER: */
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 6c7cf6c295..e8ca393611 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -53,3 +53,5 @@ XactTruncationLock 44
# 45 was XactTruncationLock until removal of BackendRandomLock
WrapLimitsVacuumLock 46
NotifyQueueTailLock 47
+CSNLogSLRULock 48
+CSNSnapshotXidMapLock 49
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index b7d9da0aa9..88f4f42456 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -35,9 +35,11 @@
#include <unistd.h>
#include <sys/time.h>
+#include "access/csn_snapshot.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xlogutils.h"
+#include "access/xact.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -440,6 +442,9 @@ InitProcess(void)
MyProc->clogGroupMemberLsn = InvalidXLogRecPtr;
Assert(pg_atomic_read_u32(&MyProc->clogGroupNext) == INVALID_PGPROCNO);
+ MyProc->originalXmin = InvalidTransactionId;
+ pg_atomic_init_u64(&MyProc->assignedCSN, InProgressCSN);
+
/*
* Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
* on it. That allows us to repoint the process latch, which so far
@@ -585,6 +590,7 @@ InitAuxiliaryProcess(void)
MyProc->lwWaitMode = 0;
MyProc->waitLock = NULL;
MyProc->waitProcLock = NULL;
+ MyProc->originalXmin = InvalidTransactionId;
pg_atomic_write_u64(&MyProc->waitStart, 0);
#ifdef USE_ASSERT_CHECKING
{
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index d4083e8a56..383f1e4566 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -20,6 +20,7 @@
#include "access/commit_ts.h"
#include "access/clog.h"
+#include "access/csn_log.h"
#include "access/multixact.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
@@ -119,6 +120,10 @@ static const SyncOps syncsw[] = {
/* pg_multixact/members */
[SYNC_HANDLER_MULTIXACT_MEMBER] = {
.sync_syncfiletag = multixactmemberssyncfiletag
+ },
+ /* pg_csn */
+ [SYNC_HANDLER_CSN] = {
+ .sync_syncfiletag = csnsyncfiletag
}
};
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e91d5a3cfd..4d9833fb5f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -34,6 +34,7 @@
#include "access/commit_ts.h"
#include "access/gin.h"
+#include "access/csn_snapshot.h"
#include "access/rmgr.h"
#include "access/tableam.h"
#include "access/toast_compression.h"
@@ -1212,6 +1213,24 @@ static struct config_bool ConfigureNamesBool[] =
false,
NULL, NULL, NULL
},
+ {
+ {"enable_csn_snapshot", PGC_POSTMASTER, RESOURCES_MEM,
+ gettext_noop("Enable csn-base snapshot."),
+ gettext_noop("Used to achieve REPEATABLE READ isolation level for postgres_fdw transactions.")
+ },
+ &enable_csn_snapshot,
+ true,
+ NULL, NULL, NULL
+ },
+ {
+ {"enable_csn_wal", PGC_POSTMASTER, RESOURCES_MEM,
+ gettext_noop("Enable csn-wal record."),
+ gettext_noop("Used to enable csn-wal record")
+ },
+ &enable_csn_wal,
+ true,
+ NULL, NULL, NULL
+ },
{
{"ssl", PGC_SIGHUP, CONN_AUTH_SSL,
gettext_noop("Enables SSL connections."),
@@ -3195,6 +3214,24 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"csn_snapshot_defer_time", PGC_POSTMASTER, REPLICATION_PRIMARY,
+ gettext_noop("Minimal age of records which allowed to be vacuumed, in seconds."),
+ NULL
+ },
+ &csn_snapshot_defer_time,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+ {
+ {"csn_time_shift", PGC_USERSET, RESOURCES_MEM,
+ gettext_noop("Do the time shift in the CSN generator."),
+ gettext_noop("Used for debug purposes.")
+ },
+ &csn_time_shift,
+ 0, INT_MIN, INT_MAX,
+ NULL, NULL, NULL
+ },
{
{"block_size", PGC_INTERNAL, PRESET_OPTIONS,
gettext_noop("Shows the size of a disk block."),
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index b0c50a3c7f..3fcd0f4ccf 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -77,6 +77,8 @@ provider postgresql {
probe clog__checkpoint__done(bool);
probe subtrans__checkpoint__start(bool);
probe subtrans__checkpoint__done(bool);
+ probe csnlog__checkpoint__start(bool);
+ probe csnlog__checkpoint__done(bool);
probe multixact__checkpoint__start(bool);
probe multixact__checkpoint__done(bool);
probe twophase__checkpoint__start();
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 5001efdf7a..eaf2082e41 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -48,6 +48,7 @@
#include <sys/stat.h>
#include <unistd.h>
+#include "access/csn_log.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
@@ -77,6 +78,8 @@
*/
int old_snapshot_threshold; /* number of minutes, -1 disables */
+bool enable_csn_snapshot;
+
volatile OldSnapshotControlData *oldSnapshotControl;
@@ -173,6 +176,7 @@ static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
+static bool XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot);
/*
* Snapshot fields to be serialized.
@@ -191,6 +195,8 @@ typedef struct SerializedSnapshotData
CommandId curcid;
TimestampTz whenTaken;
XLogRecPtr lsn;
+ CSN csn;
+ bool imported_csn;
} SerializedSnapshotData;
Size
@@ -2130,6 +2136,8 @@ SerializeSnapshot(Snapshot snapshot, char *start_address)
serialized_snapshot.curcid = snapshot->curcid;
serialized_snapshot.whenTaken = snapshot->whenTaken;
serialized_snapshot.lsn = snapshot->lsn;
+ serialized_snapshot.csn = snapshot->snapshot_csn;
+ serialized_snapshot.imported_csn = snapshot->imported_csn;
/*
* Ignore the SubXID array if it has overflowed, unless the snapshot was
@@ -2204,6 +2212,8 @@ RestoreSnapshot(char *start_address)
snapshot->curcid = serialized_snapshot.curcid;
snapshot->whenTaken = serialized_snapshot.whenTaken;
snapshot->lsn = serialized_snapshot.lsn;
+ snapshot->snapshot_csn = serialized_snapshot.csn;
+ snapshot->imported_csn = serialized_snapshot.imported_csn;
snapshot->snapXactCompletionCount = 0;
/* Copy XIDs, if present. */
@@ -2245,6 +2255,44 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
/*
* XidInMVCCSnapshot
+ *
+ * Is the given XID still in progress according to the snapshot?
+ *
+ * A snapshot carrying an imported CSN is judged by its CSN alone.  Otherwise
+ * the ordinary local check, XidInLocalMVCCSnapshot(), runs first.  When the
+ * CSN log is active, an XID that the local check reports as running is
+ * rechecked with XidInCSNSnapshot(), because it may already be in an
+ * unknown state.  When the CSN log is not active, the result is simply that
+ * of XidInLocalMVCCSnapshot().
+ */
+bool
+XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	bool		running_locally;
+
+	if (snapshot->imported_csn)
+	{
+		/* Nothing but the CSN is meaningful in an imported snapshot. */
+		Assert(enable_csn_snapshot);
+		return XidInCSNSnapshot(xid, snapshot);
+	}
+
+	running_locally = XidInLocalMVCCSnapshot(xid, snapshot);
+
+	if (get_csnlog_status())
+	{
+		/*
+		 * An XID that is not running per the local snapshot needs no further
+		 * check.  One that is running may already be in an unknown state, in
+		 * which case XidInCSNSnapshot() must wait and recheck.
+		 */
+		return running_locally && XidInCSNSnapshot(xid, snapshot);
+	}
+
+	/* Without the CSN log, GetSnapshotData() never assigns a real CSN. */
+	Assert(CSNIsFrozen(snapshot->snapshot_csn));
+	return running_locally;
+}
+
+/*
+ * XidInLocalMVCCSnapshot
* Is the given XID still-in-progress according to the snapshot?
*
* Note: GetSnapshotData never stores either top xid or subxids of our own
@@ -2253,8 +2301,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
* TransactionIdIsCurrentTransactionId first, except when it's known the
* XID could not be ours anyway.
*/
-bool
-XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+static bool
+XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot)
{
uint32 i;
@@ -2364,3 +2412,100 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
return false;
}
+
+
+/*
+ * ExportCSNSnapshot
+ *
+ * Export snapshot_csn so that the caller can extend this transaction to
+ * other nodes.
+ *
+ * Raises an error if the CSN log is not active, or if there is no current
+ * transaction snapshot to take the CSN from.
+ *
+ * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and
+ * add some additional checks that the transaction has not yet acquired an
+ * xid, but for the current iteration of this patch I don't want to hack on
+ * the parser.
+ */
+SnapshotCSN
+ExportCSNSnapshot(void)
+{
+	if (!get_csnlog_status())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not export csn snapshot"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "enable_csn_snapshot")));
+
+	/*
+	 * Guard against a caller (e.g. C code) that runs before any transaction
+	 * snapshot exists; dereferencing NULL here would crash the backend.
+	 */
+	if (CurrentSnapshot == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not export csn snapshot"),
+				 errdetail("No transaction snapshot has been taken yet.")));
+
+	/*
+	 * CSN is a 64-bit value.  "%lu" is wrong where long is only 32 bits
+	 * (e.g. Windows), so print it through an explicit unsigned long long.
+	 */
+	elog(DEBUG5, "Export CSN Snapshot: csn = %llu",
+		 (unsigned long long) CurrentSnapshot->snapshot_csn);
+	return CurrentSnapshot->snapshot_csn;
+}
+
+/*
+ * pg_csn_snapshot_export
+ *		SQL-callable wrapper: returns ExportCSNSnapshot() as a 64-bit Datum.
+ */
+Datum
+pg_csn_snapshot_export(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_UINT64(ExportCSNSnapshot());
+}
+
+/*
+ * ImportCSNSnapshot
+ *
+ * Import a csn and retract this backend's xmin to the value that was
+ * current when that csn was taken.  Afterwards the current snapshot is
+ * marked imported_csn, so XidInMVCCSnapshot() judges visibility by csn.
+ *
+ * Raises an error if the CSN log is not active, if csn_snapshot_defer_time
+ * is not positive, if there is no current transaction snapshot, or if the
+ * csn is too old to be mapped back to an xmin.
+ *
+ * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and
+ * add some additional checks that the transaction has not yet acquired an
+ * xid, but for the current iteration of this patch I don't want to hack on
+ * the parser.
+ *
+ * NOTE(review): GetSnapshotData() resets imported_csn, so presumably the
+ * import only survives while the transaction keeps reusing CurrentSnapshot
+ * (REPEATABLE READ or stricter) -- confirm.
+ */
+void
+ImportCSNSnapshot(SnapshotCSN snapshot_csn)
+{
+	TransactionId xmin;
+
+	if (!get_csnlog_status())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not import csn snapshot"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "enable_csn_snapshot")));
+
+	if (csn_snapshot_defer_time <= 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not import csn snapshot"),
+				 errhint("Make sure the configuration parameter \"%s\" is positive.",
+						 "csn_snapshot_defer_time")));
+
+	/*
+	 * Check before touching any shared state: CurrentSnapshot is
+	 * dereferenced below, after MyProc->xmin has already been changed.
+	 */
+	if (CurrentSnapshot == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not import csn snapshot"),
+				 errdetail("No transaction snapshot has been taken yet.")));
+
+	/*
+	 * Call CSNSnapshotToXmin under ProcArrayLock to avoid a situation where
+	 * the resulting xmin is evicted from the map before we set it as our
+	 * backend's xmin.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	xmin = CSNSnapshotToXmin(snapshot_csn);
+	if (!TransactionIdIsValid(xmin))
+	{
+		LWLockRelease(ProcArrayLock);
+		/* User-reachable condition: report it with a proper SQLSTATE. */
+		ereport(ERROR,
+				(errcode(ERRCODE_SNAPSHOT_TOO_OLD),
+				 errmsg("CSNSnapshotToXmin: csn snapshot too old")));
+	}
+
+	/* Remember the pre-import xmin before retracting it. */
+	MyProc->originalXmin = MyProc->xmin;
+	MyProc->xmin = TransactionXmin = xmin;
+	LWLockRelease(ProcArrayLock);
+
+	CurrentSnapshot->xmin = xmin;	/* defuse SnapshotResetXmin() */
+	CurrentSnapshot->snapshot_csn = snapshot_csn;
+	CurrentSnapshot->imported_csn = true;
+	CSNSnapshotSync(snapshot_csn);
+}
+
+/*
+ * pg_csn_snapshot_import
+ *		SQL-callable wrapper around ImportCSNSnapshot(); the single 64-bit
+ *		argument is the csn to import.
+ */
+Datum
+pg_csn_snapshot_import(PG_FUNCTION_ARGS)
+{
+	ImportCSNSnapshot(PG_GETARG_UINT64(0));
+	PG_RETURN_VOID();
+}
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 31839c1a19..1864952bd2 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -227,7 +227,8 @@ static const char *const subdirs[] = {
"pg_xact",
"pg_logical",
"pg_logical/snapshots",
- "pg_logical/mappings"
+ "pg_logical/mappings",
+ "pg_csn"
};
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index f911f98d94..325e6a0e2b 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -300,6 +300,8 @@ main(int argc, char *argv[])
ControlFile->max_locks_per_xact);
printf(_("track_commit_timestamp setting: %s\n"),
ControlFile->track_commit_timestamp ? _("on") : _("off"));
+ printf(_("enable_csn_snapshot setting: %s\n"),
+ ControlFile->enable_csn_snapshot ? _("on") : _("off"));
printf(_("Maximum data alignment: %u\n"),
ControlFile->maxAlign);
/* we don't print floatFormat since can't say much useful about it */
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 3628bd74a7..18cf9197cc 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -548,6 +548,11 @@ copy_xact_xlog_xid(void)
check_ok();
}
+ if(old_cluster.controldata.cat_ver > CSN_BASE_SNAPSHOT_ADD_VER)
+ {
+ copy_subdir_files("pg_csn", "pg_csn");
+ }
+
/* now reset the wal archives in the new cluster */
prep_status("Resetting WAL archives");
exec_prog(UTILITY_LOG_FILE, NULL, true, true,
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index ca0795f68f..54f2984387 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -124,6 +124,8 @@ extern char *output_files[];
*/
#define JSONB_FORMAT_CHANGE_CAT_VER 201409291
+#define CSN_BASE_SNAPSHOT_ADD_VER 202002010
+
/*
* Each relation is represented by a relinfo structure.
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 852d8ca4b1..2d280ce940 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -11,6 +11,7 @@
#include "access/brin_xlog.h"
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/csn_log.h"
#include "access/generic_xlog.h"
#include "access/ginxlog.h"
#include "access/gistxlog.h"
diff --git a/src/include/access/csn_log.h b/src/include/access/csn_log.h
new file mode 100644
index 0000000000..12df028bf4
--- /dev/null
+++ b/src/include/access/csn_log.h
@@ -0,0 +1,126 @@
+/*-------------------------------------------------------------------------
+ *
+ * csn_log.h
+ *	  Commit-Sequence-Number (CSN) log.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/csn_log.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CSNLOG_H
+#define CSNLOG_H
+
+#include "access/xlog.h"
+#include "utils/snapshot.h"
+#include "storage/sync.h"
+
+
+/*
+ * Special CSN values.  Any value >= FirstNormalCSN is a normal CSN (see
+ * CSNIsNormal()).
+ *
+ * InProgressCSN is the initial value of PGPROC->assignedCSN.  FrozenCSN is
+ * the snapshot_csn of a snapshot for which no CSN was generated (CSN log
+ * not active, or snapshot taken during recovery).  An assignedCSN equal to
+ * InDoubtCSN is replaced by a freshly generated CSN, under ProcArrayLock,
+ * when the transaction leaves the proc array.
+ */
+#define InProgressCSN	UINT64CONST(0x0)
+#define AbortedCSN		UINT64CONST(0x1)
+#define FrozenCSN		UINT64CONST(0x2)
+#define InDoubtCSN		UINT64CONST(0x3)
+#define UnclearCSN		UINT64CONST(0x4)
+#define FirstNormalCSN	UINT64CONST(0x5)
+
+#define CSNIsInProgress(csn)	((csn) == InProgressCSN)
+#define CSNIsAborted(csn)		((csn) == AbortedCSN)
+#define CSNIsFrozen(csn)		((csn) == FrozenCSN)
+#define CSNIsInDoubt(csn)		((csn) == InDoubtCSN)
+#define CSNIsUnclear(csn)		((csn) == UnclearCSN)
+#define CSNIsNormal(csn)		((csn) >= FirstNormalCSN)
+
+/* XLOG info codes of the CSN resource manager */
+#define XLOG_CSN_ASSIGNMENT		0x00
+#define XLOG_CSN_SETCSN			0x10
+#define XLOG_CSN_ZEROPAGE		0x20
+#define XLOG_CSN_TRUNCATE		0x30
+
+/*
+ * The maximum generated CSN must be WAL-logged so that, after a restart, the
+ * database never generates a CSN older than one it already handed out.  That
+ * could otherwise happen if the system clock was turned back.
+ *
+ * Logging every generated CSN would produce too much WAL, so instead a value
+ * CSN_ASSIGN_TIME_INTERVAL seconds ahead of the current one is logged.
+ *
+ * The trade-off is that, right after a restart, time synchronization among
+ * sharding nodes may perform badly for about that many seconds.
+ *
+ * This could become a configuration parameter so that users can pick the
+ * trade-off they prefer.
+ */
+#define CSN_ASSIGN_TIME_INTERVAL 5
+
+/* WAL record data: a CSN plus a top-level XID and its subtransaction XIDs */
+typedef struct xl_csn_set
+{
+	CSN			csn;
+	TransactionId xtop;			/* XID's top-level XID */
+	int			nsubxacts;		/* number of subtransaction XIDs */
+	TransactionId xsub[FLEXIBLE_ARRAY_MEMBER];	/* assigned subxids */
+} xl_csn_set;
+
+#define MinSizeOfCSNSet offsetof(xl_csn_set, xsub)
+
+/*
+ * Advance a CSN by the given number of seconds.  The multiplier is the number
+ * of nanoseconds in a second, so this presumes nanosecond-based CSNs.  The
+ * constant is explicitly 64-bit: with a plain "L" suffix the product would
+ * overflow where long is 32 bits wide (5 seconds is already 5 * 10^9 ns).
+ */
+#define CSNAddByNanosec(csn,second) ((csn) + (second) * INT64CONST(1000000000))
+
+/* Main functions */
+extern void CSNLogSetCSN(TransactionId xid, int nsubxids,
+						 TransactionId *subxids, CSN csn, bool write_xlog);
+extern CSN CSNLogGetCSNByXid(TransactionId xid);
+
+/* Infrastructure functions */
+extern Size CSNLogShmemSize(void);
+extern void CSNLogShmemInit(void);
+extern void ActivateCSNlog(void);
+extern void ExtendCSNLog(TransactionId newestXact);
+extern void DeactivateCSNlog(void);
+
+/* Called from checkpoint, restartpoint and vacuum code */
+extern void CheckPointCSNLog(void);
+extern void TruncateCSNLog(TransactionId oldestXact);
+
+/* Resource manager callbacks (registered in rmgrlist.h) */
+extern void csnlog_redo(XLogReaderState *record);
+extern void csnlog_desc(StringInfo buf, XLogReaderState *record);
+extern const char *csnlog_identify(uint8 info);
+
+extern void WriteAssignCSNXlogRec(CSN csn);
+extern void CatchCSNLog(void);
+
+/* Called by xlog.c at startup, end of recovery and on parameter change */
+extern void StartupCSN(void);
+extern void CompleteCSNInitialization(void);
+extern void CSNlogParameterChange(bool newvalue, bool oldvalue);
+
+/* Checked by procarray.c and snapmgr.c before doing CSN-related work */
+extern bool get_csnlog_status(void);
+
+/* Sync handler for SYNC_HANDLER_CSN requests (see sync.c) */
+extern int	csnsyncfiletag(const FileTag *ftag, char *path);
+
+extern CSN GenerateCSN(bool locked, CSN assign);
+extern CSN GetLastGeneratedCSN(void);
+
+extern TransactionId GetOldestXmin(void);
+
+#endif							/* CSNLOG_H */
diff --git a/src/include/access/csn_snapshot.h b/src/include/access/csn_snapshot.h
new file mode 100644
index 0000000000..916603af0c
--- /dev/null
+++ b/src/include/access/csn_snapshot.h
@@ -0,0 +1,54 @@
+/*-------------------------------------------------------------------------
+ *
+ * csn_snapshot.h
+ * Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/csn_snapshot.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CSN_SNAPSHOT_H
+#define CSN_SNAPSHOT_H
+
+#include "access/csn_log.h"
+#include "port/atomics.h"
+#include "storage/lock.h"
+#include "utils/snapshot.h"
+#include "utils/guc.h"
+
+/*
+ * snapshot.h is also used by frontend code, so the atomic variant of the
+ * SnapshotCSN type is defined here instead.
+ */
+typedef pg_atomic_uint64 CSN_atomic;
+
+/* GUC variables; their descriptions live in guc.c */
+extern int csn_snapshot_defer_time;	/* seconds; > 0 holds back the vacuum horizon */
+extern int csn_time_shift;	/* debug-only shift applied by the CSN generator */
+
+/* Shared memory setup and startup (called from ipci.c and xlog.c) */
+extern Size CSNSnapshotShmemSize(void);
+extern void CSNSnapshotShmemInit(void);
+extern void CSNSnapshotStartup(TransactionId oldestActiveXID);
+
+extern void CSNSnapshotMapXmin(SnapshotCSN snapshot_csn);
+extern TransactionId CSNSnapshotToXmin(SnapshotCSN snapshot_csn);	/* invalid if csn too old */
+
+extern bool XidInCSNSnapshot(TransactionId xid, Snapshot snapshot);
+
+extern CSN TransactionIdGetCSN(TransactionId xid);
+
+extern void CSNSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids,
+ TransactionId *subxids);
+extern void CSNSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids,
+ TransactionId *subxids);
+extern void CSNSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids,
+ TransactionId *subxids);	/* writes proc->assignedCSN to the CSN log */
+extern void CSNSnapshotAssignCurrent(SnapshotCSN snapshot_csn);
+extern SnapshotCSN CSNSnapshotPrepareCurrent(void);
+extern void CSNSnapshotSync(SnapshotCSN remote_csn);
+
+#endif /* CSN_SNAPSHOT_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index f582cf535f..3cf0775176 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+PG_RMGR(RM_CSNLOG_ID, "CSN", csnlog_redo, csnlog_desc, csnlog_identify, NULL, NULL, NULL)
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index c0da76cab4..2ee489dcad 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -242,6 +242,7 @@ typedef struct xl_parameter_change
int wal_level;
bool wal_log_hints;
bool track_commit_timestamp;
+ bool enable_csn_snapshot;
} xl_parameter_change;
/* logs restore point */
@@ -332,5 +333,6 @@ extern bool ArchiveRecoveryRequested;
extern bool InArchiveRecovery;
extern bool StandbyMode;
extern char *recoveryRestoreCommand;
+extern bool enable_csn_wal;
#endif /* XLOG_INTERNAL_H */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 749bce0cc6..a7da532f3a 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -183,6 +183,7 @@ typedef struct ControlFileData
int max_prepared_xacts;
int max_locks_per_xact;
bool track_commit_timestamp;
+ bool enable_csn_snapshot;
/*
* This data is used to check for hardware-architecture compatibility of
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d068d6532e..d578aceb40 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11689,4 +11689,21 @@
prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
prosrc => 'brin_minmax_multi_summary_send' },
+# CSN snapshot handling (manual OIDs must be below 10000; verify with unused_oids)
+{ oid => '8920', descr => 'export csn snapshot',
+ proname => 'pg_csn_snapshot_export', provolatile => 'v', proparallel => 'u',
+ prorettype => 'int8', proargtypes => '', prosrc => 'pg_csn_snapshot_export' },
+{ oid => '8921', descr => 'import csn snapshot',
+ proname => 'pg_csn_snapshot_import', provolatile => 'v', proparallel => 'u',
+ prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_csn_snapshot_import' },
+{ oid => '8922', descr => 'prepare distributed transaction for commit, get csn',
+ proname => 'pg_csn_snapshot_prepare', provolatile => 'v', proparallel => 'u',
+ prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_csn_snapshot_prepare' },
+{ oid => '8923', descr => 'assign csn to distributed transaction',
+ proname => 'pg_csn_snapshot_assign', provolatile => 'v', proparallel => 'u',
+ prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_csn_snapshot_assign' },
+{ oid => '8924', descr => 'get current CSN',
+ proname => 'pg_current_csn', provolatile => 'v', proparallel => 'u',
+ prorettype => 'int8', proargtypes => '', prosrc => 'pg_current_csn' },
+
]
diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h
index 99873497a6..8d1ced7430 100644
--- a/src/include/datatype/timestamp.h
+++ b/src/include/datatype/timestamp.h
@@ -93,6 +93,9 @@ typedef struct
#define USECS_PER_MINUTE INT64CONST(60000000)
#define USECS_PER_SEC INT64CONST(1000000)
+#define NSECS_PER_SEC INT64CONST(1000000000)
+#define NSECS_PER_USEC INT64CONST(1000)
+
/*
* We allow numeric timezone offsets up to 15:59:59 either way from Greenwich.
* Currently, the record holders for wackiest offsets in actual use are zones
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index ab7b85c86e..f08999740b 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -281,6 +281,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum);
#define PG_GETARG_FLOAT4(n) DatumGetFloat4(PG_GETARG_DATUM(n))
#define PG_GETARG_FLOAT8(n) DatumGetFloat8(PG_GETARG_DATUM(n))
#define PG_GETARG_INT64(n) DatumGetInt64(PG_GETARG_DATUM(n))
+#define PG_GETARG_UINT64(n) DatumGetUInt64(PG_GETARG_DATUM(n))
/* use this if you want the raw, possibly-toasted input datum: */
#define PG_GETARG_RAW_VARLENA_P(n) ((struct varlena *) PG_GETARG_POINTER(n))
/* use this if you want the input datum de-toasted: */
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index 39a4f0600e..a78f0d284b 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -141,6 +141,9 @@ typedef struct timespec instr_time;
#define INSTR_TIME_GET_MICROSEC(t) \
(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) ((t).tv_nsec / 1000))
+#define INSTR_TIME_GET_NANOSEC(t) \
+ (((uint64) (t).tv_sec * (uint64) 1000000000) + (uint64) ((t).tv_nsec))
+
#else /* !HAVE_CLOCK_GETTIME */
/* Use gettimeofday() */
@@ -205,6 +208,10 @@ typedef struct timeval instr_time;
#define INSTR_TIME_GET_MICROSEC(t) \
(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) (t).tv_usec)
+#define INSTR_TIME_GET_NANOSEC(t) \
+ (((uint64) (t).tv_sec * (uint64) 1000000000) + \
+ (uint64) (t).tv_usec * (uint64) 1000)
+
#endif /* HAVE_CLOCK_GETTIME */
#else /* WIN32 */
@@ -237,6 +244,9 @@ typedef LARGE_INTEGER instr_time;
#define INSTR_TIME_GET_MICROSEC(t) \
((uint64) (((double) (t).QuadPart * 1000000.0) / GetTimerFrequency()))
+#define INSTR_TIME_GET_NANOSEC(t) \
+ ((uint64) (((double) (t).QuadPart * 1000000000.0) / GetTimerFrequency()))
+
static inline double
GetTimerFrequency(void)
{
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index a8f052e484..65d1e49fb2 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -168,6 +168,7 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_XACT_BUFFER = NUM_INDIVIDUAL_LWLOCKS,
LWTRANCHE_COMMITTS_BUFFER,
LWTRANCHE_SUBTRANS_BUFFER,
+ LWTRANCHE_CSN_LOG_BUFFERS,
LWTRANCHE_MULTIXACTOFFSET_BUFFER,
LWTRANCHE_MULTIXACTMEMBER_BUFFER,
LWTRANCHE_NOTIFY_BUFFER,
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index be67d8a861..ade5d8e169 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -15,12 +15,14 @@
#define _PROC_H_
#include "access/clog.h"
+#include "access/csn_snapshot.h"
#include "access/xlogdefs.h"
#include "lib/ilist.h"
#include "storage/latch.h"
#include "storage/lock.h"
#include "storage/pg_sema.h"
#include "storage/proclist_types.h"
+#include "utils/snapshot.h"
/*
* Each backend advertises up to PGPROC_MAX_CACHED_SUBXIDS TransactionIds
@@ -251,6 +253,18 @@ struct PGPROC
PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */
dlist_head lockGroupMembers; /* list of members, if I'm a leader */
dlist_node lockGroupLink; /* my member link, if I'm a member */
+
+ /*
+ * assignedCSN holds the CSN of this transaction. It is generated under
+ * the ProcArray lock and is later written to the CSN log. The variable
+ * is declared atomic only to cover the group-commit case; in all other
+ * scenarios only the backend that owns this proc entry works with this
+ * variable.
+ */
+ CSN_atomic assignedCSN;
+
+ /* Original xmin of this backend before a CSN snapshot was imported */
+ TransactionId originalXmin;
};
/* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index b01fa52139..ba580435f9 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -20,6 +20,10 @@
#include "utils/snapshot.h"
+#define PROCARRAY_NON_IMPORTED_XMIN 0x80 /* use originalXmin instead
+ * of xmin to properly
+ * maintain csnXidMap */
+
extern Size ProcArrayShmemSize(void);
extern void CreateSharedProcArray(void);
extern void ProcArrayAdd(PGPROC *proc);
@@ -94,4 +98,7 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
TransactionId *catalog_xmin);
+extern void ProcArraySetCSNSnapshotXmin(TransactionId xmin);
+
+extern TransactionId ProcArrayGetCSNSnapshotXmin(void);
#endif /* PROCARRAY_H */
diff --git a/src/include/storage/sync.h b/src/include/storage/sync.h
index 6fd50cfa7b..eb1d52673a 100644
--- a/src/include/storage/sync.h
+++ b/src/include/storage/sync.h
@@ -39,6 +39,7 @@ typedef enum SyncRequestHandler
SYNC_HANDLER_COMMIT_TS,
SYNC_HANDLER_MULTIXACT_OFFSET,
SYNC_HANDLER_MULTIXACT_MEMBER,
+ SYNC_HANDLER_CSN,
SYNC_HANDLER_NONE
} SyncRequestHandler;
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index c6a176cc95..122eea20ba 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -41,10 +41,11 @@
&& !RelationIsAccessibleInLogicalDecoding(rel) \
)
-#define EarlyPruningEnabled(rel) (old_snapshot_threshold >= 0 && RelationAllowsEarlyPruning(rel))
+#define EarlyPruningEnabled(rel) (old_snapshot_threshold >= 0 && !enable_csn_snapshot && RelationAllowsEarlyPruning(rel))
/* GUC variables */
extern PGDLLIMPORT int old_snapshot_threshold;
+extern PGDLLIMPORT bool enable_csn_snapshot;
extern Size SnapMgrShmemSize(void);
@@ -100,7 +101,7 @@ extern PGDLLIMPORT SnapshotData CatalogSnapshotData;
static inline bool
OldSnapshotThresholdActive(void)
{
- return old_snapshot_threshold >= 0;
+ return (old_snapshot_threshold >= 0) && (!enable_csn_snapshot);
}
extern Snapshot GetTransactionSnapshot(void);
@@ -130,6 +131,8 @@ extern void AtSubCommit_Snapshot(int level);
extern void AtSubAbort_Snapshot(int level);
extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin);
+extern SnapshotCSN ExportCSNSnapshot(void);
+extern void ImportCSNSnapshot(SnapshotCSN snapshot_csn);
extern void ImportSnapshot(const char *idstr);
extern bool XactHasExportedSnapshots(void);
extern void DeleteAllExportedSnapshotFiles(void);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 6b60755c53..3580a94c43 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -121,6 +121,9 @@ typedef enum SnapshotType
typedef struct SnapshotData *Snapshot;
#define InvalidSnapshot ((Snapshot) NULL)
+#define InvalidCSN ((CSN) 0)
+typedef uint64 CSN;
+typedef uint64 SnapshotCSN;
/*
* Struct representing all kind of possible snapshots.
@@ -214,6 +217,14 @@ typedef struct SnapshotData
* transactions completed since the last GetSnapshotData().
*/
uint64 snapXactCompletionCount;
+
+ /*
+ * CSN of this snapshot, for snapshot isolation support.
+ * Used only when enable_csn_snapshot is on.
+ */
+ SnapshotCSN snapshot_csn;
+ /* Is snapshot_csn our own, or was it imported from a different node? */
+ bool imported_csn;
} SnapshotData;
#endif /* SNAPSHOT_H */
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index dffc79b2d9..16bb65e7e1 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -7,6 +7,7 @@ include $(top_builddir)/src/Makefile.global
SUBDIRS = \
brin \
commit_ts \
+ csnsnapshot \
delay_execution \
dummy_index_am \
dummy_seclabel \
diff --git a/src/test/modules/csnsnapshot/Makefile b/src/test/modules/csnsnapshot/Makefile
new file mode 100644
index 0000000000..fa4245752b
--- /dev/null
+++ b/src/test/modules/csnsnapshot/Makefile
@@ -0,0 +1,25 @@
+# src/test/modules/csnsnapshot/Makefile
+
+REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/csnsnapshot/csn_snapshot.conf
+NO_INSTALLCHECK = 1
+
+TAP_TESTS = 1
+EXTRA_INSTALL=contrib/postgres_fdw
+
+# Full consistency of distributed commits is not supported for READ COMMITTED
+# transactions, so only the basic visibility test is enabled for now.
+#PROVE_TESTS = t/001_base.pl \
+# t/002_standby.pl \
+# t/003_time_skew.pl
+PROVE_TESTS = t/005_basic_visibility.pl
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/csnsnapshot
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/csnsnapshot/csn_snapshot.conf b/src/test/modules/csnsnapshot/csn_snapshot.conf
new file mode 100644
index 0000000000..e9d3c35756
--- /dev/null
+++ b/src/test/modules/csnsnapshot/csn_snapshot.conf
@@ -0,0 +1 @@
+track_commit_timestamp = on
diff --git a/src/test/modules/csnsnapshot/expected/csnsnapshot.out b/src/test/modules/csnsnapshot/expected/csnsnapshot.out
new file mode 100644
index 0000000000..ac28e417b6
--- /dev/null
+++ b/src/test/modules/csnsnapshot/expected/csnsnapshot.out
@@ -0,0 +1 @@
+create table t1(i int, j int, k varchar);
diff --git a/src/test/modules/csnsnapshot/t/001_base.pl b/src/test/modules/csnsnapshot/t/001_base.pl
new file mode 100644
index 0000000000..3b5a09000b
--- /dev/null
+++ b/src/test/modules/csnsnapshot/t/001_base.pl
@@ -0,0 +1,103 @@
+# Single-node test: CSN snapshot export/import and prepared xacts across restarts
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 5;
+
+my $node = PostgreSQL::Test::Cluster->new('csntest');
+$node->init;
+$node->append_conf('postgresql.conf', qq{
+ enable_csn_snapshot = on
+ csn_snapshot_defer_time = 10
+ max_prepared_transactions = 10
+ });
+$node->start;
+
+# An imported CSN snapshot must not see rows inserted after the export.
+
+# Create a table
+$node->safe_psql('postgres', 'create table t1(i int, j int)');
+
+# insert test record
+$node->safe_psql('postgres', 'insert into t1 values(1,1)');
+# export csn snapshot
+my $test_snapshot = $node->safe_psql('postgres', 'select pg_csn_snapshot_export()');
+# insert test record
+$node->safe_psql('postgres', 'insert into t1 values(2,1)');
+
+my $count1 = $node->safe_psql('postgres', "select count(*) from t1");
+is($count1, '2', 'Get right number in normal query');
+my $count2 = $node->safe_psql('postgres', "
+ begin transaction isolation level repeatable read;
+ select pg_csn_snapshot_import($test_snapshot);
+ select count(*) from t1;
+ commit;"
+ );
+
+is($count2, '
+1', 'Get right number in csn import query');
+
+#prepare transaction test
+$node->safe_psql('postgres', "
+ begin;
+ insert into t1 values(3,1);
+ insert into t1 values(3,2);
+ prepare transaction 'pt3';
+ ");
+$node->safe_psql('postgres', "
+ begin;
+ insert into t1 values(4,1);
+ insert into t1 values(4,2);
+ prepare transaction 'pt4';
+ ");
+$node->safe_psql('postgres', "
+ begin;
+ insert into t1 values(5,1);
+ insert into t1 values(5,2);
+ prepare transaction 'pt5';
+ ");
+$node->safe_psql('postgres', "
+ begin;
+ insert into t1 values(6,1);
+ insert into t1 values(6,2);
+ prepare transaction 'pt6';
+ ");
+$node->safe_psql('postgres', "commit prepared 'pt4';");
+
+# restart with enable_csn_snapshot off
+$node->append_conf('postgresql.conf', "enable_csn_snapshot = off");
+$node->restart;
+$node->safe_psql('postgres', "
+ insert into t1 values(7,1);
+ insert into t1 values(7,2);
+ ");
+$node->safe_psql('postgres', "commit prepared 'pt3';");
+$count1 = $node->safe_psql('postgres', "select count(*) from t1");
+is($count1, '8', 'Get right number in normal query');
+
+
+# restart with enable_csn_snapshot on
+$node->append_conf('postgresql.conf', "enable_csn_snapshot = on");
+$node->restart;
+$node->safe_psql('postgres', "
+ insert into t1 values(8,1);
+ insert into t1 values(8,2);
+ ");
+$node->safe_psql('postgres', "commit prepared 'pt5';");
+$count1 = $node->safe_psql('postgres', "select count(*) from t1");
+is($count1, '12', 'Get right number in normal query');
+
+# restart with enable_csn_snapshot off
+$node->append_conf('postgresql.conf', "enable_csn_snapshot = off");
+$node->restart;
+$node->safe_psql('postgres', "
+ insert into t1 values(9,1);
+ insert into t1 values(9,2);
+ ");
+$node->safe_psql('postgres', "commit prepared 'pt6';");
+
+$count1 = $node->safe_psql('postgres', "select count(*) from t1");
+is($count1, '16', 'Get right number in normal query');
diff --git a/src/test/modules/csnsnapshot/t/002_standby.pl b/src/test/modules/csnsnapshot/t/002_standby.pl
new file mode 100644
index 0000000000..2b09712141
--- /dev/null
+++ b/src/test/modules/csnsnapshot/t/002_standby.pl
@@ -0,0 +1,66 @@
+# Test simple scenario involving a standby
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 6;
+
+my $bkplabel = 'backup';
+my $master = PostgreSQL::Test::Cluster->new('master');
+$master->init(allows_streaming => 1);
+
+$master->append_conf(
+ 'postgresql.conf', qq{
+ enable_csn_snapshot = on
+ max_wal_senders = 5
+ });
+$master->start;
+$master->backup($bkplabel);
+
+my $standby = PostgreSQL::Test::Cluster->new('standby');
+$standby->init_from_backup($master, $bkplabel, has_streaming => 1);
+$standby->start;
+
+$master->safe_psql('postgres', "create table t1(i int, j int)");
+
+my $guc_on_master = $master->safe_psql('postgres', 'show enable_csn_snapshot');
+is($guc_on_master, 'on', "GUC on master");
+
+my $guc_on_standby = $standby->safe_psql('postgres', 'show enable_csn_snapshot');
+is($guc_on_standby, 'on', "GUC on standby");
+
+$master->append_conf('postgresql.conf', 'enable_csn_snapshot = off');
+$master->restart;
+
+$guc_on_master = $master->safe_psql('postgres', 'show enable_csn_snapshot');
+is($guc_on_master, 'off', "GUC off master");
+
+$guc_on_standby = $standby->safe_psql('postgres', 'show enable_csn_snapshot');
+is($guc_on_standby, 'on', "GUC on standby");
+
+# Consume a large number of transactions to move on to a new page.
+for my $i (1 .. 4096)
+{
+ $master->safe_psql('postgres', "insert into t1 values(1,$i)");
+}
+$master->wait_for_catchup($standby);
+$master->append_conf('postgresql.conf', 'enable_csn_snapshot = on');
+$master->restart;
+
+my $count_standby = $standby->safe_psql('postgres', 'select count(*) from t1');
+is($count_standby, '4096', "Ok for switch xid-base > csn-base");
+
+# Consume a large number of transactions to move on to a new page.
+for my $i (1 .. 4096)
+{
+ $master->safe_psql('postgres', "insert into t1 values(1,$i)");
+}
+$master->wait_for_catchup($standby);
+
+$master->append_conf('postgresql.conf', 'enable_csn_snapshot = off');
+$master->restart;
+
+$count_standby = $standby->safe_psql('postgres', 'select count(*) from t1');
+is($count_standby, '8192', "Ok for switch csn-base > xid-base");
diff --git a/src/test/modules/csnsnapshot/t/003_time_skew.pl b/src/test/modules/csnsnapshot/t/003_time_skew.pl
new file mode 100644
index 0000000000..f2496ea883
--- /dev/null
+++ b/src/test/modules/csnsnapshot/t/003_time_skew.pl
@@ -0,0 +1,214 @@
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 13;
+
+my $node1 = PostgreSQL::Test::Cluster->new('csn1');
+$node1->init;
+$node1->append_conf('postgresql.conf', qq{
+ enable_csn_snapshot = on
+ csn_snapshot_defer_time = 10
+ max_prepared_transactions = 10
+ csn_time_shift = 0
+ shared_preload_libraries = 'postgres_fdw'
+ postgres_fdw.use_csn_snapshots = true
+ });
+$node1->start;
+my $node2 = PostgreSQL::Test::Cluster->new('csn2');
+$node2->init;
+$node2->append_conf('postgresql.conf', qq{
+ enable_csn_snapshot = on
+ csn_snapshot_defer_time = 10
+ max_prepared_transactions = 10
+ csn_time_shift = 0
+ shared_preload_libraries = 'postgres_fdw'
+ postgres_fdw.use_csn_snapshots = true
+ });
+$node2->start;
+
+$node1->safe_psql('postgres', "
+ CREATE EXTENSION postgres_fdw;
+ CREATE SERVER remote FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '".$node2->port."');
+ CREATE USER MAPPING FOR PUBLIC SERVER remote;
+ CREATE TABLE summary(value int, ntrans int);
+ INSERT INTO summary (value, ntrans) VALUES (0, 0);
+");
+$node2->safe_psql('postgres', "
+ CREATE EXTENSION postgres_fdw;
+ CREATE SERVER remote FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '".$node1->port."');
+ CREATE USER MAPPING FOR PUBLIC SERVER remote;
+ CREATE FOREIGN TABLE summary(value int, ntrans int) SERVER remote;
+");
+
+$node1->safe_psql('postgres', "
+ CREATE TABLE t (id int, payload int) PARTITION BY HASH(id);
+ CREATE TABLE t_1 PARTITION OF t FOR VALUES WITH (modulus 2, remainder 0);
+ CREATE FOREIGN TABLE t_2 PARTITION OF t FOR VALUES WITH (modulus 2, remainder 1) SERVER remote;
+");
+$node2->safe_psql('postgres', "
+ CREATE TABLE t (id serial, payload int) PARTITION BY HASH(id);
+ CREATE FOREIGN TABLE t_1 PARTITION OF t FOR VALUES WITH (modulus 2, remainder 0) SERVER remote;
+ CREATE TABLE t_2 PARTITION OF t FOR VALUES WITH (modulus 2, remainder 1);
+");
+
+$node1->safe_psql('postgres', "INSERT INTO t(id, payload) (SELECT gs.*, 1 FROM generate_series(1,100) AS gs)");
+$node2->safe_psql('postgres', "INSERT INTO t(id, payload) (SELECT gs.*, 2 FROM generate_series(101,200) AS gs)");
+my $count1 = $node1->safe_psql('postgres', "SELECT SUM(payload) FROM t");
+my $count2 = $node2->safe_psql('postgres', "SELECT SUM(payload) FROM t");
+is( (($count1 == 300) and ($count1 == $count2)), 1, 'Correct insert');
+
+# ##############################################################################
+#
+# Basic test. Check REPEATABLE READ anomaly.
+# ntrans is needed to control that some transactions were committed.
+#
+# ##############################################################################
+
+my $q1 = File::Temp->new();
+append_to_file($q1, q{
+ START TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+ UPDATE summary SET value = value + (SELECT SUM(payload) FROM t);
+ UPDATE summary SET value = value - (SELECT SUM(payload) FROM t);
+ UPDATE summary SET ntrans = ntrans + 1;
+ COMMIT;
+});
+my $q2 = File::Temp->new();
+append_to_file($q2, q{
+ BEGIN;
+ \set pl random(-100, 100)
+ \set id random(1, 200)
+ UPDATE t SET payload = :pl WHERE id = :id;
+ COMMIT;
+});
+
+my $seconds = 5;
+my $pgb_handle1;
+my $pgb_handle2;
+
+$pgb_handle1 = $node1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $q1, 'postgres' );
+$pgb_handle2 = $node2->pgbench_async(-n, -c => 1, -T => $seconds, -f => $q2, 'postgres' );
+$node1->pgbench_await($pgb_handle1);
+$node2->pgbench_await($pgb_handle2);
+
+$count1 = $node1->safe_psql('postgres', "SELECT SUM(value) FROM summary");
+$count2 = $node2->safe_psql('postgres', "SELECT SUM(value) FROM summary");
+my $ntrans = $node2->safe_psql('postgres', "SELECT SUM(ntrans) FROM summary");
+note("$count1, $count2, $ntrans");
+is( ( ($ntrans > 0) and ($count1 == 0) and ($count1 == $count2)), 1, 'Correct update');
+
+# ##############################################################################
+#
+# Test on 'snapshot too old'
+#
+# ##############################################################################
+$node1->safe_psql('postgres', "UPDATE summary SET ntrans = 0;");
+$node2->safe_psql('postgres', "ALTER SYSTEM SET csn_time_shift = -100");
+$node2->restart();
+
+# READ COMMITTED transactions ignore the time skew.
+$node2->psql('postgres', "UPDATE summary SET ntrans = 1");
+$ntrans = $node1->safe_psql('postgres', "SELECT ntrans FROM summary");
+note("$ntrans");
+is( $ntrans, 1, 'Read committed behavior if snapshot turn sour');
+
+# But REPEATABLE READ transactions don't
+$node1->safe_psql('postgres', "ALTER SYSTEM SET csn_time_shift = +100");
+$node1->restart();
+my $err = '';
+$node2->psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE summary SET ntrans = 2; COMMIT;", stderr => \$err);
+$ntrans = $node1->safe_psql('postgres', "SELECT ntrans FROM summary");
+note("$ntrans");
+is( (($ntrans == 1) and (index($err, 'csn snapshot too old') != -1)), 1, 'Repeatable read can\'t update if snapshot turn sour');
+
+# ##############################################################################
+#
+# Test on issue #1:
+# 'xact confirmed as committed, so any following xact must see its effects'.
+#
+# ##############################################################################
+$node1->safe_psql('postgres', "delete from t");
+$node1->safe_psql('postgres', "ALTER SYSTEM SET csn_time_shift = 5");
+$node2->safe_psql('postgres', "ALTER SYSTEM SET csn_time_shift = 0");
+$node1->restart();
+$node2->restart();
+
+my $st_sec; my $end_sec;
+my $time_diff;
+
+$node2->safe_psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; INSERT INTO t VALUES(1,1), (3,1); COMMIT;");
+$ntrans = $node2->safe_psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; SELECT count(*) FROM t; COMMIT;");
+is( $ntrans, 2, 'Slow node can see mix node data change');
+$ntrans = $node1->safe_psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; SELECT count(*) FROM t; COMMIT;");
+is( $ntrans, 2, 'Fast node can see mix node data change');
+
+$node2->safe_psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; INSERT INTO t VALUES(1,1); COMMIT;");
+$ntrans = $node2->safe_psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; SELECT count(*) FROM t; COMMIT;");
+is( $ntrans, 3, 'CURRENTLY FAILED:Data change to fast node on slow node, and slow node can see data change');
+
+# READ COMMITTED mode ignores the time skew.
+$node1->safe_psql('postgres', "UPDATE summary SET ntrans = 1");
+$ntrans = $node2->safe_psql('postgres', "SELECT ntrans FROM summary");
+note("ntrans: $ntrans\n");
+is( $ntrans, 1, 'See committed values in the READ COMMITTED mode');
+
+# Access from the future
+$node1->safe_psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE summary SET ntrans = ntrans + 1; COMMIT;");
+$ntrans = $node2->safe_psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; SELECT ntrans FROM summary; COMMIT;");
+note("ntrans: $ntrans\n");
+is( $ntrans, 1, 'Do not see values, committed in the future at the REPEATABLE READ mode');
+
+# But...
+$node1->safe_psql('postgres', "ALTER SYSTEM SET csn_time_shift = 0");
+$node2->safe_psql('postgres', "ALTER SYSTEM SET csn_time_shift = 5");
+$node1->restart();
+$node2->restart();
+
+# Check READ COMMITTED mode
+$node2->safe_psql('postgres', "UPDATE summary SET ntrans = 2");
+$ntrans = $node1->safe_psql('postgres', "SELECT ntrans FROM summary");
+note("ntrans: $ntrans\n");
+is( $ntrans, 2, 'See committed values in the READ COMMITTED mode, step 2');
+
+# Node from the future will wait for a time before UPDATE table.
+$st_sec = time();
+$node2->safe_psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE summary SET ntrans = 3; COMMIT;");
+$end_sec = time(); $time_diff = $end_sec - $st_sec;
+$ntrans = $node1->safe_psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; SELECT ntrans FROM summary; COMMIT;");
+note("ntrans: $ntrans, Test time: $time_diff seconds");
+is( ($ntrans == 3), 1, 'The test execution time correlates with the time offset.');
+
+# Node from the future will wait for a time before SELECT from a table.
+$node1->safe_psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE summary SET ntrans = 4; COMMIT;");
+$st_sec = time();
+$ntrans = $node2->safe_psql('postgres', "START TRANSACTION ISOLATION LEVEL REPEATABLE READ; SELECT ntrans FROM summary; COMMIT;");
+$end_sec = time(); $time_diff = $end_sec - $st_sec;
+note("ntrans: $ntrans, Test time: $time_diff seconds ($end_sec, $st_sec)");
+is( ($ntrans == 4), 1, 'See values, committed in the past. The test execution time correlates with the time offset.');
+
+$node1->safe_psql('postgres', "UPDATE summary SET ntrans = 0, value = 0");
+$q1 = File::Temp->new();
+append_to_file($q1, q{
+ UPDATE summary SET value = value + 1, ntrans = ntrans + 1;
+});
+$q2 = File::Temp->new();
+append_to_file($q2, q{
+ START TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+ UPDATE summary SET value = value + (SELECT SUM(ntrans) FROM summary);
+ UPDATE summary SET value = value - (SELECT SUM(ntrans) FROM summary);
+ COMMIT;
+});
+$seconds = 3;
+$pgb_handle1 = $node1->pgbench_async(-n, -c => 1, -T => $seconds, -f => $q1, 'postgres' );
+$pgb_handle2 = $node2->pgbench_async(-n, -c => 1, -T => $seconds, -f => $q2, 'postgres' );
+$node1->pgbench_await($pgb_handle1);
+$node2->pgbench_await($pgb_handle2);
+
+$count1 = $node1->safe_psql('postgres', "SELECT SUM(value) FROM summary");
+$count2 = $node1->safe_psql('postgres', "SELECT SUM(ntrans) FROM summary");
+note("$count1, $count2");
+is( ( ($count1 > 0) and ($count1 == $count2)), 1, 'Skew test');
+
+$node1->stop();
+$node2->stop();
diff --git a/src/test/modules/csnsnapshot/t/004_read_committed.pl b/src/test/modules/csnsnapshot/t/004_read_committed.pl
new file mode 100644
index 0000000000..ba27536a7a
--- /dev/null
+++ b/src/test/modules/csnsnapshot/t/004_read_committed.pl
@@ -0,0 +1,97 @@
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 2;
+
+my $node1 = PostgreSQL::Test::Cluster->new('csn1');
+$node1->init;
+$node1->append_conf('postgresql.conf', qq{
+ max_prepared_transactions = 20
+ shared_preload_libraries = 'postgres_fdw'
+ enable_csn_snapshot = on
+ csn_snapshot_defer_time = 10
+ postgres_fdw.use_csn_snapshots = true
+ csn_time_shift = 0
+ });
+$node1->start;
+my $node2 = PostgreSQL::Test::Cluster->new('csn2');
+$node2->init;
+$node2->append_conf('postgresql.conf', qq{
+ max_prepared_transactions = 20
+ shared_preload_libraries = 'postgres_fdw'
+ enable_csn_snapshot = on
+ csn_snapshot_defer_time = 10
+ postgres_fdw.use_csn_snapshots = true
+ csn_time_shift = 0
+ });
+$node2->start;
+
+# Create foreign servers
+$node1->safe_psql('postgres', "
+ CREATE EXTENSION postgres_fdw;
+ CREATE SERVER remote FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '".$node2->port."');
+ CREATE USER MAPPING FOR PUBLIC SERVER remote;
+");
+$node2->safe_psql('postgres', "
+ CREATE EXTENSION postgres_fdw;
+ CREATE SERVER remote FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '".$node1->port."');
+ CREATE USER MAPPING FOR PUBLIC SERVER remote;
+");
+
+# Create sharded table
+$node1->safe_psql('postgres', "
+ CREATE TABLE dept1(name TEXT);
+ CREATE FOREIGN TABLE dept2 (name TEXT) SERVER remote;
+");
+$node2->safe_psql('postgres', "
+ CREATE TABLE dept2(name TEXT);
+ CREATE FOREIGN TABLE dept1 (name TEXT) SERVER remote;
+ CREATE TABLE results(success_tx int);
+ INSERT INTO results (success_tx) VALUES (0);
+");
+
+# Fill the table
+$node1->safe_psql('postgres', "INSERT INTO dept1 (name) VALUES ('Jonathan')");
+$node1->safe_psql('postgres', "INSERT INTO dept2 (name) VALUES ('Hoshi')");
+$node2->safe_psql('postgres', "INSERT INTO dept1 (name) VALUES ('Leonard')");
+my $count1 = $node1->safe_psql('postgres', "SELECT count(*) FROM ((SELECT * FROM dept1) UNION (SELECT * FROM dept2)) AS a");
+my $count2 = $node2->safe_psql('postgres', "SELECT count(*) FROM ((SELECT * FROM dept1) UNION (SELECT * FROM dept2)) AS a");
+note("$count1, $count2");
+is( (($count1 == 3) and ($count1 == $count2)), 1, 'Correct insert');
+
+# Queries
+my $q1 = File::Temp->new();
+append_to_file($q1, q{
+ BEGIN;
+ SELECT count(*) AS cnt FROM dept1; \gset
+ \if :cnt > 0
+ INSERT INTO dept2 (SELECT * FROM dept1);
+ DELETE FROM dept1;
+ \else
+ INSERT INTO dept1 (SELECT * FROM dept2);
+ DELETE FROM dept2;
+ \endif
+
+ COMMIT;
+});
+my $q2 = File::Temp->new();
+append_to_file($q2, q{
+ SELECT count(*) AS cnt FROM ((SELECT * FROM dept1) UNION (SELECT * FROM dept2)) AS a; \gset
+ \if :cnt = 3
+ UPDATE results SET success_tx = success_tx + 1;
+ \endif
+});
+my $transactions = 1000;
+my $pgb_handle1 = $node1->pgbench_async(-n, -c => 1, -t => $transactions, -f => $q1, 'postgres' );
+my $pgb_handle2 = $node2->pgbench_async(-n, -c => 20, -t => $transactions, -f => $q2, 'postgres' );
+$node1->pgbench_await($pgb_handle1);
+$node2->pgbench_await($pgb_handle2);
+
+$count2 = $node2->safe_psql('postgres', "SELECT success_tx FROM results");
+note("$count2");
+is( $count2, 20*$transactions, 'Correct READ COMMITTED updates');
+
+$node1->stop();
+$node2->stop();
diff --git a/src/test/modules/csnsnapshot/t/005_basic_visibility.pl b/src/test/modules/csnsnapshot/t/005_basic_visibility.pl
new file mode 100644
index 0000000000..93fa348e7b
--- /dev/null
+++ b/src/test/modules/csnsnapshot/t/005_basic_visibility.pl
@@ -0,0 +1,181 @@
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 3;
+
+# postgresql.conf settings shared by both clusters.
+my $csn_conf = qq{
+ enable_csn_snapshot = on
+ csn_snapshot_defer_time = 10
+ max_prepared_transactions = 30
+ csn_time_shift = 0
+ shared_preload_libraries = 'postgres_fdw'
+ postgres_fdw.use_csn_snapshots = true
+ log_statement = none
+ default_transaction_isolation = 'REPEATABLE READ'
+ log_min_messages = LOG
+ };
+
+# Create and initialize a cluster called $name, then append $csn_conf.
+sub csn_node
+{
+ my ($name) = @_;
+ my $node = PostgreSQL::Test::Cluster->new($name);
+ $node->init;
+ $node->append_conf('postgresql.conf', $csn_conf);
+ return $node;
+}
+
+my $node1 = csn_node('csn1');
+my $node2 = csn_node('csn2');
+$_->start foreach ($node1, $node2);
+
+# Each node stores one partition of "test" and reaches the other one
+# through the postgres_fdw server "remote", which points at the peer.
+my ($port1, $port2) = ($node1->port, $node2->port);
+$node1->safe_psql('postgres', "
+ CREATE EXTENSION postgres_fdw;
+ CREATE SERVER remote FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '${port2}');
+ CREATE USER MAPPING FOR PUBLIC SERVER remote;
+ CREATE TABLE test(key int, x bigint) PARTITION BY LIST (key);
+ CREATE TABLE t1 PARTITION OF test FOR VALUES IN (1);
+ CREATE FOREIGN TABLE t2 PARTITION OF test FOR VALUES IN (2) SERVER remote;
+");
+$node2->safe_psql('postgres', "
+ CREATE EXTENSION postgres_fdw;
+ CREATE SERVER remote FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '${port1}');
+ CREATE USER MAPPING FOR PUBLIC SERVER remote;
+ CREATE TABLE test(key int, x bigint) PARTITION BY LIST (key);
+ CREATE FOREIGN TABLE t1 PARTITION OF test FOR VALUES IN (1) SERVER remote;
+ CREATE TABLE t2 PARTITION OF test FOR VALUES IN (2);
+");
+# Initial rows: x is -1 in t1 and 1 in t2, so sum(x) over test is 0.
+$node1->safe_psql('postgres', "
+ INSERT INTO test (key, x) VALUES (1, -1);
+ INSERT INTO test (key, x) VALUES (2, 1);
+");
+
+$_->safe_psql('postgres', "VACUUM FULL") foreach ($node1, $node2);
+
+# ##############################################################################
+#
+# Tests
+#
+# ##############################################################################
+# Store a pgbench script in a new File::Temp file and return that object.
+sub write_pgbench_script
+{
+ my $file = File::Temp->new();
+ append_to_file($file, shift);
+ return $file;
+}
+my $updates = write_pgbench_script(q{
+ BEGIN;
+ UPDATE test SET x = x + 1;
+ UPDATE test SET x = x - 1;
+ END;
+});
+my $local_update1 = write_pgbench_script(q{
+ BEGIN;
+ UPDATE t1 SET x = x + 1;
+ UPDATE t1 SET x = x - 1;
+ END;
+});
+my $local_update2 = write_pgbench_script(q{
+ BEGIN;
+ UPDATE t2 SET x = x + 1;
+ UPDATE t2 SET x = x - 1;
+ END;
+});
+my ($pgb_handle1, $pgb_handle2, $pgb_handle3, $sum1, $sum2, $errors, $selects, $started, $result);
+my $test_time = 30;
+
+# ##############################################################################
+#
+# Concurrent local UPDATE and global SELECT
+#
+# ##############################################################################
+# node1 runs $local_update1 (t1 only: +1 then -1 inside one transaction)
+# while node2 repeatedly reads the whole partitioned table, reaching t1
+# through postgres_fdw. Every read must report sum(x) = 0.
+$errors = 0;
+$selects = 0;
+$started = time();
+$pgb_handle1 = $node1->pgbench_async(-n, -c => 5, -T => $test_time, -f => $local_update1, 'postgres' );
+while (time() - $started < $test_time)
+{
+ $result = $node2->safe_psql('postgres', "
+ SELECT 'sum=' || sum(x), (SELECT x FROM t1), (SELECT x FROM t2)
+ FROM test;");
+
+ # Count anomalies and keep looping, so the is() check below reports them.
+ if ( index($result, "sum=0") < 0 )
+ {
+ diag("[$selects] Isolation error. result = [ $result ]");
+ $errors++;
+ }
+ $selects++;
+}
+$node1->pgbench_await($pgb_handle1);
+note("TOTAL: selects = $selects, errors = $errors");
+is($errors, 0, 'Local updates');
+
+# ##############################################################################
+#
+# Global UPDATE and global SELECT
+#
+# ##############################################################################
+$errors = 0;
+$selects = 0;
+$started = time();
+$pgb_handle1 = $node1->pgbench_async(-n, -c => 5, -T => $test_time, -f => $updates, 'postgres' );
+while (time() - $started < $test_time)
+{
+ $result = $node2->safe_psql('postgres', "
+ SELECT 'sum=' || sum(x), (SELECT x FROM t1), (SELECT x FROM t2)
+ FROM test;");
+ # $updates changes both partitions in one distributed transaction; sum(x) must stay 0.
+ if ( index($result, "sum=0") < 0 )
+ {
+ diag("[$selects] Isolation error. result = [ $result ]");
+ $errors++;
+ }
+ $selects++;
+}
+$node1->pgbench_await($pgb_handle1);
+note("TOTAL: selects = $selects, errors = $errors");
+is($errors, 0, 'Distributed updates');
+
+# ##############################################################################
+#
+# Local UPDATEs, global UPDATE and global SELECT
+#
+# ##############################################################################
+$errors = 0;
+$selects = 0;
+$started = time();
+$pgb_handle1 = $node1->pgbench_async(-n, -c => 2, -T => $test_time, -f => $updates, 'postgres' );
+$pgb_handle2 = $node2->pgbench_async(-n, -c => 2, -T => $test_time, -f => $local_update2, 'postgres' );
+$pgb_handle3 = $node1->pgbench_async(-n, -c => 2, -T => $test_time, -f => $local_update1, 'postgres' );
+while (time() - $started < $test_time)
+{
+ $sum1 = $node1->safe_psql('postgres', "SELECT sum(x) FROM test;");
+ $sum2 = $node2->safe_psql('postgres', "SELECT sum(x) FROM test;");
+ # Each node must report sum(x) = 0 for the whole partitioned table.
+ if ( ($sum1 ne 0) or ($sum2 ne 0) )
+ {
+ diag("[$selects] Isolation error. Sums = [ $sum1, $sum2 ]");
+ $errors++;
+ }
+ $selects++;
+}
+$node1->pgbench_await($pgb_handle1);
+$node1->pgbench_await($pgb_handle3);
+$node2->pgbench_await($pgb_handle2);
+note("TOTAL: selects = $selects, errors = $errors");
+is($errors, 0, 'Mix of local and distributed updates');
+
+$node1->stop();
+$node2->stop();
diff --git a/src/test/modules/snapshot_too_old/sto.conf b/src/test/modules/snapshot_too_old/sto.conf
index 7eeaeeb0dc..3177cc0e15 100644
--- a/src/test/modules/snapshot_too_old/sto.conf
+++ b/src/test/modules/snapshot_too_old/sto.conf
@@ -1,2 +1,3 @@
autovacuum = off
old_snapshot_threshold = 0
+enable_csn_snapshot = false
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 2088857615..010b3b3144 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -104,6 +104,8 @@ select name, setting from pg_settings where name like 'enable%';
--------------------------------+---------
enable_async_append | on
enable_bitmapscan | on
+ enable_csn_snapshot | on
+ enable_csn_wal | on
enable_gathermerge | on
enable_hashagg | on
enable_hashjoin | on
@@ -122,7 +124,7 @@ select name, setting from pg_settings where name like 'enable%';
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(20 rows)
+(22 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail
--
2.25.1