Rebased onto current master (fb544735f1). -- Andrey Lepikhov Postgres Professional https://postgrespro.com The Russian Postgres Company
>From 29183c42a8ae31b830ab5af0dfcfdaadd6229700 Mon Sep 17 00:00:00 2001 From: "Andrey V. Lepikhov" <a.lepik...@postgrespro.ru> Date: Tue, 12 May 2020 08:29:54 +0500 Subject: [PATCH 1/3] GlobalCSNLog-SLRU-v3
--- src/backend/access/transam/Makefile | 1 + src/backend/access/transam/global_csn_log.c | 439 ++++++++++++++++++++ src/backend/access/transam/twophase.c | 1 + src/backend/access/transam/varsup.c | 2 + src/backend/access/transam/xlog.c | 12 + src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/ipc/procarray.c | 3 + src/backend/storage/lmgr/lwlocknames.txt | 1 + src/backend/tcop/postgres.c | 1 + src/backend/utils/misc/guc.c | 9 + src/backend/utils/probes.d | 2 + src/bin/initdb/initdb.c | 3 +- src/include/access/global_csn_log.h | 30 ++ src/include/storage/lwlock.h | 1 + src/include/utils/snapshot.h | 3 + 15 files changed, 510 insertions(+), 1 deletion(-) create mode 100644 src/backend/access/transam/global_csn_log.c create mode 100644 src/include/access/global_csn_log.h diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 595e02de72..60ff8b141e 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ clog.o \ commit_ts.o \ + global_csn_log.o \ generic_xlog.o \ multixact.o \ parallel.o \ diff --git a/src/backend/access/transam/global_csn_log.c b/src/backend/access/transam/global_csn_log.c new file mode 100644 index 0000000000..6f7fded350 --- /dev/null +++ b/src/backend/access/transam/global_csn_log.c @@ -0,0 +1,439 @@ +/*----------------------------------------------------------------------------- + * + * global_csn_log.c + * Track global commit sequence numbers of finished transactions + * + * Implementation of cross-node transaction isolation relies on commit sequence + * number (CSN) based visibility rules. This module provides SLRU to store + * CSN for each transaction. This mapping need to be kept only for xid's + * greater then oldestXid, but that can require arbitrary large amounts of + * memory in case of long-lived transactions. 
Because of the same lifetime and + * persistence requirements this module is quite similar to subtrans.c + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/transam/global_csn_log.c + * + *----------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/global_csn_log.h" +#include "access/slru.h" +#include "access/subtrans.h" +#include "access/transam.h" +#include "miscadmin.h" +#include "pg_trace.h" +#include "utils/snapmgr.h" + +bool track_global_snapshots; /* GUC (see guc.c); the routines below check it */ + +/* + * Defines for GlobalCSNLog page sizes. A page is the same BLCKSZ as is used + * everywhere else in Postgres. + * + * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, + * GlobalCSNLog page numbering also wraps around at + * 0xFFFFFFFF/GCSNLOG_XACTS_PER_PAGE, and GlobalCSNLog segment numbering at + * 0xFFFFFFFF/GCSNLOG_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no + * explicit notice of that fact in this module, except when comparing segment + * and page numbers in TruncateGlobalCSNLog (see GlobalCSNLogPagePrecedes).
+ */ + +/* We store the commit GlobalCSN for each xid */ +#define GCSNLOG_XACTS_PER_PAGE (BLCKSZ / sizeof(GlobalCSN)) + +#define TransactionIdToPage(xid) ((xid) / (TransactionId) GCSNLOG_XACTS_PER_PAGE) +#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) GCSNLOG_XACTS_PER_PAGE) + +/* + * Link to shared-memory data structures for GlobalCSNLog control + */ +static SlruCtlData GlobalCSNLogCtlData; +#define GlobalCsnlogCtl (&GlobalCSNLogCtlData) + +static int ZeroGlobalCSNLogPage(int pageno); +static bool GlobalCSNLogPagePrecedes(int page1, int page2); +static void GlobalCSNLogSetPageStatus(TransactionId xid, int nsubxids, + TransactionId *subxids, + GlobalCSN csn, int pageno); +static void GlobalCSNLogSetCSNInSlot(TransactionId xid, GlobalCSN csn, + int slotno); + +/* + * GlobalCSNLogSetCSN + * + * Record GlobalCSN of transaction and its subtransaction tree. + * + * xid is a single xid to set the CSN for. This will typically be the top level + * transactionid for a top level commit or abort. It can also be a + * subtransaction when we record transaction aborts. + * + * subxids is an array of xids of length nsubxids, representing subtransactions + * in the tree of xid. In various cases nsubxids may be zero. Consecutive + * subxids that share an SLRU page are written together, one page per call to + * GlobalCSNLogSetPageStatus, so the tree as a whole is not updated atomically. + * + * csn is the commit sequence number of the transaction. It should be + * AbortedGlobalCSN for abort cases.
+ */ +void +GlobalCSNLogSetCSN(TransactionId xid, int nsubxids, + TransactionId *subxids, GlobalCSN csn) +{ + int pageno; + int i = 0; + int offset = 0; + + /* Callers of GlobalCSNLogSetCSN() must check GUC params */ + Assert(track_global_snapshots); + + Assert(TransactionIdIsValid(xid)); + + pageno = TransactionIdToPage(xid); /* get page of parent */ + for (;;) + { + int num_on_page = 0; + + while (i < nsubxids && TransactionIdToPage(subxids[i]) == pageno) + { + num_on_page++; + i++; + } + + GlobalCSNLogSetPageStatus(xid, + num_on_page, subxids + offset, + csn, pageno); + if (i >= nsubxids) + break; + + offset = i; + pageno = TransactionIdToPage(subxids[offset]); + xid = InvalidTransactionId; + } +} + +/* + * Record the final state of transaction entries in the csn log for + * all entries on a single page. Atomic only on this page. + * + * Otherwise API is same as TransactionIdSetTreeStatus(). xid is + * InvalidTransactionId when called for pages after the parent's page. + */ +static void +GlobalCSNLogSetPageStatus(TransactionId xid, int nsubxids, + TransactionId *subxids, + GlobalCSN csn, int pageno) +{ + int slotno; + int i; + + LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE); + + slotno = SimpleLruReadPage(GlobalCsnlogCtl, pageno, true, xid); + + /* Subtransactions first, if needed ... */ + for (i = 0; i < nsubxids; i++) + { + Assert(GlobalCsnlogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i])); + GlobalCSNLogSetCSNInSlot(subxids[i], csn, slotno); + } + + /* ... then the main transaction (invalid on pages after the first) */ + if (TransactionIdIsValid(xid)) + GlobalCSNLogSetCSNInSlot(xid, csn, slotno); + + GlobalCsnlogCtl->shared->page_dirty[slotno] = true; + + LWLockRelease(GlobalCSNLogControlLock); +} + +/* + * Store the CSN of a single transaction into an already-loaded page slot.
+ */ +static void +GlobalCSNLogSetCSNInSlot(TransactionId xid, GlobalCSN csn, int slotno) +{ + int entryno = TransactionIdToPgIndex(xid); + GlobalCSN *ptr; + + Assert(LWLockHeldByMe(GlobalCSNLogControlLock)); + + /* Entries are GlobalCSN-sized, matching GCSNLOG_XACTS_PER_PAGE */ + ptr = (GlobalCSN *) (GlobalCsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(GlobalCSN)); + + *ptr = csn; +} + +/* + * Return the CSN recorded for xid in the log. + * + * NB: this is a low-level routine and is NOT the preferred entry point + * for most uses; TransactionIdGetGlobalCSN() in global_snapshot.c is the + * intended caller. + */ +GlobalCSN +GlobalCSNLogGetCSN(TransactionId xid) +{ + int pageno = TransactionIdToPage(xid); + int entryno = TransactionIdToPgIndex(xid); + int slotno; + GlobalCSN *ptr; + GlobalCSN global_csn; + + /* Callers of GlobalCSNLogGetCSN() must check GUC params */ + Assert(track_global_snapshots); + + /* Can't ask about stuff that might not be around anymore */ + Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin)); + + /* lock is acquired by SimpleLruReadPage_ReadOnly and released below */ + + slotno = SimpleLruReadPage_ReadOnly(GlobalCsnlogCtl, pageno, xid); + ptr = (GlobalCSN *) (GlobalCsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(GlobalCSN)); + global_csn = *ptr; + + LWLockRelease(GlobalCSNLogControlLock); + + return global_csn; +} + +/* + * Number of shared GlobalCSNLog buffers. + */ +static Size +GlobalCSNLogShmemBuffers(void) +{ + return Min(32, Max(4, NBuffers / 512)); +} + +/* + * Shared memory needed for GlobalCsnlogCtl; zero when track_global_snapshots + * is off, because GlobalCSNLogShmemInit() then allocates nothing. + */ +Size +GlobalCSNLogShmemSize(void) +{ + if (!track_global_snapshots) + return 0; + + return SimpleLruShmemSize(GlobalCSNLogShmemBuffers(), 0); +} + +/* + * Initialization of shared memory for GlobalCSNLog.
+ */ +void +GlobalCSNLogShmemInit(void) +{ + if (!track_global_snapshots) + return; + + GlobalCsnlogCtl->PagePrecedes = GlobalCSNLogPagePrecedes; + SimpleLruInit(GlobalCsnlogCtl, "GlobalCSNLog Ctl", GlobalCSNLogShmemBuffers(), 0, + GlobalCSNLogControlLock, "pg_global_csn", LWTRANCHE_GLOBAL_CSN_LOG_BUFFERS); +} + +/* + * This func must be called ONCE on system install. It creates the initial + * GlobalCSNLog segment. The pg_global_csn directory is assumed to have been + * created by initdb, and GlobalCSNLogShmemInit must have been called already. + */ +void +BootStrapGlobalCSNLog(void) +{ + int slotno; + + if (!track_global_snapshots) + return; + + LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE); + + /* Create and zero the first page of the GlobalCSNLog */ + slotno = ZeroGlobalCSNLogPage(0); + + /* Make sure it's written out */ + SimpleLruWritePage(GlobalCsnlogCtl, slotno); + Assert(!GlobalCsnlogCtl->shared->page_dirty[slotno]); + + LWLockRelease(GlobalCSNLogControlLock); +} + +/* + * Initialize (or reinitialize) a page of GlobalCSNLog to zeroes. + * + * The page is not actually written, just set up in shared memory. + * The slot number of the new page is returned. + * + * Control lock must be held at entry, and will be held at exit. + */ +static int +ZeroGlobalCSNLogPage(int pageno) +{ + Assert(LWLockHeldByMe(GlobalCSNLogControlLock)); + return SimpleLruZeroPage(GlobalCsnlogCtl, pageno); +} + +/* + * This must be called ONCE during postmaster or standalone-backend startup, + * after StartupXLOG has initialized ShmemVariableCache->nextFullXid. + * + * oldestActiveXID is the oldest XID of any prepared transaction, or nextXid + * if there are none. + */ +void +StartupGlobalCSNLog(TransactionId oldestActiveXID) +{ + int startPage; + int endPage; + + if (!track_global_snapshots) + return; + + /* + * Since we don't expect pg_global_csn to be valid across crashes, we + * initialize the currently-active page(s) to zeroes during startup.
+ * Whenever we advance into a new page, ExtendGlobalCSNLog will likewise + * zero the new page without regard to whatever was previously on disk. + */ + LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE); + + startPage = TransactionIdToPage(oldestActiveXID); + endPage = TransactionIdToPage(XidFromFullTransactionId(ShmemVariableCache->nextFullXid)); + + while (startPage != endPage) + { + (void) ZeroGlobalCSNLogPage(startPage); + startPage++; + /* must account for wraparound */ + if (startPage > TransactionIdToPage(MaxTransactionId)) + startPage = 0; + } + (void) ZeroGlobalCSNLogPage(startPage); + + LWLockRelease(GlobalCSNLogControlLock); +} + +/* + * This must be called ONCE during postmaster or standalone-backend shutdown + */ +void +ShutdownGlobalCSNLog(void) +{ + if (!track_global_snapshots) + return; + + /* + * Flush dirty GlobalCSNLog pages to disk. + * + * This is not actually necessary from a correctness point of view. We do + * it merely as a debugging aid. + */ + TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_START(false); + SimpleLruFlush(GlobalCsnlogCtl, false); + TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_DONE(false); +} + +/* + * Perform a checkpoint --- either during shutdown, or on-the-fly + */ +void +CheckPointGlobalCSNLog(void) +{ + if (!track_global_snapshots) + return; + + /* + * Flush dirty GlobalCSNLog pages to disk. + * + * This is not actually necessary from a correctness point of view. We do + * it merely to improve the odds that writing of dirty pages is done by + * the checkpoint process and not by backends. + */ + TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_START(true); + SimpleLruFlush(GlobalCsnlogCtl, true); + TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_DONE(true); +} + +/* + * Make sure that GlobalCSNLog has room for a newly-allocated XID. + * + * NB: this is called while holding XidGenLock.
We want it to be very fast + * most of the time; even when it's not so fast, no actual I/O need happen + * unless we're forced to write out a dirty GlobalCSNLog page to make room + * in shared memory. + */ +void +ExtendGlobalCSNLog(TransactionId newestXact) +{ + int pageno; + + if (!track_global_snapshots) + return; + + /* + * No work except at first XID of a page. But beware: just after + * wraparound, the first XID of page zero is FirstNormalTransactionId. + */ + if (TransactionIdToPgIndex(newestXact) != 0 && + !TransactionIdEquals(newestXact, FirstNormalTransactionId)) + return; + + pageno = TransactionIdToPage(newestXact); + + LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE); + + /* Zero the page; GlobalCSNLog writes no WAL record for this */ + (void) ZeroGlobalCSNLogPage(pageno); + + LWLockRelease(GlobalCSNLogControlLock); +} + +/* + * Remove all GlobalCSNLog segments before the one holding the passed + * transaction ID. + * + * This is normally called during checkpoint, with oldestXact being the + * oldest TransactionXmin of any running transaction. + */ +void +TruncateGlobalCSNLog(TransactionId oldestXact) +{ + int cutoffPage; + + if (!track_global_snapshots) + return; + + /* + * The cutoff point is the start of the segment containing oldestXact. We + * pass the *page* containing oldestXact to SimpleLruTruncate. We step + * back one transaction to avoid passing a cutoff page that hasn't been + * created yet in the rare case that oldestXact would be the first item on + * a page and oldestXact == next XID. In that case, if we didn't subtract + * one, we'd trigger SimpleLruTruncate's wraparound detection. + */ + TransactionIdRetreat(oldestXact); + cutoffPage = TransactionIdToPage(oldestXact); + + SimpleLruTruncate(GlobalCsnlogCtl, cutoffPage); +} + +/* + * Decide whether GlobalCSNLog page1 is "older" than page2 for truncation. + * + * We need to use comparison of TransactionIds here in order to do the right + * thing with wraparound XID arithmetic.
TransactionIdPrecedes() gets weird + * about permanent xact IDs, so offset both pages such that xid1, xid2 and + * xid2 + GCSNLOG_XACTS_PER_PAGE - 1 are all normal XIDs. page1 precedes + * page2 only if xid1 precedes both the first and the last XID of page2. + */ +static bool +GlobalCSNLogPagePrecedes(int page1, int page2) +{ + TransactionId xid1; + TransactionId xid2; + + xid1 = ((TransactionId) page1) * GCSNLOG_XACTS_PER_PAGE; + xid1 += FirstNormalTransactionId + 1; + xid2 = ((TransactionId) page2) * GCSNLOG_XACTS_PER_PAGE; + xid2 += FirstNormalTransactionId + 1; + + return (TransactionIdPrecedes(xid1, xid2) && + TransactionIdPrecedes(xid1, (TransactionId) (xid2 + GCSNLOG_XACTS_PER_PAGE - 1))); +} diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 2f7d4ed59a..0ecc02a3dd 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -77,6 +77,7 @@ #include <unistd.h> #include "access/commit_ts.h" +#include "access/global_csn_log.h" #include "access/htup_details.h" #include "access/subtrans.h" #include "access/transam.h" diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 2570e7086a..882bc66825 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -15,6 +15,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/global_csn_log.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/xact.h" @@ -173,6 +174,7 @@ GetNewTransactionId(bool isSubXact) * Extend pg_subtrans and pg_commit_ts too.
*/ ExtendCLOG(xid); + ExtendGlobalCSNLog(xid); ExtendCommitTs(xid); ExtendSUBTRANS(xid); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0d3d670928..285d9d442e 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -24,6 +24,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/global_csn_log.h" #include "access/heaptoast.h" #include "access/multixact.h" #include "access/rewriteheap.h" @@ -5345,6 +5346,7 @@ BootStrapXLOG(void) /* Bootstrap the commit log, too */ BootStrapCLOG(); + BootStrapGlobalCSNLog(); BootStrapCommitTs(); BootStrapSUBTRANS(); BootStrapMultiXact(); @@ -7054,6 +7056,7 @@ StartupXLOG(void) * maintained during recovery and need not be started yet. */ StartupCLOG(); + StartupGlobalCSNLog(oldestActiveXID); StartupSUBTRANS(oldestActiveXID); /* @@ -7871,6 +7874,7 @@ StartupXLOG(void) if (standbyState == STANDBY_DISABLED) { StartupCLOG(); + StartupGlobalCSNLog(oldestActiveXID); StartupSUBTRANS(oldestActiveXID); } @@ -8518,6 +8522,7 @@ ShutdownXLOG(int code, Datum arg) CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); } ShutdownCLOG(); + ShutdownGlobalCSNLog(); ShutdownCommitTs(); ShutdownSUBTRANS(); ShutdownMultiXact(); @@ -9090,7 +9095,10 @@ CreateCheckPoint(int flags) * StartupSUBTRANS hasn't been called yet. */ if (!RecoveryInProgress()) + { TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT)); + TruncateGlobalCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT)); + } /* Real work is done, but log and update stats before releasing lock. */ LogCheckpointEnd(false); @@ -9166,6 +9174,7 @@ static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags) { CheckPointCLOG(); + CheckPointGlobalCSNLog(); CheckPointCommitTs(); CheckPointSUBTRANS(); CheckPointMultiXact(); @@ -9450,7 +9459,10 @@ CreateRestartPoint(int flags) * this because StartupSUBTRANS hasn't been called yet. 
*/ if (EnableHotStandby) + { TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT)); + TruncateGlobalCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT)); + } /* Real work is done, but log and update before releasing lock. */ LogCheckpointEnd(true); diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 427b0d59cd..dc2d2959c4 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -16,6 +16,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/global_csn_log.h" #include "access/heapam.h" #include "access/multixact.h" #include "access/nbtree.h" @@ -125,6 +126,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, ProcGlobalShmemSize()); size = add_size(size, XLOGShmemSize()); size = add_size(size, CLOGShmemSize()); + size = add_size(size, GlobalCSNLogShmemSize()); size = add_size(size, CommitTsShmemSize()); size = add_size(size, SUBTRANSShmemSize()); size = add_size(size, TwoPhaseShmemSize()); @@ -213,6 +215,7 @@ CreateSharedMemoryAndSemaphores(void) */ XLOGShmemInit(); CLOGShmemInit(); + GlobalCSNLogShmemInit(); CommitTsShmemInit(); SUBTRANSShmemInit(); MultiXactShmemInit(); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 363000670b..8ae4906474 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -46,6 +46,7 @@ #include <signal.h> #include "access/clog.h" +#include "access/global_csn_log.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/twophase.h" @@ -835,6 +836,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) while (TransactionIdPrecedes(latestObservedXid, running->nextXid)) { ExtendSUBTRANS(latestObservedXid); + ExtendGlobalCSNLog(latestObservedXid); TransactionIdAdvance(latestObservedXid); } TransactionIdRetreat(latestObservedXid); /* = running->nextXid - 1 */ @@ -3337,6 +3339,7 @@ RecordKnownAssignedTransactionIds(TransactionId xid) while 
(TransactionIdPrecedes(next_expected_xid, xid)) { TransactionIdAdvance(next_expected_xid); + ExtendGlobalCSNLog(next_expected_xid); ExtendSUBTRANS(next_expected_xid); } Assert(next_expected_xid == xid); diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index db47843229..fe18c93b61 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -49,3 +49,4 @@ MultiXactTruncationLock 41 OldSnapshotTimeMapLock 42 LogicalRepWorkerLock 43 CLogTruncationLock 44 +GlobalCSNLogControlLock 45 diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 802b1ec22f..d0bb64870c 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -42,6 +42,7 @@ #include "catalog/pg_type.h" #include "commands/async.h" #include "commands/prepare.h" +#include "common/hashfn.h" #include "executor/spi.h" #include "jit/jit.h" #include "libpq/libpq.h" diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 53a6cd2436..0ca331c6f9 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1175,6 +1175,15 @@ static struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"track_global_snapshots", PGC_POSTMASTER, RESOURCES_MEM, + gettext_noop("Enables global snapshot tracking."), + gettext_noop("Used to achieve REPEATABLE READ isolation level for postgres_fdw transactions.") + }, + &track_global_snapshots, + true, /* XXX: set true to simplify testing.
XXX2: Seems that RESOURCES_MEM isn't the best category */ + NULL, NULL, NULL + }, { {"ssl", PGC_SIGHUP, CONN_AUTH_SSL, gettext_noop("Enables SSL connections."), diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index a0b0458108..f900e7f3b4 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -77,6 +77,8 @@ provider postgresql { probe clog__checkpoint__done(bool); probe subtrans__checkpoint__start(bool); probe subtrans__checkpoint__done(bool); + probe globalcsnlog__checkpoint__start(bool); + probe globalcsnlog__checkpoint__done(bool); probe multixact__checkpoint__start(bool); probe multixact__checkpoint__done(bool); probe twophase__checkpoint__start(); diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index a6577486ce..d0afab9d33 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -220,7 +220,8 @@ static const char *const subdirs[] = { "pg_xact", "pg_logical", "pg_logical/snapshots", - "pg_logical/mappings" + "pg_logical/mappings", + "pg_global_csn" }; diff --git a/src/include/access/global_csn_log.h b/src/include/access/global_csn_log.h new file mode 100644 index 0000000000..417c26c8a3 --- /dev/null +++ b/src/include/access/global_csn_log.h @@ -0,0 +1,30 @@ +/* + * global_csn_log.h + * + * Commit-Sequence-Number log.
+ * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/global_csn_log.h + */ +#ifndef GLOBAL_CSN_LOG_H +#define GLOBAL_CSN_LOG_H + +#include "access/xlog.h" +#include "utils/snapshot.h" + +extern void GlobalCSNLogSetCSN(TransactionId xid, int nsubxids, + TransactionId *subxids, GlobalCSN csn); +extern GlobalCSN GlobalCSNLogGetCSN(TransactionId xid); + +extern Size GlobalCSNLogShmemSize(void); +extern void GlobalCSNLogShmemInit(void); +extern void BootStrapGlobalCSNLog(void); +extern void StartupGlobalCSNLog(TransactionId oldestActiveXID); +extern void ShutdownGlobalCSNLog(void); +extern void CheckPointGlobalCSNLog(void); +extern void ExtendGlobalCSNLog(TransactionId newestXact); +extern void TruncateGlobalCSNLog(TransactionId oldestXact); + +#endif /* GLOBAL_CSN_LOG_H */ diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 8fda8e4f78..c303042663 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -198,6 +198,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_CLOG_BUFFERS = NUM_INDIVIDUAL_LWLOCKS, LWTRANCHE_COMMITTS_BUFFERS, LWTRANCHE_SUBTRANS_BUFFERS, + LWTRANCHE_GLOBAL_CSN_LOG_BUFFERS, LWTRANCHE_MXACTOFFSET_BUFFERS, LWTRANCHE_MXACTMEMBER_BUFFERS, LWTRANCHE_ASYNC_BUFFERS, diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index 4796edb63a..57d2dfaa67 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -20,6 +20,9 @@ #include "storage/buf.h" +typedef uint64 GlobalCSN; +extern bool track_global_snapshots; + /* * The different snapshot types. We use SnapshotData structures to represent * both "regular" (MVCC) snapshots and "special" snapshots that have non-MVCC -- 2.17.1
>From 25a5288764e9e70a0a61a4a1b32111ce8b29c966 Mon Sep 17 00:00:00 2001 From: "Andrey V. Lepikhov" <a.lepik...@postgrespro.ru> Date: Tue, 12 May 2020 08:30:46 +0500 Subject: [PATCH 2/3] Global-snapshots-v3 --- src/backend/access/transam/Makefile | 1 + src/backend/access/transam/global_snapshot.c | 755 ++++++++++++++++++ src/backend/access/transam/twophase.c | 156 ++++ src/backend/access/transam/xact.c | 29 + src/backend/access/transam/xlog.c | 2 + src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/ipc/procarray.c | 92 ++- src/backend/storage/lmgr/lwlocknames.txt | 1 + src/backend/storage/lmgr/proc.c | 5 + src/backend/utils/misc/guc.c | 13 +- src/backend/utils/misc/postgresql.conf.sample | 2 + src/backend/utils/time/snapmgr.c | 167 +++- src/include/access/global_snapshot.h | 72 ++ src/include/access/twophase.h | 1 + src/include/catalog/pg_proc.dat | 14 + src/include/datatype/timestamp.h | 3 + src/include/fmgr.h | 1 + src/include/portability/instr_time.h | 10 + src/include/storage/proc.h | 15 + src/include/storage/procarray.h | 8 + src/include/utils/snapmgr.h | 3 + src/include/utils/snapshot.h | 8 + 22 files changed, 1354 insertions(+), 7 deletions(-) create mode 100644 src/backend/access/transam/global_snapshot.c create mode 100644 src/include/access/global_snapshot.h diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 60ff8b141e..6de567a79b 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -16,6 +16,7 @@ OBJS = \ clog.o \ commit_ts.o \ global_csn_log.o \ + global_snapshot.o \ generic_xlog.o \ multixact.o \ parallel.o \ diff --git a/src/backend/access/transam/global_snapshot.c b/src/backend/access/transam/global_snapshot.c new file mode 100644 index 0000000000..bac16828bb --- /dev/null +++ b/src/backend/access/transam/global_snapshot.c @@ -0,0 +1,755 @@ +/*------------------------------------------------------------------------- + * + * global_snapshot.c + * Support for 
cross-node snapshot isolation. + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/transam/global_snapshot.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/global_csn_log.h" +#include "access/global_snapshot.h" +#include "access/transam.h" +#include "access/twophase.h" +#include "access/xact.h" +#include "portability/instr_time.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/snapmgr.h" +#include "miscadmin.h" + +/* Raise a warning if imported global_csn exceeds ours by this value. */ +#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */ + +/* + * GlobalSnapshotState + * + * Do not trust local clocks to be strictly monotonic; save the last acquired + * value so later we can compare the next timestamp with it. Accessed through + * GlobalSnapshotGenerate() and GlobalSnapshotSync(). + */ +typedef struct +{ + GlobalCSN last_global_csn; + volatile slock_t lock; +} GlobalSnapshotState; + +static GlobalSnapshotState *gsState; + + +/* + * GUC to delay advance of oldestXid for this amount of time. Also determines + * the size of the GlobalSnapshotXidMap circular buffer. + */ +int global_snapshot_defer_time; + +/* + * GUC enabling this module; defined in global_csn_log.c and also declared + * in utils/snapshot.h, so this extern is redundant but harmless. + */ +extern bool track_global_snapshots; + +/* + * GlobalSnapshotXidMap + * + * To be able to install a global snapshot that points to the past we need to + * keep old versions of tuples and therefore delay advance of oldestXid. Here we + * keep track of correspondence between snapshot's global_csn and oldestXid + * that was set at the time when the snapshot was taken. Much like the + * snapshot too old's OldSnapshotControlData does, but with finer granularity + * to seconds.
+ * + * Different strategies can be employed to hold oldestXid (e.g. we can track + * oldest global_csn-based snapshot among cluster nodes and map it to oldestXid + * on each node) but the one implemented here tries to avoid cross-node + * communications which are tricky in case of postgres_fdw. + * + * On each snapshot acquisition GlobalSnapshotMapXmin() is called and stores + * correspondence between current global_csn and oldestXmin in a sparse way: + * global_csn is rounded to seconds (and here we use the fact that global_csn + * is just a timestamp) and oldestXmin is stored in the circular buffer where + * rounded global_csn acts as an offset from current circular buffer head. + * Size of the circular buffer is controlled by global_snapshot_defer_time GUC. + * + * When a global snapshot arrives from a different node we check that its + * global_csn is still in our map, otherwise we'll error out with "snapshot too + * old" message. If global_csn is successfully mapped to oldestXid we move + * backend's pgxact->xmin to proc->originalXmin and set pgxact->xmin to the + * mapped oldestXid. That way GetOldestXmin() can take into account backends + * with imported global snapshot and old tuple versions will be preserved. + * + * Also while calculating oldestXmin for our map in presence of imported + * global snapshots we should use proc->originalXmin instead of pgxact->xmin + * that was set during import. Otherwise, we can create a feedback loop: + * xmin's of imported global snapshots were calculated using our map and new + * entries in map are going to be calculated based on those xmin's, and there is + * a risk of getting stuck forever with one non-increasing oldestXmin. All other + * callers of GetOldestXmin() are using pgxact->xmin so the old tuple versions + * are preserved.
+ */ +typedef struct GlobalSnapshotXidMap +{ + int head; /* offset of current freshest value */ + int size; /* total size of circular buffer */ + GlobalCSN_atomic last_csn_seconds; /* last rounded global_csn that changed + * xmin_by_second[] */ + TransactionId *xmin_by_second; /* circular buffer of oldestXmin's */ +} +GlobalSnapshotXidMap; + +static GlobalSnapshotXidMap *gsXidMap; + + +/* + * Estimate shared memory space needed by GlobalSnapshotShmemInit(). gsXidMap + * and its xmin_by_second[] buffer are two separate allocations, so each is + * sized on its own; add_size/mul_size guard against overflow. + */ +Size +GlobalSnapshotShmemSize(void) +{ + Size size = 0; + + if (track_global_snapshots || global_snapshot_defer_time > 0) + { + size = add_size(size, MAXALIGN(sizeof(GlobalSnapshotState))); + } + + if (global_snapshot_defer_time > 0) + { + size = add_size(size, MAXALIGN(sizeof(GlobalSnapshotXidMap))); + size = add_size(size, MAXALIGN(mul_size(global_snapshot_defer_time, + sizeof(TransactionId)))); + } + + return size; +} + +/* Init shared memory structures */ +void +GlobalSnapshotShmemInit(void) +{ + bool found; + + if (track_global_snapshots || global_snapshot_defer_time > 0) + { + gsState = ShmemInitStruct("gsState", + sizeof(GlobalSnapshotState), + &found); + if (!found) + { + gsState->last_global_csn = 0; + SpinLockInit(&gsState->lock); + } + } + + if (global_snapshot_defer_time > 0) + { + gsXidMap = ShmemInitStruct("gsXidMap", + sizeof(GlobalSnapshotXidMap), + &found); + if (!found) + { + int i; + + pg_atomic_init_u64(&gsXidMap->last_csn_seconds, 0); + gsXidMap->head = 0; + gsXidMap->size = global_snapshot_defer_time; + gsXidMap->xmin_by_second = + ShmemAlloc(mul_size(sizeof(TransactionId), gsXidMap->size)); + + for (i = 0; i < gsXidMap->size; i++) + gsXidMap->xmin_by_second[i] = InvalidTransactionId; + } + } +} + +/* + * GlobalSnapshotStartup + * + * Set gsXidMap entries to oldestActiveXID during startup. + */ +void +GlobalSnapshotStartup(TransactionId oldestActiveXID) +{ + /* + * Run only if we have initialized shared memory and gsXidMap + * is enabled.
+ */ + if (IsNormalProcessingMode() && + track_global_snapshots && global_snapshot_defer_time > 0) + { + int i; + + Assert(TransactionIdIsValid(oldestActiveXID)); + for (i = 0; i < gsXidMap->size; i++) + gsXidMap->xmin_by_second[i] = oldestActiveXID; + ProcArraySetGlobalSnapshotXmin(oldestActiveXID); + } +} + +/* + * GlobalSnapshotMapXmin + * + * Maintain circular buffer of oldestXmins for several seconds in past. This + * buffer allows shifting oldestXmin into the past when a backend is importing + * a global transaction. Otherwise old versions of tuples that were needed for + * this transaction can be recycled by other processes (vacuum, HOT, etc). + * + * Locking here is not trivial. Called upon each snapshot creation after + * ProcArrayLock is released. Such usage creates several race conditions. It + * is possible that a backend which got global_csn called GlobalSnapshotMapXmin() + * only after other backends managed to get snapshot and complete + * GlobalSnapshotMapXmin() call, or even committed. This is safe because + * + * * We already hold our xmin in MyPgXact, so our snapshot will not be + * harmed even though ProcArrayLock is released. + * + * * snapshot_global_csn is always pessimistically rounded up to the next + * second. + * + * * For performance reasons, xmin value for particular second is filled + * only once. Because of that instead of writing to buffer just our + * xmin (which is enough for our snapshot), we bump oldestXmin there -- + * it mitigates the possibility of damaging someone else's snapshot by + * writing to the buffer too advanced value in case of slowness of + * another backend who generated csn earlier, but didn't manage to + * insert it before us. + * + * * if GlobalSnapshotMapXmin() finds a gap in several seconds between + * current call and latest completed call then it should fill that gap + * with latest known values instead of new one.
Otherwise it is + * possible (however highly unlikely) that this gap also happend + * between taking snapshot and call to GlobalSnapshotMapXmin() for some + * backend. And we are at risk to fill circullar buffer with + * oldestXmin's that are bigger then they actually were. + */ +void +GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn) +{ + int offset, gap, i; + GlobalCSN csn_seconds; + GlobalCSN last_csn_seconds; + volatile TransactionId oldest_deferred_xmin; + TransactionId current_oldest_xmin, previous_oldest_xmin; + + /* Callers should check config values */ + Assert(global_snapshot_defer_time > 0); + Assert(gsXidMap != NULL); + + /* + * Round up global_csn to the next second -- pessimistically and safely. + */ + csn_seconds = (snapshot_global_csn / NSECS_PER_SEC + 1); + + /* + * Fast-path check. Avoid taking exclusive GlobalSnapshotXidMapLock lock + * if oldestXid was already written to xmin_by_second[] for this rounded + * global_csn. + */ + if (pg_atomic_read_u64(&gsXidMap->last_csn_seconds) >= csn_seconds) + return; + + /* Ok, we have new entry (or entries) */ + LWLockAcquire(GlobalSnapshotXidMapLock, LW_EXCLUSIVE); + + /* Re-check last_csn_seconds under lock */ + last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds); + if (last_csn_seconds >= csn_seconds) + { + LWLockRelease(GlobalSnapshotXidMapLock); + return; + } + pg_atomic_write_u64(&gsXidMap->last_csn_seconds, csn_seconds); + + /* + * Count oldest_xmin. + * + * It was possible to calculate oldest_xmin during corresponding snapshot + * creation, but GetSnapshotData() intentionally reads only PgXact, but not + * PgProc. And we need info about originalXmin (see comment to gsXidMap) + * which is stored in PgProc because of threats in comments around PgXact + * about extending it with new fields. So just calculate oldest_xmin again, + * that anyway happens quite rarely. 
+ */ + current_oldest_xmin = GetOldestXmin(NULL, PROCARRAY_NON_IMPORTED_XMIN); + + previous_oldest_xmin = gsXidMap->xmin_by_second[gsXidMap->head]; + + Assert(TransactionIdIsNormal(current_oldest_xmin)); + Assert(TransactionIdIsNormal(previous_oldest_xmin) || !track_global_snapshots); + + gap = csn_seconds - last_csn_seconds; + offset = csn_seconds % gsXidMap->size; + + /* Sanity check before we update head and gap */ + Assert( gap >= 1 ); + Assert( (gsXidMap->head + gap) % gsXidMap->size == offset ); + + gap = gap > gsXidMap->size ? gsXidMap->size : gap; + gsXidMap->head = offset; + + /* Fill new entry with current_oldest_xmin */ + gsXidMap->xmin_by_second[offset] = current_oldest_xmin; + + /* + * If we have gap then fill it with previous_oldest_xmin for reasons + * outlined in comment above this function. + */ + for (i = 1; i < gap; i++) + { + offset = (offset + gsXidMap->size - 1) % gsXidMap->size; + gsXidMap->xmin_by_second[offset] = previous_oldest_xmin; + } + + oldest_deferred_xmin = + gsXidMap->xmin_by_second[ (gsXidMap->head + 1) % gsXidMap->size ]; + + LWLockRelease(GlobalSnapshotXidMapLock); + + /* + * Advance procArray->global_snapshot_xmin after we released + * GlobalSnapshotXidMapLock. Since we gather not xmin but oldestXmin, it + * never goes backwards regardless of how slow we can do that. + */ + Assert(TransactionIdFollowsOrEquals(oldest_deferred_xmin, + ProcArrayGetGlobalSnapshotXmin())); + ProcArraySetGlobalSnapshotXmin(oldest_deferred_xmin); +} + + +/* + * GlobalSnapshotToXmin + * + * Get oldestXmin that took place when snapshot_global_csn was taken. 
+ */ +TransactionId +GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn) +{ + TransactionId xmin; + GlobalCSN csn_seconds; + volatile GlobalCSN last_csn_seconds; + + /* Callers should check config values */ + Assert(global_snapshot_defer_time > 0); + Assert(gsXidMap != NULL); + + /* Round down to get conservative estimates */ + csn_seconds = (snapshot_global_csn / NSECS_PER_SEC); + + LWLockAcquire(GlobalSnapshotXidMapLock, LW_SHARED); + last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds); + if (csn_seconds > last_csn_seconds) + { + /* we don't have entry for this global_csn yet, return latest known */ + xmin = gsXidMap->xmin_by_second[gsXidMap->head]; + } + else if (last_csn_seconds - csn_seconds < gsXidMap->size) + { + /* we are good, retrieve value from our map */ + Assert(last_csn_seconds % gsXidMap->size == gsXidMap->head); + xmin = gsXidMap->xmin_by_second[csn_seconds % gsXidMap->size]; + } + else + { + /* requested global_csn is too old, let caller know */ + xmin = InvalidTransactionId; + } + LWLockRelease(GlobalSnapshotXidMapLock); + + return xmin; +} + +/* + * GlobalSnapshotGenerate + * + * Generate GlobalCSN which is actually a local time. Also we are forcing + * this time to be always increasing. Since now it is not uncommon to have + * millions of read transactions per second we are trying to use nanoseconds + * if such time resolution is available. + */ +GlobalCSN +GlobalSnapshotGenerate(bool locked) +{ + instr_time current_time; + GlobalCSN global_csn; + + Assert(track_global_snapshots || global_snapshot_defer_time > 0); + + /* + * TODO: create some macro that adds a small random shift to current time. + */ + INSTR_TIME_SET_CURRENT(current_time); + global_csn = (GlobalCSN) INSTR_TIME_GET_NANOSEC(current_time); + + /* TODO: change to atomics?
*/ + if (!locked) + SpinLockAcquire(&gsState->lock); + + if (global_csn <= gsState->last_global_csn) + global_csn = ++gsState->last_global_csn; + else + gsState->last_global_csn = global_csn; + + if (!locked) + SpinLockRelease(&gsState->lock); + + return global_csn; +} + +/* + * GlobalSnapshotSync + * + * Due to time desynchronization on different nodes we can receive global_csn + * which is greater than global_csn on this node. To preserve proper isolation + * this node needs to wait when such global_csn comes on local clock. + * + * This should happen relatively rarely if nodes have running NTP/PTP/etc. + * Complain if wait time is more than SNAP_DESYNC_COMPLAIN. + */ +void +GlobalSnapshotSync(GlobalCSN remote_gcsn) +{ + GlobalCSN local_gcsn; + GlobalCSN delta; + + Assert(track_global_snapshots); + + for(;;) + { + SpinLockAcquire(&gsState->lock); + if (gsState->last_global_csn > remote_gcsn) + { + /* Everything is fine */ + SpinLockRelease(&gsState->lock); + return; + } + else if ((local_gcsn = GlobalSnapshotGenerate(true)) >= remote_gcsn) + { + /* + * Everything is fine too, but last_global_csn wasn't updated for + * some time. + */ + SpinLockRelease(&gsState->lock); + return; + } + SpinLockRelease(&gsState->lock); + + /* Okay we need to sleep now */ + delta = remote_gcsn - local_gcsn; + if (delta > SNAP_DESYNC_COMPLAIN) + ereport(WARNING, + (errmsg("remote global snapshot exceeds ours by more than a second"), + errhint("Consider running NTPd on servers participating in global transaction"))); + + /* TODO: report this sleeptime somewhere? */ + pg_usleep((long) (delta/NSECS_PER_USEC)); + + /* + * Loop that checks to ensure that we actually slept for specified + * amount of time. + */ + } + + Assert(false); /* Should not happen: the loop only exits via return */ + return; +} + +/* + * TransactionIdGetGlobalCSN + * + * Get GlobalCSN for specified TransactionId taking care about special xids, + * xids beyond TransactionXmin and InDoubt states.
+ */ +GlobalCSN +TransactionIdGetGlobalCSN(TransactionId xid) +{ + GlobalCSN global_csn; + + Assert(track_global_snapshots); + + /* Handle permanent TransactionId's for which we don't have mapping */ + if (!TransactionIdIsNormal(xid)) + { + if (xid == InvalidTransactionId) + return AbortedGlobalCSN; + if (xid == FrozenTransactionId || xid == BootstrapTransactionId) + return FrozenGlobalCSN; + Assert(false); /* Should not happen */ + } + + /* + * For xids that precede TransactionXmin, GlobalCSNLog may already be + * trimmed, but we know that such a transaction is definitely not concurrently + * running according to any snapshot including timetravel ones. Callers + * should check TransactionDidCommit after. + */ + if (TransactionIdPrecedes(xid, TransactionXmin)) + return FrozenGlobalCSN; + + /* Read GlobalCSN from SLRU */ + global_csn = GlobalCSNLogGetCSN(xid); + + /* + * If we faced InDoubt state then transaction is being committed and we + * should wait until GlobalCSN will be assigned so that visibility check + * could decide whether tuple is in snapshot. See also comments in + * GlobalSnapshotPrecommit(). + */ + if (GlobalCSNIsInDoubt(global_csn)) + { + XactLockTableWait(xid, NULL, NULL, XLTW_None); + global_csn = GlobalCSNLogGetCSN(xid); + Assert(GlobalCSNIsNormal(global_csn) || + GlobalCSNIsAborted(global_csn)); + } + + Assert(GlobalCSNIsNormal(global_csn) || + GlobalCSNIsInProgress(global_csn) || + GlobalCSNIsAborted(global_csn)); + + return global_csn; +} + +/* + * XidInvisibleInGlobalSnapshot + * + * Version of XidInMVCCSnapshot for global transactions. For non-imported + * global snapshots this should give same results as XidInLocalMVCCSnapshot + * (except that aborts will be shown as invisible without going to clog) and to + * ensure such behaviour XidInMVCCSnapshot is coated with asserts that checks + * identicalness of XidInvisibleInGlobalSnapshot/XidInLocalMVCCSnapshot in + * case of ordinary snapshot.
+ */ +bool +XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot) +{ + GlobalCSN csn; + + Assert(track_global_snapshots); + + csn = TransactionIdGetGlobalCSN(xid); + + if (GlobalCSNIsNormal(csn)) + { + if (csn < snapshot->global_csn) + return false; + else + return true; + } + else if (GlobalCSNIsFrozen(csn)) + { + /* It is bootstrap or frozen transaction */ + return false; + } + else + { + /* It is aborted or in-progress */ + Assert(GlobalCSNIsAborted(csn) || GlobalCSNIsInProgress(csn)); + if (GlobalCSNIsAborted(csn)) + Assert(TransactionIdDidAbort(xid)); + return true; + } +} + + +/***************************************************************************** + * Functions to handle distributed commit on transaction coordinator: + * GlobalSnapshotPrepareCurrent() / GlobalSnapshotAssignCsnCurrent(). + * Corresponding functions for remote nodes are defined in twophase.c: + * pg_global_snapshot_prepare/pg_global_snapshot_assign. + *****************************************************************************/ + + +/* + * GlobalSnapshotPrepareCurrent + * + * Set InDoubt state for currently active transaction and return commit's + * global snapshot. + */ +GlobalCSN +GlobalSnapshotPrepareCurrent() +{ + TransactionId xid = GetCurrentTransactionIdIfAny(); + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + if (TransactionIdIsValid(xid)) + { + TransactionId *subxids; + int nsubxids = xactGetCommittedChildren(&subxids); + GlobalCSNLogSetCSN(xid, nsubxids, + subxids, InDoubtGlobalCSN); + } + + /* Nothing to write if we don't have xid */ + + return GlobalSnapshotGenerate(false); +} + +/* + * GlobalSnapshotAssignCsnCurrent + * + * Assign GlobalCSN for currently active transaction.
GlobalCSN is supposed to be + * the maximum of the values returned by GlobalSnapshotPrepareCurrent and + * pg_global_snapshot_prepare. + */ +void +GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn) +{ + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + if (!GlobalCSNIsNormal(global_csn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_global_snapshot_assign expects normal global_csn"))); + + /* Skip empty transactions */ + if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return; + + /* Set global_csn and prevent ProcArrayEndTransaction from assigning one */ + pg_atomic_write_u64(&MyProc->assignedGlobalCsn, global_csn); +} + + +/***************************************************************************** + * Functions to handle global and local transactions commit. + * + * For local transactions GlobalSnapshotPrecommit sets InDoubt state before + * ProcArrayEndTransaction is called and transaction data potentially becomes + * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in + * twophase case) then acquires global_csn under ProcArray lock and stores it + * in proc->assignedGlobalCsn. It's important that global_csn for commit is + * generated under ProcArray lock, otherwise global and local snapshots won't + * be equivalent. Consequent call to GlobalSnapshotCommit will write + * proc->assignedGlobalCsn to GlobalCSNLog. + * + * The same rules apply to global transactions, except that global_csn is already + * assigned by GlobalSnapshotAssignCsnCurrent/pg_global_snapshot_assign and + * GlobalSnapshotPrecommit is basically no-op. + * + * GlobalSnapshotAbort is slightly different compared to commit because abort + * can skip InDoubt phase and can be called for transaction subtree.
+ *****************************************************************************/ + + +/* + * GlobalSnapshotAbort + * + * Abort transaction in GlobalCsnLog. We can skip InDoubt state for aborts + * since no concurrent transaction is allowed to see aborted data anyway. + */ +void +GlobalSnapshotAbort(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + if (!track_global_snapshots) + return; + + GlobalCSNLogSetCSN(xid, nsubxids, subxids, AbortedGlobalCSN); + + /* + * Clean assignedGlobalCsn anyway, as it was possibly set in + * GlobalSnapshotAssignCsnCurrent. + */ + pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN); +} + +/* + * GlobalSnapshotPrecommit + * + * Set InDoubt status for local transaction that we are going to commit. + * This step is needed to achieve consistency between local snapshots and + * global csn-based snapshots. We don't hold ProcArray lock while writing + * csn for transaction in SLRU but instead we set InDoubt status before + * transaction is deleted from ProcArray so the readers who will read csn + * in the gap between ProcArray removal and GlobalCSN assignment can wait + * until GlobalCSN is finally assigned. See also TransactionIdGetGlobalCSN(). + * + * For global transaction this does nothing as InDoubt state was written + * earlier. + * + * This should be called only from parallel group leader before backend is + * deleted from ProcArray.
+ */ +void +GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + GlobalCSN oldAssignedGlobalCsn = InProgressGlobalCSN; + bool in_progress; + + if (!track_global_snapshots) + return; + + /* Set InDoubt status if it is local transaction */ + in_progress = pg_atomic_compare_exchange_u64(&proc->assignedGlobalCsn, + &oldAssignedGlobalCsn, + InDoubtGlobalCSN); + if (in_progress) + { + Assert(GlobalCSNIsInProgress(oldAssignedGlobalCsn)); + GlobalCSNLogSetCSN(xid, nsubxids, + subxids, InDoubtGlobalCSN); + } + else + { + /* Otherwise we should have valid GlobalCSN by this time */ + Assert(GlobalCSNIsNormal(oldAssignedGlobalCsn)); + /* Also global transaction should already be in InDoubt state */ + Assert(GlobalCSNIsInDoubt(GlobalCSNLogGetCSN(xid))); + } +} + +/* + * GlobalSnapshotCommit + * + * Write the GlobalCSN that was acquired earlier to GlobalCsnLog. Should be + * preceded by GlobalSnapshotPrecommit() so readers can wait until we have + * finished writing to SLRU. + * + * Should be called after ProcArrayEndTransaction (or ProcArrayRemove), but + * before releasing transaction locks, so that TransactionIdGetGlobalCSN can + * wait on this lock for GlobalCSN.
+ */ +void +GlobalSnapshotCommit(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + volatile GlobalCSN assigned_global_csn; + + if (!track_global_snapshots) + return; + + if (!TransactionIdIsValid(xid)) + { + assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn); + Assert(GlobalCSNIsInProgress(assigned_global_csn)); + return; + } + + /* Finally write resulting GlobalCSN in SLRU */ + assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn); + Assert(GlobalCSNIsNormal(assigned_global_csn)); + GlobalCSNLogSetCSN(xid, nsubxids, + subxids, assigned_global_csn); + + /* Reset for next transaction */ + pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN); +} diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 0ecc02a3dd..c5d59fa2f2 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -77,6 +77,7 @@ #include <unistd.h> #include "access/commit_ts.h" +#include "access/global_snapshot.h" #include "access/global_csn_log.h" #include "access/htup_details.h" #include "access/subtrans.h" @@ -1477,8 +1478,34 @@ FinishPreparedTransaction(const char *gid, bool isCommit) hdr->nabortrels, abortrels, gid); + /* + * GlobalSnapshot callbacks that should be called right before we are + * going to become visible. Details in comments to these functions. + */ + if (isCommit) + GlobalSnapshotPrecommit(proc, xid, hdr->nsubxacts, children); + else + GlobalSnapshotAbort(proc, xid, hdr->nsubxacts, children); + + ProcArrayRemove(proc, latestXid); + /* + * Stamp our transaction with GlobalCSN in GlobalCsnLog. + * Should be called after ProcArrayRemove, but before releasing + * transaction locks, since TransactionIdGetGlobalCSN relies on + * XactLockTableWait to await global_csn.
+ */ + if (isCommit) + { + GlobalSnapshotCommit(proc, xid, hdr->nsubxacts, children); + } + else + { + Assert(GlobalCSNIsInProgress( + pg_atomic_read_u64(&proc->assignedGlobalCsn))); + } + /* * In case we fail while running the callbacks, mark the gxact invalid so * no one else will try to commit/rollback, and so it will be recycled if @@ -2439,3 +2466,134 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); } + +/* + * GlobalSnapshotPrepareTwophase + * + * Set InDoubt state for the prepared transaction identified by gid and + * return commit's global snapshot. + * + * This function is a counterpart of GlobalSnapshotPrepareCurrent() for + * twophase transactions. + */ +static GlobalCSN +GlobalSnapshotPrepareTwophase(const char *gid) +{ + GlobalTransaction gxact; + PGXACT *pgxact; + char *buf; + TransactionId xid; + xl_xact_parsed_prepare parsed; + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to access the same GID at once.
+ */ + gxact = LockGXact(gid, GetUserId()); + pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + xid = pgxact->xid; + + if (gxact->ondisk) + buf = ReadTwoPhaseFile(xid, true); + else + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); + + ParsePrepareRecord(0, (xl_xact_prepare *) buf, &parsed); + + GlobalCSNLogSetCSN(xid, parsed.nsubxacts, + parsed.subxacts, InDoubtGlobalCSN); + + /* Unlock our GXACT and forget it, so AtAbort_Twophase() won't touch it */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + gxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); + MyLockedGxact = NULL; + + pfree(buf); + + return GlobalSnapshotGenerate(false); +} + +/* + * SQL interface to GlobalSnapshotPrepareTwophase() + * + * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT + */ +Datum +pg_global_snapshot_prepare(PG_FUNCTION_ARGS) +{ + const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0)); + GlobalCSN global_csn; + + global_csn = GlobalSnapshotPrepareTwophase(gid); + + PG_RETURN_INT64(global_csn); +} + + +/* + * GlobalSnapshotAssignCsnTwoPhase + * + * Assign GlobalCSN for the prepared transaction identified by gid. GlobalCSN + * is supposed to be the maximum of the values returned by + * GlobalSnapshotPrepareCurrent and pg_global_snapshot_prepare. + * + * This function is a counterpart of GlobalSnapshotAssignCsnCurrent() for + * twophase transactions. + */ +static void +GlobalSnapshotAssignCsnTwoPhase(const char *gid, GlobalCSN global_csn) +{ + GlobalTransaction gxact; + PGPROC *proc; + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + if (!GlobalCSNIsNormal(global_csn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_global_snapshot_assign expects normal global_csn"))); + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to access the same GID at once.
+ */ + gxact = LockGXact(gid, GetUserId()); + proc = &ProcGlobal->allProcs[gxact->pgprocno]; + + /* Set global_csn and prevent ProcArrayRemove from assigning one. */ + pg_atomic_write_u64(&proc->assignedGlobalCsn, global_csn); + + /* Unlock our GXACT and forget it, so AtAbort_Twophase() won't touch it */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + gxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); + MyLockedGxact = NULL; +} + +/* + * SQL interface to GlobalSnapshotAssignCsnTwoPhase() + * + * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'global_csn' + */ +Datum +pg_global_snapshot_assign(PG_FUNCTION_ARGS) +{ + const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0)); + GlobalCSN global_csn = PG_GETARG_INT64(1); + + GlobalSnapshotAssignCsnTwoPhase(gid, global_csn); + PG_RETURN_VOID(); +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 3984dd3e1a..8fddb6edaf 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -21,6 +21,7 @@ #include <unistd.h> #include "access/commit_ts.h" +#include "access/global_snapshot.h" #include "access/multixact.h" #include "access/parallel.h" #include "access/subtrans.h" @@ -1433,6 +1434,14 @@ RecordTransactionCommit(void) /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd = 0; + + /* + * Mark our transaction as InDoubt in GlobalCsnLog and get ready for + * commit. + */ + if (markXidCommitted) + GlobalSnapshotPrecommit(MyProc, xid, nchildren, children); + cleanup: /* Clean up local data */ if (rels) @@ -1694,6 +1703,11 @@ RecordTransactionAbort(bool isSubXact) */ TransactionIdAbortTree(xid, nchildren, children); + /* + * Mark our transaction as Aborted in GlobalCsnLog. + */ + GlobalSnapshotAbort(MyProc, xid, nchildren, children); + END_CRIT_SECTION(); /* Compute latestXid while we have the child XIDs handy */ @@ -2183,6 +2197,21 @@ CommitTransaction(void) */ ProcArrayEndTransaction(MyProc, latestXid); + /* + * Stamp our transaction with GlobalCSN in GlobalCsnLog.
+ * Should be called after ProcArrayEndTransaction, but before releasing + * transaction locks. + */ + if (!is_parallel_worker) + { + TransactionId xid = GetTopTransactionIdIfAny(); + TransactionId *subxids; + int nsubxids; + + nsubxids = xactGetCommittedChildren(&subxids); + GlobalSnapshotCommit(MyProc, xid, nsubxids, subxids); + } + /* * This is all post-commit cleanup. Note that if an error is raised here, * it's too late to abort the transaction. This should be just diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 285d9d442e..b485db5456 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7058,6 +7058,7 @@ StartupXLOG(void) StartupCLOG(); StartupGlobalCSNLog(oldestActiveXID); StartupSUBTRANS(oldestActiveXID); + GlobalSnapshotStartup(oldestActiveXID); /* * If we're beginning at a shutdown checkpoint, we know that @@ -7876,6 +7877,7 @@ StartupXLOG(void) StartupCLOG(); StartupGlobalCSNLog(oldestActiveXID); StartupSUBTRANS(oldestActiveXID); + GlobalSnapshotStartup(oldestActiveXID); } /* diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index dc2d2959c4..d1819dc2c8 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -22,6 +22,7 @@ #include "access/nbtree.h" #include "access/subtrans.h" #include "access/twophase.h" +#include "access/global_snapshot.h" #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" @@ -145,6 +146,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, WalSndShmemSize()); size = add_size(size, WalRcvShmemSize()); size = add_size(size, ApplyLauncherShmemSize()); + size = add_size(size, GlobalSnapshotShmemSize()); size = add_size(size, SnapMgrShmemSize()); size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); @@ -266,6 +268,7 @@ CreateSharedMemoryAndSemaphores(void) BTreeShmemInit(); SyncScanShmemInit(); AsyncShmemInit(); + GlobalSnapshotShmemInit(); #ifdef
EXEC_BACKEND diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 8ae4906474..23db5039a4 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -47,6 +47,7 @@ #include "access/clog.h" #include "access/global_csn_log.h" +#include "access/global_snapshot.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/twophase.h" @@ -95,6 +96,8 @@ typedef struct ProcArrayStruct TransactionId replication_slot_xmin; /* oldest catalog xmin of any replication slot */ TransactionId replication_slot_catalog_xmin; + /* xmin of oldest active global snapshot */ + TransactionId global_snapshot_xmin; /* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */ int pgprocnos[FLEXIBLE_ARRAY_MEMBER]; @@ -250,6 +253,7 @@ CreateSharedProcArray(void) procArray->lastOverflowedXid = InvalidTransactionId; procArray->replication_slot_xmin = InvalidTransactionId; procArray->replication_slot_catalog_xmin = InvalidTransactionId; + procArray->global_snapshot_xmin = InvalidTransactionId; } allProcs = ProcGlobal->allProcs; @@ -355,6 +359,17 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid) if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, latestXid)) ShmemVariableCache->latestCompletedXid = latestXid; + + /* + * Assign global csn while holding ProcArrayLock for non-global + * COMMIT PREPARED. After lock is released consequent + * GlobalSnapshotCommit() will write this value to GlobalCsnLog. + * + * In case of global commit proc->assignedGlobalCsn is already set + * by prior pg_global_snapshot_assign().
+ */ + if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn))) + pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false)); } else { @@ -435,6 +450,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; proc->delayChkpt = false; /* be sure this is cleared in abort */ @@ -457,6 +474,8 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact, pgxact->xid = InvalidTransactionId; proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; proc->delayChkpt = false; /* be sure this is cleared in abort */ @@ -470,6 +489,20 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact, if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, latestXid)) ShmemVariableCache->latestCompletedXid = latestXid; + + /* + * Assign global csn while holding ProcArrayLock for non-global + * COMMIT. After lock is released consequent GlobalSnapshotCommit() will + * write this value to GlobalCsnLog. + * + * In case of global commit MyProc->assignedGlobalCsn is already set + * by prior GlobalSnapshotAssignCsnCurrent(). + * + * TODO: in case of group commit we can generate one GlobalSnapshot for + * whole group to save time on timestamp acquisition.
+ */ + if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn))) + pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false)); } /* @@ -613,6 +646,7 @@ ProcArrayClearTransaction(PGPROC *proc) pgxact->xid = InvalidTransactionId; proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; proc->recoveryConflictPending = false; /* redundant, but just in case */ @@ -1315,6 +1349,7 @@ GetOldestXmin(Relation rel, int flags) TransactionId replication_slot_xmin = InvalidTransactionId; TransactionId replication_slot_catalog_xmin = InvalidTransactionId; + TransactionId global_snapshot_xmin = InvalidTransactionId; /* * If we're not computing a relation specific limit, or if a shared @@ -1351,8 +1386,9 @@ GetOldestXmin(Relation rel, int flags) proc->databaseId == MyDatabaseId || proc->databaseId == 0) /* always include WalSender */ { - /* Fetch xid just once - see GetNewTransactionId */ + /* Fetch both xids just once - see GetNewTransactionId */ TransactionId xid = UINT32_ACCESS_ONCE(pgxact->xid); + TransactionId original_xmin = UINT32_ACCESS_ONCE(proc->originalXmin); /* First consider the transaction's own Xid, if any */ if (TransactionIdIsNormal(xid) && @@ -1365,8 +1401,17 @@ GetOldestXmin(Relation rel, int flags) * We must check both Xid and Xmin because a transaction might * have an Xmin but not (yet) an Xid; conversely, if it has an * Xid, that could determine some not-yet-set Xmin. + * + * In case of oldestXmin calculation for GlobalSnapshotMapXmin() + * pgxact->xmin should be changed to proc->originalXmin. Details + * in comments to GlobalSnapshotMapXmin.
+ */ - xid = UINT32_ACCESS_ONCE(pgxact->xmin); + if ((flags & PROCARRAY_NON_IMPORTED_XMIN) && + TransactionIdIsValid(original_xmin)) + xid = original_xmin; + else + xid = UINT32_ACCESS_ONCE(pgxact->xmin); + if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, result)) result = xid; @@ -1380,6 +1425,7 @@ GetOldestXmin(Relation rel, int flags) */ replication_slot_xmin = procArray->replication_slot_xmin; replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + global_snapshot_xmin = ProcArrayGetGlobalSnapshotXmin(); if (RecoveryInProgress()) { @@ -1421,6 +1467,11 @@ GetOldestXmin(Relation rel, int flags) result = FirstNormalTransactionId; } + if (!(flags & PROCARRAY_NON_IMPORTED_XMIN) && + TransactionIdIsValid(global_snapshot_xmin) && + NormalTransactionIdPrecedes(global_snapshot_xmin, result)) + result = global_snapshot_xmin; + /* * Check whether there are replication slots requiring an older xmin. */ @@ -1515,8 +1566,10 @@ GetSnapshotData(Snapshot snapshot) int count = 0; int subcount = 0; bool suboverflowed = false; + GlobalCSN global_csn = FrozenGlobalCSN; TransactionId replication_slot_xmin = InvalidTransactionId; TransactionId replication_slot_catalog_xmin = InvalidTransactionId; + TransactionId global_snapshot_xmin = InvalidTransactionId; Assert(snapshot != NULL); @@ -1708,10 +1761,18 @@ GetSnapshotData(Snapshot snapshot) */ replication_slot_xmin = procArray->replication_slot_xmin; replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + global_snapshot_xmin = ProcArrayGetGlobalSnapshotXmin(); if (!TransactionIdIsValid(MyPgXact->xmin)) MyPgXact->xmin = TransactionXmin = xmin; + /* + * Take GlobalCSN under ProcArrayLock so the local/global snapshot stays + * synchronized.
+ */ + if (track_global_snapshots) + global_csn = GlobalSnapshotGenerate(false); + LWLockRelease(ProcArrayLock); /* @@ -1727,6 +1788,10 @@ GetSnapshotData(Snapshot snapshot) if (!TransactionIdIsNormal(RecentGlobalXmin)) RecentGlobalXmin = FirstNormalTransactionId; + if (TransactionIdIsValid(global_snapshot_xmin) && + TransactionIdPrecedes(global_snapshot_xmin, RecentGlobalXmin)) + RecentGlobalXmin = global_snapshot_xmin; + /* Check whether there's a replication slot requiring an older xmin. */ if (TransactionIdIsValid(replication_slot_xmin) && NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin)) @@ -1782,6 +1847,11 @@ GetSnapshotData(Snapshot snapshot) MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin); } + snapshot->imported_global_csn = false; + snapshot->global_csn = global_csn; + if (global_snapshot_defer_time > 0 && IsUnderPostmaster) + GlobalSnapshotMapXmin(snapshot->global_csn); + return snapshot; } @@ -3129,6 +3199,24 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin, LWLockRelease(ProcArrayLock); } +/* + * ProcArraySetGlobalSnapshotXmin + */ +void +ProcArraySetGlobalSnapshotXmin(TransactionId xmin) +{ + /* We rely on atomic fetch/store of xid */ + procArray->global_snapshot_xmin = xmin; +} + +/* + * ProcArrayGetGlobalSnapshotXmin + */ +TransactionId +ProcArrayGetGlobalSnapshotXmin(void) +{ + return procArray->global_snapshot_xmin; +} #define XidCacheRemove(i) \ do { \ diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index fe18c93b61..86d0a0acae 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -50,3 +50,4 @@ OldSnapshotTimeMapLock 42 LogicalRepWorkerLock 43 CLogTruncationLock 44 GlobalCSNLogControlLock 45 +GlobalSnapshotXidMapLock 46 diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 5aa19d3f78..8a47a2d375 100644 --- a/src/backend/storage/lmgr/proc.c +++
b/src/backend/storage/lmgr/proc.c @@ -37,6 +37,7 @@ #include "access/transam.h" #include "access/twophase.h" +#include "access/global_snapshot.h" #include "access/xact.h" #include "miscadmin.h" #include "pgstat.h" @@ -441,6 +442,9 @@ InitProcess(void) MyProc->clogGroupMemberLsn = InvalidXLogRecPtr; Assert(pg_atomic_read_u32(&MyProc->clogGroupNext) == INVALID_PGPROCNO); + MyProc->originalXmin = InvalidTransactionId; + pg_atomic_init_u64(&MyProc->assignedGlobalCsn, InProgressGlobalCSN); + /* * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch * on it. That allows us to repoint the process latch, which so far @@ -584,6 +588,7 @@ InitAuxiliaryProcess(void) MyProc->lwWaitMode = 0; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; + MyProc->originalXmin = InvalidTransactionId; #ifdef USE_ASSERT_CHECKING { int i; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 0ca331c6f9..7154c3499e 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -28,6 +28,7 @@ #include "access/commit_ts.h" #include "access/gin.h" +#include "access/global_snapshot.h" #include "access/rmgr.h" #include "access/tableam.h" #include "access/transam.h" @@ -1181,7 +1182,7 @@ static struct config_bool ConfigureNamesBool[] = gettext_noop("Used to achieve REPEATEBLE READ isolation level for postgres_fdw transactions.") }, &track_global_snapshots, - true, /* XXX: set true to simplify tesing. 
XXX2: Seems that RESOURCES_MEM isn't the best catagory */ + false, /* XXX: Seems that RESOURCES_MEM isn't the best catagory */ NULL, NULL, NULL }, { @@ -2495,6 +2496,16 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"global_snapshot_defer_time", PGC_POSTMASTER, REPLICATION_MASTER, + gettext_noop("Minimal age of records which allowed to be vacuumed, in seconds."), + NULL + }, + &global_snapshot_defer_time, + 5, 0, INT_MAX, + NULL, NULL, NULL + }, + /* * See also CheckRequiredParameterValues() if this parameter changes */ diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 995b6ca155..0fd7d8501c 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -306,6 +306,8 @@ # and comma-separated list of application_name # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed +#global_snapshot_defer_time = 0 # minimal age of records which allowed to be + # vacuumed, in seconds # - Standby Servers - diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 1c063c592c..3d925a7866 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -48,6 +48,7 @@ #include <sys/stat.h> #include <unistd.h> +#include "access/global_snapshot.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/xact.h" @@ -247,6 +248,8 @@ typedef struct SerializedSnapshotData CommandId curcid; TimestampTz whenTaken; XLogRecPtr lsn; + GlobalCSN global_csn; + bool imported_global_csn; } SerializedSnapshotData; Size @@ -1024,7 +1027,9 @@ SnapshotResetXmin(void) pairingheap_first(&RegisteredSnapshots)); if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin)) + { MyPgXact->xmin = minSnapshot->xmin; + } } /* @@ -2115,6 +2120,8 @@ SerializeSnapshot(Snapshot snapshot, char *start_address) serialized_snapshot.curcid = snapshot->curcid; 
serialized_snapshot.whenTaken = snapshot->whenTaken; serialized_snapshot.lsn = snapshot->lsn; + serialized_snapshot.global_csn = snapshot->global_csn; + serialized_snapshot.imported_global_csn = snapshot->imported_global_csn; /* * Ignore the SubXID array if it has overflowed, unless the snapshot was @@ -2189,6 +2196,8 @@ RestoreSnapshot(char *start_address) snapshot->curcid = serialized_snapshot.curcid; snapshot->whenTaken = serialized_snapshot.whenTaken; snapshot->lsn = serialized_snapshot.lsn; + snapshot->global_csn = serialized_snapshot.global_csn; + snapshot->imported_global_csn = serialized_snapshot.imported_global_csn; /* Copy XIDs, if present. */ if (serialized_snapshot.xcnt > 0) @@ -2228,8 +2237,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) } /* - * XidInMVCCSnapshot - * Is the given XID still-in-progress according to the snapshot? + * XidInLocalMVCCSnapshot + * Is the given XID still-in-progress according to the local snapshot? * * Note: GetSnapshotData never stores either top xid or subxids of our own * backend into a snapshot, so these xids will not be reported as "running" @@ -2237,8 +2246,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) * TransactionIdIsCurrentTransactionId first, except when it's known the * XID could not be ours anyway. */ -bool -XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) +static bool +XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot) { uint32 i; @@ -2348,3 +2357,153 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) return false; } + +/* + * XidInMVCCSnapshot + * + * Check whether this xid is in snapshot, taking into account fact that + * snapshot can be global. When track_global_snapshots is switched off + * just call XidInLocalMVCCSnapshot(). 
+ */ +bool +XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) +{ + bool in_snapshot; + + if (snapshot->imported_global_csn) + { + Assert(track_global_snapshots); + /* No point in using snapshot info except CSN */ + return XidInvisibleInGlobalSnapshot(xid, snapshot); + } + + in_snapshot = XidInLocalMVCCSnapshot(xid, snapshot); + + if (!track_global_snapshots) + { + Assert(GlobalCSNIsFrozen(snapshot->global_csn)); + return in_snapshot; + } + + if (in_snapshot) + { + /* + * This xid may be already in unknown state and in that case + * we must wait and recheck. + * + * TODO: this check can be skipped if we know for sure that there were + * no global transactions when this snapshot was taken. That requires + * some changes to mechanisms of global snapshots export/import (if + * backend set xmin then we should have a-priori knowledge that this + * transaction is going to be global or local -- right now this is not + * enforced). Leave that for future and don't complicate this patch. + */ + return XidInvisibleInGlobalSnapshot(xid, snapshot); + } + else + { +#ifdef USE_ASSERT_CHECKING + /* Local snapshot says visible: the global one may disagree only for aborted xids */ + if (XidInvisibleInGlobalSnapshot(xid, snapshot)) + { + GlobalCSN gcsn = TransactionIdGetGlobalCSN(xid); + Assert(GlobalCSNIsAborted(gcsn)); + } +#endif + return false; + } +} + +/* + * ExportGlobalSnapshot + * + * Export global_csn so that caller can expand this transaction to other + * nodes. + * + * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and + * add some additional checks that transaction has not yet acquired an xid, but + * for current iteration of this patch I don't want to hack on parser.
+ */ +GlobalCSN +ExportGlobalSnapshot(void) +{ + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not export global snapshot"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + return CurrentSnapshot->global_csn; +} + +/* SQL accessor to ExportGlobalSnapshot() */ +Datum +pg_global_snapshot_export(PG_FUNCTION_ARGS) +{ + GlobalCSN global_csn = ExportGlobalSnapshot(); + PG_RETURN_UINT64(global_csn); +} + +/* + * ImportGlobalSnapshot + * + * Import global_csn and retract this backend's xmin to the value that was + * current when we had such global_csn. + * + * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and + * add some additional checks that transaction has not yet acquired an xid, but + * for current iteration of this patch I don't want to hack on parser. + */ +void +ImportGlobalSnapshot(GlobalCSN snap_global_csn) +{ + volatile TransactionId xmin; + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import global snapshot"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + if (global_snapshot_defer_time <= 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import global snapshot"), + errhint("Make sure the configuration parameter \"%s\" is positive.", + "global_snapshot_defer_time"))); + + /* + * Call GlobalSnapshotToXmin under ProcArrayLock to avoid situation that + * resulting xmin will be evicted from map before we will set it into our + * backend's xmin.
+ */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + xmin = GlobalSnapshotToXmin(snap_global_csn); + if (!TransactionIdIsValid(xmin)) + { + LWLockRelease(ProcArrayLock); + elog(ERROR, "GlobalSnapshotToXmin: global snapshot too old"); + } + MyProc->originalXmin = MyPgXact->xmin; + MyPgXact->xmin = TransactionXmin = xmin; + LWLockRelease(ProcArrayLock); + + CurrentSnapshot->xmin = xmin; /* defuse SnapshotResetXmin() */ + CurrentSnapshot->global_csn = snap_global_csn; + CurrentSnapshot->imported_global_csn = true; + GlobalSnapshotSync(snap_global_csn); + + Assert(TransactionIdPrecedesOrEquals(RecentGlobalXmin, xmin)); + Assert(TransactionIdPrecedesOrEquals(RecentGlobalDataXmin, xmin)); +} + +/* SQL accessor to ImportGlobalSnapshot() */ +Datum +pg_global_snapshot_import(PG_FUNCTION_ARGS) +{ + GlobalCSN global_csn = PG_GETARG_UINT64(0); + ImportGlobalSnapshot(global_csn); + PG_RETURN_VOID(); +} diff --git a/src/include/access/global_snapshot.h b/src/include/access/global_snapshot.h new file mode 100644 index 0000000000..246b180cfd --- /dev/null +++ b/src/include/access/global_snapshot.h @@ -0,0 +1,72 @@ +/*------------------------------------------------------------------------- + * + * global_snapshot.h + * Support for cross-node snapshot isolation. + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/global_snapshot.h + * + *------------------------------------------------------------------------- + */ +#ifndef GLOBAL_SNAPSHOT_H +#define GLOBAL_SNAPSHOT_H + +#include "port/atomics.h" +#include "storage/lock.h" +#include "utils/snapshot.h" +#include "utils/guc.h" + +/* + * snapshot.h is used in frontend code so atomic variant of GlobalCSN type + * is defined here. 
+ */ +typedef pg_atomic_uint64 GlobalCSN_atomic; + +#define InProgressGlobalCSN UINT64CONST(0x0) +#define AbortedGlobalCSN UINT64CONST(0x1) +#define FrozenGlobalCSN UINT64CONST(0x2) +#define InDoubtGlobalCSN UINT64CONST(0x3) +#define FirstNormalGlobalCSN UINT64CONST(0x4) + +#define GlobalCSNIsInProgress(csn) ((csn) == InProgressGlobalCSN) +#define GlobalCSNIsAborted(csn) ((csn) == AbortedGlobalCSN) +#define GlobalCSNIsFrozen(csn) ((csn) == FrozenGlobalCSN) +#define GlobalCSNIsInDoubt(csn) ((csn) == InDoubtGlobalCSN) +#define GlobalCSNIsNormal(csn) ((csn) >= FirstNormalGlobalCSN) + + +extern int global_snapshot_defer_time; + + +extern Size GlobalSnapshotShmemSize(void); +extern void GlobalSnapshotShmemInit(void); +extern void GlobalSnapshotStartup(TransactionId oldestActiveXID); + +extern void GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn); +extern TransactionId GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn); + +extern GlobalCSN GlobalSnapshotGenerate(bool locked); + +extern bool XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot); + +extern void GlobalSnapshotSync(GlobalCSN remote_gcsn); + +extern GlobalCSN TransactionIdGetGlobalCSN(TransactionId xid); + +extern GlobalCSN GlobalSnapshotPrepareGlobal(const char *gid); +extern void GlobalSnapshotAssignCsnGlobal(const char *gid, + GlobalCSN global_csn); + +extern GlobalCSN GlobalSnapshotPrepareCurrent(void); +extern void GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn); + +extern void GlobalSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); +extern void GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); +extern void GlobalSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); + +#endif /* GLOBAL_SNAPSHOT_H */ diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 2ca71c3445..b4899f3754 100644 --- a/src/include/access/twophase.h +++ 
b/src/include/access/twophase.h @@ -18,6 +18,7 @@ #include "access/xlogdefs.h" #include "datatype/timestamp.h" #include "storage/lock.h" +#include "utils/snapshot.h" /* * GlobalTransactionData is defined in twophase.c; other places have no diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index a85c78e796..64c3c71df3 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10945,4 +10945,18 @@ proname => 'is_normalized', prorettype => 'bool', proargtypes => 'text text', prosrc => 'unicode_is_normalized' }, +# global transaction handling +{ oid => '4388', descr => 'export global transaction snapshot', + proname => 'pg_global_snapshot_export', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => '', prosrc => 'pg_global_snapshot_export' }, +{ oid => '4389', descr => 'import global transaction snapshot', + proname => 'pg_global_snapshot_import', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_global_snapshot_import' }, +{ oid => '4390', descr => 'prepare distributed transaction for commit, get global_csn', + proname => 'pg_global_snapshot_prepare', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_global_snapshot_prepare' }, +{ oid => '4391', descr => 'assign global_csn to distributed transaction', + proname => 'pg_global_snapshot_assign', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_global_snapshot_assign' }, + ] diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h index 6be6d35d1e..583b1beea5 100644 --- a/src/include/datatype/timestamp.h +++ b/src/include/datatype/timestamp.h @@ -93,6 +93,9 @@ typedef struct #define USECS_PER_MINUTE INT64CONST(60000000) #define USECS_PER_SEC INT64CONST(1000000) +#define NSECS_PER_SEC INT64CONST(1000000000) +#define NSECS_PER_USEC INT64CONST(1000) + /* * We 
allow numeric timezone offsets up to 15:59:59 either way from Greenwich. * Currently, the record holders for wackiest offsets in actual use are zones diff --git a/src/include/fmgr.h b/src/include/fmgr.h index d349510b7c..5cdf2e17cb 100644 --- a/src/include/fmgr.h +++ b/src/include/fmgr.h @@ -280,6 +280,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum); #define PG_GETARG_FLOAT4(n) DatumGetFloat4(PG_GETARG_DATUM(n)) #define PG_GETARG_FLOAT8(n) DatumGetFloat8(PG_GETARG_DATUM(n)) #define PG_GETARG_INT64(n) DatumGetInt64(PG_GETARG_DATUM(n)) +#define PG_GETARG_UINT64(n) DatumGetUInt64(PG_GETARG_DATUM(n)) /* use this if you want the raw, possibly-toasted input datum: */ #define PG_GETARG_RAW_VARLENA_P(n) ((struct varlena *) PG_GETARG_POINTER(n)) /* use this if you want the input datum de-toasted: */ diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h index d6459327cc..4ac23da654 100644 --- a/src/include/portability/instr_time.h +++ b/src/include/portability/instr_time.h @@ -141,6 +141,9 @@ typedef struct timespec instr_time; #define INSTR_TIME_GET_MICROSEC(t) \ (((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) ((t).tv_nsec / 1000)) +#define INSTR_TIME_GET_NANOSEC(t) \ + (((uint64) (t).tv_sec * (uint64) 1000000000) + (uint64) ((t).tv_nsec)) + #else /* !HAVE_CLOCK_GETTIME */ /* Use gettimeofday() */ @@ -205,6 +208,10 @@ typedef struct timeval instr_time; #define INSTR_TIME_GET_MICROSEC(t) \ (((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) (t).tv_usec) +#define INSTR_TIME_GET_NANOSEC(t) \ + (((uint64) (t).tv_sec * (uint64) 1000000000) + \ + (uint64) (t).tv_usec * (uint64) 1000) + #endif /* HAVE_CLOCK_GETTIME */ #else /* WIN32 */ @@ -237,6 +244,9 @@ typedef LARGE_INTEGER instr_time; #define INSTR_TIME_GET_MICROSEC(t) \ ((uint64) (((double) (t).QuadPart * 1000000.0) / GetTimerFrequency())) +#define INSTR_TIME_GET_NANOSEC(t) \ + ((uint64) (((double) (t).QuadPart * 1000000000.0) / GetTimerFrequency())) + static 
inline double GetTimerFrequency(void) { diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index ae4f573ab4..da84dbf04c 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -15,8 +15,10 @@ #define _PROC_H_ #include "access/clog.h" +#include "access/global_snapshot.h" #include "access/xlogdefs.h" #include "lib/ilist.h" +#include "utils/snapshot.h" #include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" @@ -57,6 +59,7 @@ struct XidCache #define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical * decoding outside xact */ #define PROC_RESERVED 0x20 /* reserved for procarray */ +#define PROC_RESERVED2 0x40 /* reserved for procarray */ /* flags reset at EOXact */ #define PROC_VACUUM_STATE_MASK \ @@ -205,6 +208,18 @@ struct PGPROC PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */ dlist_head lockGroupMembers; /* list of members, if I'm a leader */ dlist_node lockGroupLink; /* my member link, if I'm a member */ + + /* + * assignedGlobalCsn holds GlobalCSN for this transaction. It is generated + * under a ProcArray lock and later is written to the GlobalCSNLog. This + * variable is defined as atomic only for the case of group commit; in all + * other scenarios only the backend owning this proc entry works with + * this variable. + */ + GlobalCSN_atomic assignedGlobalCsn; + + /* Original xmin of this backend before global snapshot was imported */ + TransactionId originalXmin; }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h.
*/ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index a5c7d0c064..452ae5d547 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -36,6 +36,10 @@ #define PROCARRAY_SLOTS_XMIN 0x20 /* replication slot xmin, * catalog_xmin */ + +#define PROCARRAY_NON_IMPORTED_XMIN 0x40 /* use originalXmin instead + * of xmin to properly + * maintain gsXidMap */ /* * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching * PGXACT->vacuumFlags. Other flags are used for different purposes and @@ -125,4 +129,8 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin); +extern void ProcArraySetGlobalSnapshotXmin(TransactionId xmin); + +extern TransactionId ProcArrayGetGlobalSnapshotXmin(void); + #endif /* PROCARRAY_H */ diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index b28d13ce84..f4768bc6d4 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -127,6 +127,9 @@ extern void AtSubCommit_Snapshot(int level); extern void AtSubAbort_Snapshot(int level); extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin); +extern GlobalCSN ExportGlobalSnapshot(void); +extern void ImportGlobalSnapshot(GlobalCSN snap_global_csn); + extern void ImportSnapshot(const char *idstr); extern bool XactHasExportedSnapshots(void); extern void DeleteAllExportedSnapshotFiles(void); diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index 57d2dfaa67..71c92c69f4 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -204,6 +204,14 @@ typedef struct SnapshotData TimestampTz whenTaken; /* timestamp when snapshot was taken */ XLogRecPtr lsn; /* position in the WAL stream when taken */ + + /* + * GlobalCSN for cross-node snapshot isolation support. + * Will be used only if track_global_snapshots is enabled. 
+ */ + GlobalCSN global_csn; + /* Did we have our own global_csn or imported one from different node */ + bool imported_global_csn; } SnapshotData; #endif /* SNAPSHOT_H */ -- 2.17.1
>From d3b8cb68ca1bb8721f738c4993083ea4cca3d255 Mon Sep 17 00:00:00 2001 From: "Andrey V. Lepikhov" <a.lepik...@postgrespro.ru> Date: Tue, 12 May 2020 08:31:59 +0500 Subject: [PATCH 3/3] postgres_fdw-support-for-global-snapshots-v3 --- contrib/postgres_fdw/Makefile | 9 + contrib/postgres_fdw/connection.c | 290 ++++++++++++++++-- contrib/postgres_fdw/postgres_fdw.c | 12 + contrib/postgres_fdw/postgres_fdw.h | 2 + .../postgres_fdw/t/001_bank_coordinator.pl | 264 ++++++++++++++++ .../postgres_fdw/t/002_bank_participant.pl | 240 +++++++++++++++ src/test/perl/PostgresNode.pm | 35 +++ 7 files changed, 826 insertions(+), 26 deletions(-) create mode 100644 contrib/postgres_fdw/t/001_bank_coordinator.pl create mode 100644 contrib/postgres_fdw/t/002_bank_participant.pl diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile index ee8a80a392..07091f630e 100644 --- a/contrib/postgres_fdw/Makefile +++ b/contrib/postgres_fdw/Makefile @@ -29,3 +29,12 @@ top_builddir = ../.. include $(top_builddir)/src/Makefile.global include $(top_srcdir)/contrib/contrib-global.mk endif + +# Global makefile will do temp-install for 'check'. Since REGRESS is defined, +# PGXS (included from contrib-global.mk or directly) will care to add +# postgres_fdw to it as EXTRA_INSTALL and build pg_regress. It will also +# actually run pg_regress, so the only thing left is tap tests. 
+check: tapcheck + +tapcheck: temp-install + $(prove_check) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index e45647f3ea..8e33ae0af7 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -12,8 +12,10 @@ */ #include "postgres.h" +#include "access/global_snapshot.h" #include "access/htup_details.h" #include "access/xact.h" +#include "access/xlog.h" /* GetSystemIdentifier() */ #include "catalog/pg_user_mapping.h" #include "commands/defrem.h" #include "mb/pg_wchar.h" @@ -25,6 +27,8 @@ #include "utils/hsearch.h" #include "utils/inval.h" #include "utils/memutils.h" +#include "utils/snapmgr.h" +#include "utils/snapshot.h" #include "utils/syscache.h" /* @@ -65,6 +69,21 @@ typedef struct ConnCacheEntry */ static HTAB *ConnectionHash = NULL; +/* + * FdwTransactionState + * + * Holds number of open remote transactions and shared state + * needed for all connection entries. + */ +typedef struct FdwTransactionState +{ + char *gid; + int nparticipants; + GlobalCSN global_csn; + bool two_phase_commit; +} FdwTransactionState; +static FdwTransactionState *fdwTransState; + /* for assigning cursor numbers and prepared statement numbers */ static unsigned int cursor_number = 0; static unsigned int prep_stmt_number = 0; @@ -72,6 +91,9 @@ static unsigned int prep_stmt_number = 0; /* tracks whether any work is needed in callback functions */ static bool xact_got_connection = false; +/* counter of prepared tx made by this backend */ +static int two_phase_xact_count = 0; + /* prototypes of private functions */ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void disconnect_pg_server(ConnCacheEntry *entry); @@ -80,6 +102,7 @@ static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char *sql); static void begin_remote_xact(ConnCacheEntry *entry); static void pgfdw_xact_callback(XactEvent event, void *arg); +static void 
deallocate_prepared_stmts(ConnCacheEntry *entry); static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, @@ -136,6 +159,15 @@ GetConnection(UserMapping *user, bool will_prep_stmt) pgfdw_inval_callback, (Datum) 0); } + /* allocate FdwTransactionState */ + if (fdwTransState == NULL) + { + MemoryContext oldcxt; + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + fdwTransState = palloc0(sizeof(FdwTransactionState)); + MemoryContextSwitchTo(oldcxt); + } + /* Set flag that we did GetConnection during the current transaction */ xact_got_connection = true; @@ -446,7 +478,8 @@ configure_remote_session(PGconn *conn) } /* - * Convenience subroutine to issue a non-data-returning SQL command to remote + * Convenience subroutine to issue a non-data-returning SQL command or + * statement to remote node. */ static void do_sql_command(PGconn *conn, const char *sql) @@ -456,7 +489,8 @@ do_sql_command(PGconn *conn, const char *sql) if (!PQsendQuery(conn, sql)) pgfdw_report_error(ERROR, NULL, conn, false, sql); res = pgfdw_get_result(conn, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + if (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); } @@ -484,6 +518,10 @@ begin_remote_xact(ConnCacheEntry *entry) elog(DEBUG3, "starting remote transaction on connection %p", entry->conn); + if (UseGlobalSnapshots && (!IsolationUsesXactSnapshot() || + IsolationIsSerializable())) + elog(ERROR, "Global snapshots support only REPEATABLE READ"); + if (IsolationIsSerializable()) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; else @@ -492,6 +530,23 @@ begin_remote_xact(ConnCacheEntry *entry) do_sql_command(entry->conn, sql); entry->xact_depth = 1; entry->changing_xact_state = false; + + if (UseGlobalSnapshots) + { + char import_sql[128]; + + /* Export our snapshot */ + if (fdwTransState->global_csn == 0) + fdwTransState->global_csn = 
ExportGlobalSnapshot(); + + snprintf(import_sql, sizeof(import_sql), + "SELECT pg_global_snapshot_import("UINT64_FORMAT")", + fdwTransState->global_csn); + + do_sql_command(entry->conn, import_sql); + } + + fdwTransState->nparticipants += 1; } /* @@ -699,6 +754,94 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, PG_END_TRY(); } +/* Callback typedef for BroadcastStmt */ +typedef bool (*BroadcastCmdResHandler) (PGresult *result, void *arg); + +/* Broadcast sql in parallel to all ConnectionHash entries; returns false if any failed */ +static bool +BroadcastStmt(char const * sql, unsigned expectedStatus, + BroadcastCmdResHandler handler, void *arg) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + bool allOk = true; + + /* Broadcast sql */ + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + pgfdw_reject_incomplete_xact_state_change(entry); + + if (entry->xact_depth > 0 && entry->conn != NULL) + { + if (!PQsendQuery(entry->conn, sql)) + { + PGresult *res = PQgetResult(entry->conn); + + elog(WARNING, "Failed to send command %s", sql); + pgfdw_report_error(WARNING, res, entry->conn, false, sql); /* res is cleared just below */ + PQclear(res); + } + } + } + + /* Collect responses */ + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + if (entry->xact_depth > 0 && entry->conn != NULL) + { + PGresult *result = PQgetResult(entry->conn); + + if (PQresultStatus(result) != expectedStatus || + (handler && !handler(result, arg))) + { + elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus); + pgfdw_report_error(WARNING, result, entry->conn, false, sql); /* not ERROR: drain all connections, let caller clean up */ + allOk = false; + } + PQclear(result); + PQgetResult(entry->conn); /* consume NULL result */ + } + } + + return allOk; +} + +/* Wrapper for broadcasting commands */ +static bool +BroadcastCmd(char const *sql) +{ + return BroadcastStmt(sql, PGRES_COMMAND_OK, NULL, NULL); +} + +/* Wrapper for broadcasting
statements */ +static bool +BroadcastFunc(char const *sql) +{ + return BroadcastStmt(sql, PGRES_TUPLES_OK, NULL, NULL); +} + +/* BroadcastStmt handler: keep the maximal csn reported by participants in *arg */ +static bool +MaxCsnCB(PGresult *result, void *arg) +{ + char *resp; + GlobalCSN *max_csn = (GlobalCSN *) arg; + GlobalCSN csn = 0; + + resp = PQgetvalue(result, 0, 0); + + if (resp == NULL || (*resp) == '\0' || + sscanf(resp, UINT64_FORMAT, &csn) != 1) + return false; + + if (*max_csn < csn) + *max_csn = csn; + + return true; +} + /* * pgfdw_xact_callback --- cleanup at main-transaction end. */ @@ -712,6 +855,86 @@ pgfdw_xact_callback(XactEvent event, void *arg) if (!xact_got_connection) return; + /* Handle possible two-phase commit */ + if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT) + { + bool include_local_tx = false; + + /* Should we take into account this node? */ + if (TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + { + include_local_tx = true; + fdwTransState->nparticipants += 1; + } + + /* Switch to 2PC mode if there is more than one participant */ + if (UseGlobalSnapshots && fdwTransState->nparticipants > 1) + fdwTransState->two_phase_commit = true; + + if (fdwTransState->two_phase_commit) + { + GlobalCSN max_csn = InProgressGlobalCSN, + my_csn = InProgressGlobalCSN; + bool res; + char *sql; + + fdwTransState->gid = psprintf("pgfdw:%lld:%llu:%d:%u:%d:%d", + (long long) GetCurrentTimestamp(), + (unsigned long long) GetSystemIdentifier(), + MyProcPid, + GetCurrentTransactionIdIfAny(), + ++two_phase_xact_count, + fdwTransState->nparticipants); + + /* Broadcast PREPARE */ + sql = psprintf("PREPARE TRANSACTION '%s'", fdwTransState->gid); + res = BroadcastCmd(sql); + if (!res) + goto error; + + /* Broadcast pg_global_snapshot_prepare() */ + if (include_local_tx) + my_csn = GlobalSnapshotPrepareCurrent(); + + sql = psprintf("SELECT pg_global_snapshot_prepare('%s')", + fdwTransState->gid); + res = BroadcastStmt(sql, PGRES_TUPLES_OK, MaxCsnCB, &max_csn); + if (!res) +
goto error; + + /* select maximal global csn */ + if (include_local_tx && my_csn > max_csn) + max_csn = my_csn; + + /* Broadcast pg_global_snapshot_assign() */ + if (include_local_tx) + GlobalSnapshotAssignCsnCurrent(max_csn); + sql = psprintf("SELECT pg_global_snapshot_assign('%s',"UINT64_FORMAT")", + fdwTransState->gid, max_csn); + res = BroadcastFunc(sql); + +error: + if (!res) + { + sql = psprintf("ROLLBACK PREPARED '%s'", fdwTransState->gid); + BroadcastCmd(sql); + elog(ERROR, "Failed to PREPARE transaction on remote node"); + } + + /* + * Do not fall through: the subsequent COMMIT event will clean things up. + */ + return; + } + } + + /* COMMIT PREPARED the remote transactions if we were doing 2PC */ + if (fdwTransState->two_phase_commit && + (event == XACT_EVENT_PARALLEL_COMMIT || event == XACT_EVENT_COMMIT)) + { + BroadcastCmd(psprintf("COMMIT PREPARED '%s'", fdwTransState->gid)); + } + /* * Scan all connection cache entries to find open remote transactions, and * close them. @@ -719,8 +942,6 @@ pgfdw_xact_callback(XactEvent event, void *arg) hash_seq_init(&scan, ConnectionHash); while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) { - PGresult *res; - /* Ignore cache entry if no open connection right now */ if (entry->conn == NULL) continue; @@ -737,6 +958,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) { case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_COMMIT: + Assert(!fdwTransState->two_phase_commit); /* * If abort cleanup previously failed for this connection, @@ -749,28 +971,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) do_sql_command(entry->conn, "COMMIT TRANSACTION"); entry->changing_xact_state = false; - /* - * If there were any errors in subtransactions, and we - * made prepared statements, do a DEALLOCATE ALL to make - * sure we get rid of all prepared statements. This is - * annoying and not terribly bulletproof, but it's - * probably not worth trying harder.
- * - * DEALLOCATE ALL only exists in 8.3 and later, so this - * constrains how old a server postgres_fdw can - * communicate with. We intentionally ignore errors in - * the DEALLOCATE, so that we can hobble along to some - * extent with older servers (leaking prepared statements - * as we go; but we don't really support update operations - * pre-8.3 anyway). - */ - if (entry->have_prep_stmt && entry->have_error) - { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); - } - entry->have_prep_stmt = false; - entry->have_error = false; + deallocate_prepared_stmts(entry); break; case XACT_EVENT_PRE_PREPARE: @@ -789,6 +990,11 @@ pgfdw_xact_callback(XactEvent event, void *arg) break; case XACT_EVENT_PARALLEL_COMMIT: case XACT_EVENT_COMMIT: + if (fdwTransState->two_phase_commit) + deallocate_prepared_stmts(entry); + else /* Pre-commit should have closed the open transaction */ + elog(ERROR, "missed cleaning up connection during pre-commit"); + break; case XACT_EVENT_PREPARE: /* Pre-commit should have closed the open transaction */ elog(ERROR, "missed cleaning up connection during pre-commit"); @@ -884,6 +1090,38 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Also reset cursor numbering for next transaction */ cursor_number = 0; + + /* Reset fdwTransState */ + memset(fdwTransState, '\0', sizeof(FdwTransactionState)); +} + +/* + * If there were any errors in subtransactions, and we + * made prepared statements, do a DEALLOCATE ALL to make + * sure we get rid of all prepared statements. This is + * annoying and not terribly bulletproof, but it's + * probably not worth trying harder. + * + * DEALLOCATE ALL only exists in 8.3 and later, so this + * constrains how old a server postgres_fdw can + * communicate with. We intentionally ignore errors in + * the DEALLOCATE, so that we can hobble along to some + * extent with older servers (leaking prepared statements + * as we go; but we don't really support update operations + * pre-8.3 anyway). 
+ */ +static void +deallocate_prepared_stmts(ConnCacheEntry *entry) +{ + PGresult *res; + + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + entry->have_prep_stmt = false; + entry->have_error = false; } /* diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9fc53cad68..03c5b0093a 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -301,6 +301,9 @@ typedef struct List *already_used; /* expressions already dealt with */ } ec_member_foreign_arg; +bool UseGlobalSnapshots; +void _PG_init(void); + /* * SQL functions */ @@ -6584,3 +6587,12 @@ find_em_expr_for_input_target(PlannerInfo *root, elog(ERROR, "could not find pathkey item to sort"); return NULL; /* keep compiler quiet */ } + +void +_PG_init(void) +{ + DefineCustomBoolVariable("postgres_fdw.use_global_snapshots", + "Use global snapshots for FDW transactions", NULL, + &UseGlobalSnapshots, false, PGC_USERSET, 0, NULL, + NULL, NULL); +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..9d3ea077a1 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -208,4 +208,6 @@ extern const char *get_jointype_name(JoinType jointype); extern bool is_builtin(Oid objectId); extern bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo); +extern bool UseGlobalSnapshots; + #endif /* POSTGRES_FDW_H */ diff --git a/contrib/postgres_fdw/t/001_bank_coordinator.pl b/contrib/postgres_fdw/t/001_bank_coordinator.pl new file mode 100644 index 0000000000..1e31f33349 --- /dev/null +++ b/contrib/postgres_fdw/t/001_bank_coordinator.pl @@ -0,0 +1,264 @@ +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +my $master = get_new_node("master"); +$master->init; +$master->append_conf('postgresql.conf', qq( + max_prepared_transactions = 30 +
log_checkpoints = true + postgres_fdw.use_global_snapshots = on + track_global_snapshots = on + default_transaction_isolation = 'REPEATABLE READ' +)); +$master->start; + +my $shard1 = get_new_node("shard1"); +$shard1->init; +$shard1->append_conf('postgresql.conf', qq( + max_prepared_transactions = 30 + global_snapshot_defer_time = 15 + track_global_snapshots = on +)); +$shard1->start; + +my $shard2 = get_new_node("shard2"); +$shard2->init; +$shard2->append_conf('postgresql.conf', qq( + max_prepared_transactions = 30 + global_snapshot_defer_time = 15 + track_global_snapshots = on +)); +$shard2->start; + +############################################################################### +# Prepare nodes +############################################################################### + +$master->safe_psql('postgres', qq[ + CREATE EXTENSION postgres_fdw; + CREATE TABLE accounts(id integer primary key, amount integer); + CREATE TABLE global_transactions(tx_time timestamp); +]); + +foreach my $node ($shard1, $shard2) +{ + my $port = $node->port; + my $host = $node->host; + + $node->safe_psql('postgres', + "CREATE TABLE accounts(id integer primary key, amount integer)"); + + $master->safe_psql('postgres', qq[ + CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw options(dbname 'postgres', host '$host', port '$port'); + CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts) server shard_$port options(table_name 'accounts'); + CREATE USER MAPPING for CURRENT_USER SERVER shard_$port; + ]) +} + +$shard1->safe_psql('postgres', qq[ + insert into accounts select 2*id-1, 0 from generate_series(1, 10010) as id; + CREATE TABLE local_transactions(tx_time timestamp); +]); + +$shard2->safe_psql('postgres', qq[ + insert into accounts select 2*id, 0 from generate_series(1, 10010) as id; + CREATE TABLE local_transactions(tx_time timestamp); +]); + +diag("master: @{[$master->connstr('postgres')]}"); +diag("shard1: @{[$shard1->connstr('postgres')]}"); +diag("shard2:
@{[$shard2->connstr('postgres')]}"); + +############################################################################### +# pgbench scripts +############################################################################### + +my $bank = File::Temp->new(); +append_to_file($bank, q{ + \set id random(1, 20000) + BEGIN; + WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *) + INSERT into global_transactions SELECT now() FROM upd; + UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1); + COMMIT; +}); + +my $bank1 = File::Temp->new(); +append_to_file($bank1, q{ + \set id random(1, 10000) + BEGIN; + WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = (2*:id + 1) RETURNING *) + INSERT into local_transactions SELECT now() FROM upd; + UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 3); + COMMIT; +}); + +my $bank2 = File::Temp->new(); +append_to_file($bank2, q{ + \set id random(1, 10000) + + BEGIN; + WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = 2*:id RETURNING *) + INSERT into local_transactions SELECT now() FROM upd; + UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 2); + COMMIT; +}); + +############################################################################### +# Helpers +############################################################################### + +sub count_and_delete_rows +{ + my ($node, $table) = @_; + my $count; + + $count = $node->safe_psql('postgres',"select count(*) from $table"); + $node->safe_psql('postgres',"delete from $table"); + diag($node->name, ": completed $count transactions"); + return $count; +} + +############################################################################### +# Concurrent global transactions +############################################################################### + +my ($err, $rc); +my $started; +my $seconds = 30; +my $selects; +my $total = '0'; +my $oldtotal = '0'; +my $isolation_errors = 0; + + +my $pgb_handle; + +$pgb_handle =
$master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); + +$started = time(); +$selects = 0; +while (time() - $started < $seconds) +{ + $total = $master->safe_psql('postgres', "select sum(amount) from accounts"); + if ( ($total ne $oldtotal) and ($total ne '') ) + { + $isolation_errors++; + $oldtotal = $total; + diag("Isolation error. Total = $total"); + } + if ($total ne '') { $selects++; } +} + +$master->pgbench_await($pgb_handle); + +# sanity check +diag("completed $selects selects"); +die "no actual transactions happened" unless ( $selects > 0 && + count_and_delete_rows($master, 'global_transactions') > 0); + +is($isolation_errors, 0, 'isolation between concurrent global transaction'); + +############################################################################### +# Concurrent global and local transactions +############################################################################### + +my ($pgb_handle1, $pgb_handle2, $pgb_handle3); + +# global txses +$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); + +# concurrent local +$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' ); +$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' ); +$isolation_errors = 0; +$started = time(); +$selects = 0; +$oldtotal = 0; +while (time() - $started < $seconds) +{ + $total = $master->safe_psql('postgres', "select sum(amount) from accounts"); + if ( ($total ne $oldtotal) and ($total ne '') ) + { + $isolation_errors++; + $oldtotal = $total; + diag("Isolation error.
Total = $total"); + } + if ($total ne '') { $selects++; } +} + +diag("selects = $selects"); +$master->pgbench_await($pgb_handle1); +$shard1->pgbench_await($pgb_handle2); +$shard2->pgbench_await($pgb_handle3); + +diag("completed $selects selects"); +die "no actual transactions happened" unless ( $selects > 0 && + count_and_delete_rows($master, 'global_transactions') > 0 && + count_and_delete_rows($shard1, 'local_transactions') > 0 && + count_and_delete_rows($shard2, 'local_transactions') > 0); + +is($isolation_errors, 0, 'isolation between concurrent global and local transactions'); + + +############################################################################### +# Snapshot stability +############################################################################### + +my ($hashes, $hash1, $hash2); +my $stability_errors = 0; + +# global txses +$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); +# concurrent local +$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' ); +$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' ); + +$selects = 0; +$started = time(); +while (time() - $started < $seconds) +{ + foreach my $node ($master, $shard1, $shard2) + { + ($hash1, $_, $hash2) = split "\n", $node->safe_psql('postgres', qq[ + begin isolation level repeatable read; + select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t; + select pg_sleep(3); + select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t; + commit; + ]); + + if ($hash1 ne $hash2) + { + diag("Snapshot stability error on " . $node->name); + $stability_errors++; + } + elsif ($hash1 eq '' or $hash2 eq '') + { + die "empty md5 result while checking snapshot stability"; + } + else + { + $selects++; + } + } +} + +$master->pgbench_await($pgb_handle1); +$shard1->pgbench_await($pgb_handle2); +$shard2->pgbench_await($pgb_handle3); + +die "no actual transactions happened" unless ( $selects > 0 && + count_and_delete_rows($master, 'global_transactions') > 0 && +
count_and_delete_rows($shard1, 'local_transactions') > 0 && + count_and_delete_rows($shard2, 'local_transactions') > 0); + +is($stability_errors, 0, 'snapshot is stable during concurrent global and local transactions'); + +$master->stop; +$shard1->stop; +$shard2->stop; diff --git a/contrib/postgres_fdw/t/002_bank_participant.pl b/contrib/postgres_fdw/t/002_bank_participant.pl new file mode 100644 index 0000000000..04a2f1ba85 --- /dev/null +++ b/contrib/postgres_fdw/t/002_bank_participant.pl @@ -0,0 +1,240 @@ +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +my $shard1 = get_new_node("shard1"); +$shard1->init; +$shard1->append_conf('postgresql.conf', qq( + max_prepared_transactions = 30 + postgres_fdw.use_global_snapshots = on + global_snapshot_defer_time = 15 + track_global_snapshots = on + default_transaction_isolation = 'REPEATABLE READ' +)); +$shard1->start; + +my $shard2 = get_new_node("shard2"); +$shard2->init; +$shard2->append_conf('postgresql.conf', qq( + max_prepared_transactions = 30 + postgres_fdw.use_global_snapshots = on + global_snapshot_defer_time = 15 + track_global_snapshots = on + default_transaction_isolation = 'REPEATABLE READ' +)); +$shard2->start; + +############################################################################### +# Prepare nodes +############################################################################### + +my @shards = ($shard1, $shard2); + +foreach my $node (@shards) +{ + $node->safe_psql('postgres', qq[ + CREATE EXTENSION postgres_fdw; + CREATE TABLE accounts(id integer primary key, amount integer); + CREATE TABLE accounts_local() inherits(accounts); + CREATE TABLE global_transactions(tx_time timestamp); + CREATE TABLE local_transactions(tx_time timestamp); + ]); + + foreach my $neighbor (@shards) + { + next if ($neighbor eq $node); + + my $port = $neighbor->port; + my $host = $neighbor->host; + + $node->safe_psql('postgres', qq[ + CREATE SERVER shard_$port FOREIGN DATA WRAPPER
postgres_fdw + options(dbname 'postgres', host '$host', port '$port'); + CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts) + server shard_$port options(table_name 'accounts_local'); + CREATE USER MAPPING for CURRENT_USER SERVER shard_$port; + ]); + } +} + +$shard1->safe_psql('postgres', "insert into accounts_local select 2*id-1, 0 from generate_series(1, 10010) as id;"); +$shard2->safe_psql('postgres', "insert into accounts_local select 2*id, 0 from generate_series(1, 10010) as id;"); + +############################################################################### +# pgbench scripts +############################################################################### + +my $bank = File::Temp->new(); +append_to_file($bank, q{ + \set id random(1, 20000) + BEGIN; + WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *) + INSERT into global_transactions SELECT now() FROM upd; + UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1); + COMMIT; +}); + +############################################################################### +# Helpers +############################################################################### + +sub count_and_delete_rows +{ + my ($node, $table) = @_; + my $count; + + $count = $node->safe_psql('postgres',"select count(*) from $table"); + $node->safe_psql('postgres',"delete from $table"); + diag($node->name, ": completed $count transactions"); + return $count; +} + +############################################################################### +# Concurrent global transactions +############################################################################### + +my ($err, $rc); +my $started; +my $seconds = 30; +my $selects; +my $total = '0'; +my $oldtotal = '0'; +my $isolation_errors = 0; +my $i; + + +my ($pgb_handle1, $pgb_handle2); + +$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); +$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank,
'postgres' ); + +$started = time(); +$selects = 0; +$i = 0; +while (time() - $started < $seconds) +{ + # Check the total on every shard + foreach my $shard (@shards) + { + $total = $shard->safe_psql('postgres', "select sum(amount) from accounts"); + if ( ($total ne $oldtotal) and ($total ne '') ) + { + $isolation_errors++; + $oldtotal = $total; + diag("$i: Isolation error. Total = $total"); + } + if ($total ne '') { $selects++; } + } + $i++; +} + +$shard1->pgbench_await($pgb_handle1); +$shard2->pgbench_await($pgb_handle2); + +# sanity check +diag("completed $selects selects"); +die "no actual transactions happened" unless ( $selects > 0 && + count_and_delete_rows($shard1, 'global_transactions') > 0 && + count_and_delete_rows($shard2, 'global_transactions') > 0); + +is($isolation_errors, 0, 'isolation between concurrent global transaction'); + +############################################################################### +# And do the same after soft restart +############################################################################### + +$shard1->restart; +$shard2->restart; +$shard1->poll_query_until('postgres', "select 't'") + or die "Timed out waiting for shard1 to become online"; +$shard2->poll_query_until('postgres', "select 't'") + or die "Timed out waiting for shard2 to become online"; + +$seconds = 15; +$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); +$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); + +$started = time(); +$selects = 0; +$i = 0; +$isolation_errors = 0; +while (time() - $started < $seconds) +{ + # Check the total on every shard + foreach my $shard (@shards) + { + $total = $shard->safe_psql('postgres', "select sum(amount) from accounts"); + if ( ($total ne $oldtotal) and ($total ne '') ) + { + $isolation_errors++; + $oldtotal = $total; + diag("$i: Isolation error.
Total = $total"); + } + if ($total ne '') { $selects++; } + } + $i++; +} + +$shard1->pgbench_await($pgb_handle1); +$shard2->pgbench_await($pgb_handle2); + +# sanity check +diag("completed $selects selects"); +die "no actual transactions happened" unless ( $selects > 0 && + count_and_delete_rows($shard1, 'global_transactions') > 0 && + count_and_delete_rows($shard2, 'global_transactions') > 0); + +is($isolation_errors, 0, 'isolation between concurrent global transaction after restart'); + +############################################################################### +# And do the same after hard restart +############################################################################### + +$shard1->teardown_node; +$shard2->teardown_node; +$shard1->start; +$shard2->start; +$shard1->poll_query_until('postgres', "select 't'") + or die "Timed out waiting for shard1 to become online"; +$shard2->poll_query_until('postgres', "select 't'") + or die "Timed out waiting for shard2 to become online"; + + +$seconds = 15; +$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); +$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); + +$started = time(); +$selects = 0; +$i = 0; +$isolation_errors = 0; +while (time() - $started < $seconds) +{ + # Check the total on every shard + foreach my $shard (@shards) + { + $total = $shard->safe_psql('postgres', "select sum(amount) from accounts"); + if ( ($total ne $oldtotal) and ($total ne '') ) + { + $isolation_errors++; + $oldtotal = $total; + diag("$i: Isolation error.
Total = $total"); + } + if ($total ne '') { $selects++; } + } + $i++; +} + +$shard1->pgbench_await($pgb_handle1); +$shard2->pgbench_await($pgb_handle2); + +# sanity check +diag("completed $selects selects"); +die "no actual transactions happened" unless ( $selects > 0 && + count_and_delete_rows($shard1, 'global_transactions') > 0 && + count_and_delete_rows($shard2, 'global_transactions') > 0); + +is($isolation_errors, 0, 'isolation between concurrent global transaction after hard restart'); diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 1d5450758e..ef4472170c 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -2115,6 +2115,41 @@ sub pg_recvlogical_upto } } +sub pgbench +{ + my ($self, @args) = @_; + my $pgbench_handle = $self->pgbench_async(@args); + return $self->pgbench_await($pgbench_handle); +} + +sub pgbench_async +{ + my ($self, @args) = @_; + + my ($in, $out); + $in = ''; + $out = ''; + + my @pgbench_command = ( + 'pgbench', + -h => $self->host, + -p => $self->port, + @args + ); + my $handle = IPC::Run::start(\@pgbench_command, \$in, \$out); + return $handle; +} + +sub pgbench_await +{ + my ($self, $pgbench_handle) = @_; + + # During run some pgbench threads can exit (for example due to + # serialization error). That will set a non-zero return code. + # So don't check the return code here and leave it to the caller. + return IPC::Run::finish($pgbench_handle); +} + =pod =back -- 2.17.1