On 2015-01-27 07:16:27 -0500, Robert Haas wrote:
> On Mon, Jan 26, 2015 at 4:03 PM, Andres Freund <and...@2ndquadrant.com> wrote:
> >> I basically have two ideas to fix this.
> >>
> >> 1) Make do_pg_start_backup() acquire a SHARE lock on
> >>    pg_database. That'll prevent it from starting while a movedb() is
> >>    still in progress. Then additionally add pg_backup_in_progress()
> >>    function to xlog.c that checks (XLogCtl->Insert.exclusiveBackup ||
> >>    XLogCtl->Insert.nonExclusiveBackups != 0). Use that in createdb() and
> >>    movedb() to error out if a backup is in progress.
> >
> > Attached is a patch trying to do this. Doesn't look too bad and led me to
> > discover missing recovery conflicts during an ALTER DATABASE ... SET
> > TABLESPACE (AD ST).
> >
> > But: It doesn't actually work on standbys, because lock.c prevents any
> > stronger lock than RowExclusive from being acquired. And we need a
> > lock that can conflict with WAL replay of DBASE_CREATE, to handle base
> > backups that are executed on the primary. Those obviously can't detect
> > whether any standby is currently doing a base backup...
> >
> > I currently don't have a good idea how to mangle lock.c to allow
> > this. I've played with doing it like in the second patch, but that
> > doesn't actually work because of some asserts around ProcSleep - leading
> > to locks on database objects not working in the startup process (despite
> > already being used).
> >
> > The easiest thing would be to just use a lwlock instead of a heavyweight
> > lock - but those aren't cancelable...
> 
> How about just wrapping an lwlock around a flag variable?  movedb()
> increments the variable when starting and decrements it when done
> (must use PG_ENSURE_ERROR_CLEANUP).  Starting a backup errors out (or
> waits in 1-second increments) if it's non-zero.

That'd end up essentially being a re-emulation of locks. Don't find that
all that pretty. But maybe we have to go there.
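
For reference, a minimal sketch of that suggestion. It is not PostgreSQL
code: a pthread mutex stands in for the lwlock, a static variable stands in
for shared memory, and all three function names are made up for
illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-ins: a real version would live in shared memory and
 * use an lwlock rather than a pthread mutex. */
static pthread_mutex_t movedb_flag_lock = PTHREAD_MUTEX_INITIALIZER;
static int	movedb_in_progress = 0;

/* Called by movedb() before it starts copying files. */
void
sketch_movedb_enter(void)
{
	pthread_mutex_lock(&movedb_flag_lock);
	movedb_in_progress++;
	pthread_mutex_unlock(&movedb_flag_lock);
}

/* Called by movedb() when done, including from its error cleanup path. */
void
sketch_movedb_leave(void)
{
	pthread_mutex_lock(&movedb_flag_lock);
	assert(movedb_in_progress > 0);
	movedb_in_progress--;
	pthread_mutex_unlock(&movedb_flag_lock);
}

/* Called when starting a backup: true if no movedb() is running right now. */
bool
sketch_backup_may_start(void)
{
	bool		ok;

	pthread_mutex_lock(&movedb_flag_lock);
	ok = (movedb_in_progress == 0);
	pthread_mutex_unlock(&movedb_flag_lock);
	return ok;
}
```

The counter only covers one direction: a movedb() that starts right after
the check returned true is not stopped by it, so movedb() would still need
its own "is a backup in progress" check. The decrement has to run on every
error path, which is what PG_ENSURE_ERROR_CLEANUP would be for.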


Here's an alternative approach. I think it generally is superior and
going in the right direction, but I'm not sure it's backpatchable.

It basically consists of:
1) Make GetLockConflicts() actually work.
2) Allow the startup process to actually acquire locks other than
   AccessExclusiveLocks. There already is code acquiring other locks,
   but it's currently broken because they're acquired in blocking mode
   which isn't actually supported in the startup process. Using this
   infrastructure we can actually fix a couple small races around
   database creation/drop.
3) Allow session locks during recovery to be heavier than
   RowExclusiveLock - since relation/object session locks aren't ever
   taken out by the user (without a plain lock first) that's safe.
4) Perform streaming base backups from within a transaction command, to
   provide error handling.
5) Make walsender actually react to recovery conflict interrupts.
6) Prevent access to the template database of a CREATE DATABASE during
   WAL replay.
7) Add an interlock to prevent base backups and movedb() from running
   concurrently. What we actually came here for.
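
The interlock in 7) relies on the lock modes chosen in the patch: base
backups take ShareLock on pg_database, while movedb() and dbase_redo() take
RowExclusiveLock. A small standalone sketch of why that pairing works; the
enum and function names are made up here, and the table is transcribed from
the documented heavyweight lock conflict matrix rather than taken from
lock.c:

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative re-creation of the eight table-level lock modes. */
enum sketch_lockmode
{
	SK_AccessShare = 1, SK_RowShare, SK_RowExclusive,
	SK_ShareUpdateExclusive, SK_Share, SK_ShareRowExclusive,
	SK_Exclusive, SK_AccessExclusive
};

#define SK_BIT(m) (1u << (m))

/* conflicts[m] has a bit set for every mode that conflicts with m. */
static const unsigned sketch_conflicts[] = {
	0,
	/* AccessShare */ SK_BIT(SK_AccessExclusive),
	/* RowShare */ SK_BIT(SK_Exclusive) | SK_BIT(SK_AccessExclusive),
	/* RowExclusive */ SK_BIT(SK_Share) | SK_BIT(SK_ShareRowExclusive) |
		SK_BIT(SK_Exclusive) | SK_BIT(SK_AccessExclusive),
	/* ShareUpdateExclusive */ SK_BIT(SK_ShareUpdateExclusive) |
		SK_BIT(SK_Share) | SK_BIT(SK_ShareRowExclusive) |
		SK_BIT(SK_Exclusive) | SK_BIT(SK_AccessExclusive),
	/* Share */ SK_BIT(SK_RowExclusive) | SK_BIT(SK_ShareUpdateExclusive) |
		SK_BIT(SK_ShareRowExclusive) | SK_BIT(SK_Exclusive) |
		SK_BIT(SK_AccessExclusive),
	/* ShareRowExclusive */ SK_BIT(SK_RowExclusive) |
		SK_BIT(SK_ShareUpdateExclusive) | SK_BIT(SK_Share) |
		SK_BIT(SK_ShareRowExclusive) | SK_BIT(SK_Exclusive) |
		SK_BIT(SK_AccessExclusive),
	/* Exclusive */ SK_BIT(SK_RowShare) | SK_BIT(SK_RowExclusive) |
		SK_BIT(SK_ShareUpdateExclusive) | SK_BIT(SK_Share) |
		SK_BIT(SK_ShareRowExclusive) | SK_BIT(SK_Exclusive) |
		SK_BIT(SK_AccessExclusive),
	/* AccessExclusive: conflicts with all eight modes (bits 1..8) */ 0x1FEu
};

/* True if a holder of 'held' blocks a requester of 'wanted'. */
bool
sketch_modes_conflict(enum sketch_lockmode held, enum sketch_lockmode wanted)
{
	assert(held >= SK_AccessShare && held <= SK_AccessExclusive);
	assert(wanted >= SK_AccessShare && wanted <= SK_AccessExclusive);
	return (sketch_conflicts[held] & SK_BIT(wanted)) != 0;
}
```

Neither mode conflicts with itself, so several base backups can run at once
and the RowExclusiveLock holders don't block each other at this level, but
each side blocks the other. RowExclusiveLock is also the strongest mode
lock.c currently allows on a standby, which is why the replay side uses it.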

Comments?

At the very least it's missing some documentation updates about the
locking changes in ALTER DATABASE, CREATE DATABASE and the base backup
sections.

Greetings,

Andres Freund

-- 
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 6e196d17e3dc3ae923321c1b49eb46ccd5ac75b0 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Tue, 27 Jan 2015 19:52:11 +0100
Subject: [PATCH] Fix various issues around WAL replay and ALTER DATABASE SET
 TABLESPACE.

<Danger of basebackups vs. AD ST>
Discussion: 20150120152819.gc24...@alap3.anarazel.de

Fix GetLockConflicts() to properly terminate the list of conflicting
backends. It's unclear why this hasn't caused more problems.

Discussion: 20150127142713.gd29...@awork2.anarazel.de

Don't acquire blocking locks on the database in dbase_redo(), not
enough state setup.
Discussion: 20150126212458.ga29...@awork2.anarazel.de

Don't allow access to the template database during the replay of a
CREATE DATABASE.
---
 src/backend/access/transam/xlog.c    |  29 +++++++++
 src/backend/commands/dbcommands.c    | 104 ++++++++++++++++++++++++++-----
 src/backend/replication/basebackup.c |  15 +++++
 src/backend/replication/walsender.c  |  14 +----
 src/backend/storage/ipc/standby.c    | 117 ++++++++++++++++++-----------------
 src/backend/storage/lmgr/lmgr.c      |  31 ++++++++++
 src/backend/storage/lmgr/lock.c      |  13 ++--
 src/backend/utils/init/postinit.c    |   2 +-
 src/include/access/xlog.h            |   1 +
 src/include/storage/lmgr.h           |   3 +
 src/include/storage/standby.h        |   2 +-
 11 files changed, 240 insertions(+), 91 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 629a457..38e7dff 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -53,6 +53,7 @@
 #include "storage/ipc.h"
 #include "storage/large_object.h"
 #include "storage/latch.h"
+#include "storage/lmgr.h"
 #include "storage/pmsignal.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -9291,6 +9292,12 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 				 errmsg("backup label too long (max %d bytes)",
 						MAXPGPATH)));
+	/*
+	 * Acquire lock on pg_database to prevent concurrent CREATE DATABASE and
+	 * ALTER DATABASE ... SET TABLESPACE from running. In case of an error
+	 * it'll be released by the transaction abort code.
+	 */
+	LockRelationOidForSession(DatabaseRelationId, ShareLock);
 
 	/*
 	 * Mark backup active in shared memory.  We must do full-page WAL writes
@@ -9523,6 +9530,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
 
+	UnlockRelationOidForSession(DatabaseRelationId, ShareLock);
+
 	/*
 	 * We're done.  As a convenience, return the starting WAL location.
 	 */
@@ -9937,6 +9946,26 @@ do_pg_abort_backup(void)
 }
 
 /*
+ * Is a (exclusive or nonexclusive) base backup running?
+ *
+ * Note that this does not check whether any standby of this node has a
+ * basebackup running, or whether any upstream master (if this is a standby)
+ * has one in progress
+ */
+bool
+LocalBaseBackupInProgress(void)
+{
+	bool ret;
+
+	WALInsertLockAcquire();
+	ret = XLogCtl->Insert.exclusiveBackup ||
+		XLogCtl->Insert.nonExclusiveBackups > 0;
+	WALInsertLockRelease();
+
+	return ret;
+}
+
+/*
  * Get latest redo apply position.
  *
  * Exported to allow WALReceiver to read the pointer directly.
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 5e66961..e6a3352 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1071,6 +1071,9 @@ movedb(const char *dbname, const char *tblspcname)
 	LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
 							   AccessExclusiveLock);
 
+	/* Acquire lock on pg_database against concurrent base backups starting */
+	LockRelationOidForSession(DatabaseRelationId, RowExclusiveLock);
+
 	/*
 	 * Permission checks
 	 */
@@ -1087,6 +1090,30 @@ movedb(const char *dbname, const char *tblspcname)
 				 errmsg("cannot change the tablespace of the currently open database")));
 
 	/*
+	 * Prevent SET TABLESPACE from running concurrently with a base
+	 * backup. Without that check a base backup would potentially copy a
+	 * partially removed source database; which WAL replay then would copy
+	 * over the new database...
+	 *
+	 * Starting a base backup takes a SHARE lock on pg_database. In addition a
+	 * streaming basebackup takes the same lock for the entirety of the copy
+	 * of the data directory.  That, combined with this check, prevents base
+	 * backups from being taken at the same time a SET TABLESPACE is in
+	 * progress.
+	 *
+	 * Note that this check here will not trigger if a standby currently has a
+	 * base backup ongoing; instead WAL replay will have a recovery conflict
+	 * when replaying the DBASE_CREATE record. That is only safe because
+	 * standbys can only do nonexclusive base backups which hold the lock over
+	 * the entire runtime - otherwise no lock would prevent replay after
+	 * pg_start_backup().
+	 */
+	if (LocalBaseBackupInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot change a database's tablespace while a base backup is in progress")));
+
+	/*
 	 * Get tablespace's oid
 	 */
 	dst_tblspcoid = get_tablespace_oid(tblspcname, false);
@@ -1116,6 +1143,7 @@ movedb(const char *dbname, const char *tblspcname)
 		heap_close(pgdbrel, NoLock);
 		UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
 									 AccessExclusiveLock);
+		UnlockRelationOidForSession(DatabaseRelationId, RowExclusiveLock);
 		return;
 	}
 
@@ -1204,6 +1232,12 @@ movedb(const char *dbname, const char *tblspcname)
 	}
 
 	/*
+	 * Force xid assignment, for the benefit of dbase_redo()'s internal
+	 * locking.
+	 */
+	GetTopTransactionId();
+
+	/*
 	 * Use an ENSURE block to make sure we remove the debris if the copy fails
 	 * (eg, due to out-of-disk-space).  This is not a 100% solution, because
 	 * of the possibility of failure during transaction commit, but it should
@@ -1314,6 +1348,12 @@ movedb(const char *dbname, const char *tblspcname)
 	StartTransactionCommand();
 
 	/*
+	 * Force xid assignment, for the benefit of dbase_redo()'s internal
+	 * locking.
+	 */
+	GetTopTransactionId();
+
+	/*
 	 * Remove files from the old tablespace
 	 */
 	if (!rmtree(src_dbpath, true))
@@ -1340,6 +1380,7 @@ movedb(const char *dbname, const char *tblspcname)
 	/* Now it's safe to release the database lock */
 	UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
 								 AccessExclusiveLock);
+	UnlockRelationOidForSession(DatabaseRelationId, RowExclusiveLock);
 }
 
 /* Error cleanup callback for movedb */
@@ -2053,6 +2094,38 @@ dbase_redo(XLogReaderState *record)
 		dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
 
 		/*
+		 * Can only do the locking if the server generating the record was new
+		 * enough to know it has to assign a xid.
+		 */
+		if (InHotStandby && TransactionIdIsValid(XLogRecGetXid(record)))
+		{
+			LOCKTAG locktag;
+
+			SET_LOCKTAG_OBJECT(locktag,
+							   InvalidOid,
+							   DatabaseRelationId,
+							   xlrec->src_db_id,
+							   0);
+
+			/* Lock source database, to avoid concurrent hint bit writes et al. */
+			StandbyAcquireLock(XLogRecGetXid(record), &locktag, AccessExclusiveLock);
+			ResolveRecoveryConflictWithDatabase(xlrec->src_db_id);
+
+			/* Lock target database, it'll be overwritten in a second */
+			SET_LOCKTAG_OBJECT(locktag,
+							   InvalidOid,
+							   DatabaseRelationId,
+							   xlrec->db_id,
+							   0);
+			StandbyAcquireLock(XLogRecGetXid(record), &locktag, AccessExclusiveLock);
+			ResolveRecoveryConflictWithDatabase(xlrec->db_id);
+
+			/* Lock pg_database, to conflict with base backups */
+			SET_LOCKTAG_RELATION(locktag, 0, DatabaseRelationId);
+			StandbyAcquireLock(XLogRecGetXid(record), &locktag, RowExclusiveLock);
+		}
+
+		/*
 		 * Our theory for replaying a CREATE is to forcibly drop the target
 		 * subdirectory if present, then re-copy the source data. This may be
 		 * more work than needed, but it is simple to implement.
@@ -2086,16 +2159,31 @@ dbase_redo(XLogReaderState *record)
 
 		dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
 
-		if (InHotStandby)
+		/*
+		 * Can only do the locking if the server generating the record was new
+		 * enough to know it has to assign a xid.
+		 */
+		if (InHotStandby && TransactionIdIsValid(XLogRecGetXid(record)))
 		{
+			LOCKTAG locktag;
+
 			/*
 			 * Lock database while we resolve conflicts to ensure that
 			 * InitPostgres() cannot fully re-execute concurrently. This
 			 * avoids backends re-connecting automatically to same database,
 			 * which can happen in some cases.
 			 */
-			LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
+			SET_LOCKTAG_OBJECT(locktag,
+							   InvalidOid,
+							   DatabaseRelationId,
+							   xlrec->db_id,
+							   0);
+			StandbyAcquireLock(XLogRecGetXid(record), &locktag, AccessExclusiveLock);
 			ResolveRecoveryConflictWithDatabase(xlrec->db_id);
+
+			/* Lock pg_database, to conflict with base backups */
+			SET_LOCKTAG_RELATION(locktag, 0, DatabaseRelationId);
+			StandbyAcquireLock(XLogRecGetXid(record), &locktag, RowExclusiveLock);
 		}
 
 		/* Drop pages for this database that are in the shared buffer cache */
@@ -2112,18 +2200,6 @@ dbase_redo(XLogReaderState *record)
 			ereport(WARNING,
 					(errmsg("some useless files may be left behind in old database directory \"%s\"",
 							dst_path)));
-
-		if (InHotStandby)
-		{
-			/*
-			 * Release locks prior to commit. XXX There is a race condition
-			 * here that may allow backends to reconnect, but the window for
-			 * this is small because the gap between here and commit is mostly
-			 * fairly small and it is unlikely that people will be dropping
-			 * databases that we are trying to connect to anyway.
-			 */
-			UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
-		}
 	}
 	else
 		elog(PANIC, "dbase_redo: unknown op code %u", info);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 3058ce9..a0b590f 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -17,8 +17,10 @@
 #include <unistd.h>
 #include <time.h>
 
+#include "access/xact.h"
 #include "access/xlog_internal.h"		/* for pg_start/stop_backup */
 #include "catalog/catalog.h"
+#include "catalog/pg_database.h"
 #include "catalog/pg_type.h"
 #include "lib/stringinfo.h"
 #include "libpq/libpq.h"
@@ -32,6 +34,8 @@
 #include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/lock.h"
 #include "utils/builtins.h"
 #include "utils/elog.h"
 #include "utils/ps_status.h"
@@ -134,6 +138,9 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 
 	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
 								  &labelfile);
+
+	LockRelationOidForSession(DatabaseRelationId, ShareLock);
+
 	/*
 	 * Once do_pg_start_backup has been called, ensure that any failure causes
 	 * us to abort the backup so we don't "leak" a backup counter. For this reason,
@@ -304,6 +311,9 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
 
+	/* release lock blocking concurrent SET TABLESPACE - no risk while backing up WAL */
+	UnlockRelationOidForSession(DatabaseRelationId, ShareLock);
+
 	endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
 
 	if (opt->includewal)
@@ -675,6 +685,9 @@ SendBaseBackup(BaseBackupCmd *cmd)
 		set_ps_display(activitymsg, false);
 	}
 
+	/* to provide error handling around locks */
+	StartTransactionCommand();
+
 	/* Make sure we can open the directory with tablespaces in it */
 	dir = AllocateDir("pg_tblspc");
 	if (!dir)
@@ -684,6 +697,8 @@ SendBaseBackup(BaseBackupCmd *cmd)
 	perform_base_backup(&opt, dir);
 
 	FreeDir(dir);
+
+	CommitTransactionCommand();
 }
 
 static void
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 05d2339..3f25ccf 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -185,7 +185,6 @@ static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
-static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
@@ -2566,17 +2565,6 @@ WalSndSigHupHandler(SIGNAL_ARGS)
 	errno = save_errno;
 }
 
-/* SIGUSR1: set flag to send WAL records */
-static void
-WalSndXLogSendHandler(SIGNAL_ARGS)
-{
-	int			save_errno = errno;
-
-	latch_sigusr1_handler();
-
-	errno = save_errno;
-}
-
 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
 static void
 WalSndLastCycleHandler(SIGNAL_ARGS)
@@ -2610,7 +2598,7 @@ WalSndSignals(void)
 	pqsignal(SIGQUIT, quickdie);	/* hard crash time */
 	InitializeTimeouts();		/* establishes SIGALRM handler */
 	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGUSR1, WalSndXLogSendHandler);	/* request WAL sending */
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
 	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
 												 * shutdown */
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 292bed5..98e89e1 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -38,10 +38,16 @@ int			max_standby_archive_delay = 30 * 1000;
 int			max_standby_streaming_delay = 30 * 1000;
 
 static List *RecoveryLockList;
+typedef struct RecoveryLockListEntry
+{
+	TransactionId xid;
+	LOCKMODE lockmode;
+	LOCKTAG locktag;
+} RecoveryLockListEntry;
 
 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 									   ProcSignalReason reason);
-static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
+static void ResolveRecoveryConflictWithLock(LOCKTAG *tag, LOCKMODE mode);
 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
@@ -320,10 +326,10 @@ ResolveRecoveryConflictWithDatabase(Oid dbid)
 	 * us. This is rare enough that we do this as simply as possible: no wait,
 	 * just force them off immediately.
 	 *
-	 * No locking is required here because we already acquired
-	 * AccessExclusiveLock. Anybody trying to connect while we do this will
-	 * block during InitPostgres() and then disconnect when they see the
-	 * database has been removed.
+	 * No locking is required here because we already acquired an
+	 * AccessExclusiveLock on the database in dbase_redo(). Anybody trying to
+	 * connect while we do this will block during InitPostgres() and then
+	 * disconnect when they see the database has been removed.
 	 */
 	while (CountDBBackends(dbid) > 0)
 	{
@@ -338,14 +344,11 @@ ResolveRecoveryConflictWithDatabase(Oid dbid)
 }
 
 static void
-ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
+ResolveRecoveryConflictWithLock(LOCKTAG *locktag, LOCKMODE mode)
 {
 	VirtualTransactionId *backends;
 	bool		lock_acquired = false;
 	int			num_attempts = 0;
-	LOCKTAG		locktag;
-
-	SET_LOCKTAG_RELATION(locktag, dbOid, relOid);
 
 	/*
 	 * If blowing away everybody with conflicting locks doesn't work, after
@@ -358,7 +361,7 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
 	while (!lock_acquired)
 	{
 		if (++num_attempts < 3)
-			backends = GetLockConflicts(&locktag, AccessExclusiveLock);
+			backends = GetLockConflicts(locktag, mode);
 		else
 			backends = GetConflictingVirtualXIDs(InvalidTransactionId,
 												 InvalidOid);
@@ -366,7 +369,7 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
 		ResolveRecoveryConflictWithVirtualXIDs(backends,
 											 PROCSIG_RECOVERY_CONFLICT_LOCK);
 
-		if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
+		if (LockAcquireExtended(locktag, mode, true, true, false)
 			!= LOCKACQUIRE_NOT_AVAIL)
 			lock_acquired = true;
 	}
@@ -544,19 +547,18 @@ StandbyTimeoutHandler(void)
  * this lock, so query access is not allowed at this time". So the Startup
  * process is the proxy by which the original locks are implemented.
  *
- * We only keep track of AccessExclusiveLocks, which are only ever held by
- * one transaction on one relation, and don't worry about lock queuing.
+ * We only keep track of the primary's AccessExclusiveLocks, which are only
+ * ever held by one transaction on one relation, and don't worry about lock
+ * queuing.  The startup process however does acquire other locks occasionally
+ * (c.f. dbase_redo()) - but even there no queuing is possible.
  *
  * We keep a single dynamically expandible list of locks in local memory,
  * RelationLockList, so we can keep track of the various entries made by
  * the Startup process's virtual xid in the shared lock table.
  *
  * We record the lock against the top-level xid, rather than individual
- * subtransaction xids. This means AccessExclusiveLocks held by aborted
- * subtransactions are not released as early as possible on standbys.
- *
- * List elements use type xl_rel_lock, since the WAL record type exactly
- * matches the information that we need to keep track of.
+ * subtransaction xids. This means locks held by aborted subtransactions are
+ * not released as early as possible on standbys.
  *
  * We use session locks rather than normal locks so we don't need
  * ResourceOwners.
@@ -564,10 +566,11 @@ StandbyTimeoutHandler(void)
 
 
 void
-StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
+StandbyAcquireLock(TransactionId xid, LOCKTAG *locktag, LOCKMODE mode)
 {
-	xl_standby_lock *newlock;
-	LOCKTAG		locktag;
+	RecoveryLockListEntry *newlock;
+	Oid dbOid = locktag->locktag_field1;
+	Oid relOid = locktag->locktag_field2;
 
 	/* Already processed? */
 	if (!TransactionIdIsValid(xid) ||
@@ -576,25 +579,24 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 		return;
 
 	elog(trace_recovery(DEBUG4),
-		 "adding recovery lock: db %u rel %u", dbOid, relOid);
+		 "adding recovery lock: db %u rel %u",
+		 dbOid, relOid);
 
 	/* dbOid is InvalidOid when we are locking a shared relation. */
 	Assert(OidIsValid(relOid));
 
-	newlock = palloc(sizeof(xl_standby_lock));
+	newlock = palloc(sizeof(RecoveryLockListEntry));
 	newlock->xid = xid;
-	newlock->dbOid = dbOid;
-	newlock->relOid = relOid;
+	newlock->lockmode = mode;
+	memcpy(&newlock->locktag, locktag, sizeof(LOCKTAG));
 	RecoveryLockList = lappend(RecoveryLockList, newlock);
 
 	/*
 	 * Attempt to acquire the lock as requested, if not resolve conflict
 	 */
-	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
-
-	if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
+	if (LockAcquireExtended(locktag, mode, true, true, false)
 		== LOCKACQUIRE_NOT_AVAIL)
-		ResolveRecoveryConflictWithLock(newlock->dbOid, newlock->relOid);
+		ResolveRecoveryConflictWithLock(locktag, mode);
 }
 
 static void
@@ -610,22 +612,16 @@ StandbyReleaseLocks(TransactionId xid)
 	prev = NULL;
 	for (cell = list_head(RecoveryLockList); cell; cell = next)
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+		RecoveryLockListEntry *lock = (RecoveryLockListEntry *) lfirst(cell);
 
 		next = lnext(cell);
 
 		if (!TransactionIdIsValid(xid) || lock->xid == xid)
 		{
-			LOCKTAG		locktag;
-
-			elog(trace_recovery(DEBUG4),
-				 "releasing recovery lock: xid %u db %u rel %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-			SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-			if (!LockRelease(&locktag, AccessExclusiveLock, true))
+			if (!LockRelease(&lock->locktag, lock->lockmode, true))
 				elog(LOG,
-					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-					 lock->xid, lock->dbOid, lock->relOid);
+					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u",
+					 lock->xid);
 
 			RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
 			pfree(lock);
@@ -662,25 +658,24 @@ StandbyReleaseAllLocks(void)
 	ListCell   *cell,
 			   *prev,
 			   *next;
-	LOCKTAG		locktag;
 
 	elog(trace_recovery(DEBUG2), "release all standby locks");
 
 	prev = NULL;
 	for (cell = list_head(RecoveryLockList); cell; cell = next)
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+		RecoveryLockListEntry *lock = (RecoveryLockListEntry *) lfirst(cell);
 
 		next = lnext(cell);
 
 		elog(trace_recovery(DEBUG4),
-			 "releasing recovery lock: xid %u db %u rel %u",
-			 lock->xid, lock->dbOid, lock->relOid);
-		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-		if (!LockRelease(&locktag, AccessExclusiveLock, true))
+			 "releasing recovery lock for xid %u",
+			 lock->xid);
+
+		if (!LockRelease(&lock->locktag, lock->lockmode, true))
 			elog(LOG,
-				 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-				 lock->xid, lock->dbOid, lock->relOid);
+				 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u",
+				 lock->xid);
 		RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
 		pfree(lock);
 	}
@@ -697,12 +692,11 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 	ListCell   *cell,
 			   *prev,
 			   *next;
-	LOCKTAG		locktag;
 
 	prev = NULL;
 	for (cell = list_head(RecoveryLockList); cell; cell = next)
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+		RecoveryLockListEntry *lock = (RecoveryLockListEntry *) lfirst(cell);
 		bool		remove = false;
 
 		next = lnext(cell);
@@ -735,13 +729,13 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 		if (remove)
 		{
 			elog(trace_recovery(DEBUG4),
-				 "releasing recovery lock: xid %u db %u rel %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-			SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-			if (!LockRelease(&locktag, AccessExclusiveLock, true))
+				 "releasing recovery lock: xid %u",
+				 lock->xid);
+
+			if (!LockRelease(&lock->locktag, lock->lockmode, true))
 				elog(LOG,
-					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-					 lock->xid, lock->dbOid, lock->relOid);
+					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u",
+					 lock->xid);
 			RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
 			pfree(lock);
 		}
@@ -776,9 +770,16 @@ standby_redo(XLogReaderState *record)
 		int			i;
 
 		for (i = 0; i < xlrec->nlocks; i++)
-			StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
-											  xlrec->locks[i].dbOid,
-											  xlrec->locks[i].relOid);
+		{
+			LOCKTAG locktag;
+
+			SET_LOCKTAG_RELATION(locktag,
+								 xlrec->locks[i].dbOid,
+								 xlrec->locks[i].relOid);
+			StandbyAcquireLock(xlrec->locks[i].xid,
+							   &locktag,
+							   AccessExclusiveLock);
+		}
 	}
 	else if (info == XLOG_RUNNING_XACTS)
 	{
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index d13a167..c428b38 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -248,6 +248,37 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
 }
 
 /*
+ *		LockRelationOidForSession
+ *
+ * Lock a relation in session mode, without requiring it to be opened in
+ * transaction local mode. That's likely only useful during WAL replay.
+ */
+void
+LockRelationOidForSession(Oid relid, LOCKMODE lockmode)
+{
+	LOCKTAG		tag;
+
+	SetLocktagRelationOid(&tag, relid);
+
+	(void) LockAcquire(&tag, lockmode, true, false);
+}
+
+/*
+ *		UnlockRelationOidForSession
+ *
+ * Unlock lock acquired by LockRelationOidForSession.
+ */
+void
+UnlockRelationOidForSession(Oid relid, LOCKMODE lockmode)
+{
+	LOCKTAG		tag;
+
+	SetLocktagRelationOid(&tag, relid);
+
+	LockRelease(&tag, lockmode, true);
+}
+
+/*
  *		LockHasWaitersRelation
  *
  * This is a functiion to check if someone else is waiting on a
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 61c8d21..f051ad6 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -710,13 +710,16 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	if (RecoveryInProgress() && !InRecovery &&
 		(locktag->locktag_type == LOCKTAG_OBJECT ||
 		 locktag->locktag_type == LOCKTAG_RELATION) &&
-		lockmode > RowExclusiveLock)
+		lockmode > RowExclusiveLock &&
+		!sessionLock)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
 						lockMethodTable->lockModeNames[lockmode]),
 				 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));
 
+	Assert(!InRecovery || (sessionLock && dontWait));
+
 #ifdef LOCK_DEBUG
 	if (LOCK_DEBUG_ENABLED(locktag))
 		elog(LOG, "LockAcquire: lock [%u,%u] %s",
@@ -2804,6 +2807,8 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
 		 * on this lockable object.
 		 */
 		LWLockRelease(partitionLock);
+		vxids[count].backendId = InvalidBackendId;
+		vxids[count].localTransactionId = InvalidLocalTransactionId;
 		return vxids;
 	}
 
@@ -2857,6 +2862,8 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
 	if (count > MaxBackends)	/* should never happen */
 		elog(PANIC, "too many conflicting locks found");
 
+	vxids[count].backendId = InvalidBackendId;
+	vxids[count].localTransactionId = InvalidLocalTransactionId;
 	return vxids;
 }
 
@@ -3857,9 +3864,7 @@ lock_twophase_standby_recover(TransactionId xid, uint16 info,
 	if (lockmode == AccessExclusiveLock &&
 		locktag->locktag_type == LOCKTAG_RELATION)
 	{
-		StandbyAcquireAccessExclusiveLock(xid,
-										locktag->locktag_field1 /* dboid */ ,
-									  locktag->locktag_field2 /* reloid */ );
+		StandbyAcquireLock(xid, locktag, AccessExclusiveLock);
 	}
 }
 
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 1f5cf06..dfe2322 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -886,7 +886,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 			ereport(FATAL,
 					(errcode(ERRCODE_UNDEFINED_DATABASE),
 					 errmsg("database \"%s\" does not exist", dbname),
-			   errdetail("It seems to have just been dropped or renamed.")));
+			   errdetail("It seems to have just been dropped, moved or renamed.")));
 	}
 
 	/*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 138deaf..f3893f3 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -253,6 +253,7 @@ extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
 extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive,
 				  TimeLineID *stoptli_p);
 extern void do_pg_abort_backup(void);
+extern bool LocalBaseBackupInProgress(void);
 
 /* File path names (all relative to $PGDATA) */
 #define BACKUP_LABEL_FILE		"backup_label"
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index f5d70e5..2397f11 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -50,6 +50,9 @@ extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode);
 extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
 extern void UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
 
+extern void LockRelationOidForSession(Oid relid, LOCKMODE lockmode);
+extern void UnlockRelationOidForSession(Oid relid, LOCKMODE lockmode);
+
 /* Lock a relation for extension */
 extern void LockRelationForExtension(Relation relation, LOCKMODE lockmode);
 extern void UnlockRelationForExtension(Relation relation, LOCKMODE lockmode);
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index c32c963..f711281 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -45,7 +45,7 @@ extern void StandbyTimeoutHandler(void);
  * to make hot standby work. That includes logging AccessExclusiveLocks taken
  * by transactions and running-xacts snapshots.
  */
-extern void StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid);
+extern void StandbyAcquireLock(TransactionId xid, LOCKTAG *tag, LOCKMODE mode);
 extern void StandbyReleaseLockTree(TransactionId xid,
 					   int nsubxids, TransactionId *subxids);
 extern void StandbyReleaseAllLocks(void);
-- 
2.0.0.rc2.4.g1dc51c6.dirty
