On 27 March 2017 at 14:08, Craig Ringer <cr...@2ndquadrant.com> wrote:

> So this patch makes ReplicationSlotAcquire check that the slot
> database matches the current database and refuse to acquire the slot
> if it does not.

New patch attached that drops the above requirement, so slots can still be
dropped from any DB.

This introduces a narrow race window where DROP DATABASE may ERROR if
somebody connects to a different database and runs a
pg_drop_replication_slot(...) for one of the slots being dropped by
DROP DATABASE after we check for active slots but before we've dropped
the slot. But it's hard to hit and it's pretty harmless; the worst
possible result is dropping one or more of the slots before we ERROR
out of the DROP. But you clearly didn't want them anyway, since you
were dropping the DB and dropping some slots at the same time.

I think this one's ready to go.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 99d5313d3a265bcc57ca6845230b9ec49d188710 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 13:21:09 +0800
Subject: [PATCH] Make DROP DATABASE drop logical slots for the DB

Automatically drop all logical replication slots associated with a
database when the database is dropped.
---
 doc/src/sgml/func.sgml                             |  3 +-
 doc/src/sgml/protocol.sgml                         |  2 +
 src/backend/commands/dbcommands.c                  | 32 +++++---
 src/backend/replication/slot.c                     | 88 ++++++++++++++++++++++
 src/include/replication/slot.h                     |  1 +
 src/test/recovery/t/006_logical_decoding.pl        | 40 +++++++++-
 .../recovery/t/010_logical_decoding_timelines.pl   | 30 +++++++-
 7 files changed, 182 insertions(+), 14 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index ba6f8dd..78508d7 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -18876,7 +18876,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
        <entry>
         Drops the physical or logical replication slot
         named <parameter>slot_name</parameter>. Same as replication protocol
-        command <literal>DROP_REPLICATION_SLOT</>.
+        command <literal>DROP_REPLICATION_SLOT</>. A logical slot may be dropped
+        while connected to any database, not only the one it was created on.
        </entry>
       </row>
 
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index b3a5026..5f97141 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2034,6 +2034,8 @@ The commands accepted in walsender mode are:
      <para>
       Drops a replication slot, freeing any reserved server-side resources. If
       the slot is currently in use by an active connection, this command fails.
+      A logical slot may be dropped from a connection to any database, not only
+      the database the slot was created in.
      </para>
      <variablelist>
       <varlistentry>
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 5a63b1a..c0ba2b4 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok)
 				 errmsg("cannot drop the currently open database")));
 
 	/*
-	 * Check whether there are, possibly unconnected, logical slots that refer
-	 * to the to-be-dropped database. The database lock we are holding
-	 * prevents the creation of new slots using the database.
+	 * Check whether there are active logical slots that refer to the
+	 * to-be-dropped database. The database lock we are holding prevents the
+	 * creation of new slots using the database or existing slots becoming
+	 * active.
 	 */
-	if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active))
+	(void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
+	if (nslots_active)
+	{
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
-			  errmsg("database \"%s\" is used by a logical replication slot",
+			  errmsg("database \"%s\" is used by an active logical replication slot",
 					 dbname),
-				 errdetail_plural("There is %d slot, %d of them active.",
-								  "There are %d slots, %d of them active.",
-								  nslots,
-								  nslots, nslots_active)));
+				 errdetail_plural("There is %d active slot",
+								  "There are %d active slots",
+								  nslots_active, nslots_active)));
+	}
 
 	/*
 	 * Check for other backends in the target database.  (Because we hold the
@@ -915,6 +918,11 @@ dropdb(const char *dbname, bool missing_ok)
 	dropDatabaseDependencies(db_id);
 
 	/*
+	 * Drop db-specific replication slots.
+	 */
+	ReplicationSlotsDropDBSlots(db_id);
+
+	/*
 	 * Drop pages for this database that are in the shared buffer cache. This
 	 * is important to ensure that no remaining backend tries to write out a
 	 * dirty buffer to the dead database later...
@@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record)
 			 * InitPostgres() cannot fully re-execute concurrently. This
 			 * avoids backends re-connecting automatically to same database,
 			 * which can happen in some cases.
+			 *
+			 * This will lock out walsenders trying to connect to db-specific
+			 * slots for logical decoding too, so it's safe for us to drop slots.
 			 */
 			LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
 			ResolveRecoveryConflictWithDatabase(xlrec->db_id);
 		}
 
+		/* Drop any database-specific replication slots */
+		ReplicationSlotsDropDBSlots(xlrec->db_id);
+
 		/* Drop pages for this database that are in the shared buffer cache */
 		DropDatabaseBuffers(xlrec->db_id);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 5237a9f..d075eda 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -796,6 +796,94 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 	return false;
 }
 
+/*
+ * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
+ * passed database oid. The caller should hold an exclusive lock on the
+ * pg_database oid for the database to prevent creation of new slots on the db
+ * or replay from existing slots.
+ *
+ * This routine isn't as efficient as it could be - but we don't drop databases
+ * often, especially databases with lots of slots.
+ *
+ * Another session that concurrently acquires an existing slot on the target DB
+ * (most likely to drop it) may cause this function to ERROR. If that happens
+ * it may have dropped some but not all slots.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int			i;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+		NameData slotname;
+		int active_pid;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotControlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* only logical slots are database specific; skip physical slots */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* not our database, skip */
+		if (s->data.database != dboid)
+			continue;
+
+		/* Claim the slot, as if ReplicationSlotAcquire()ing. */
+		SpinLockAcquire(&s->mutex);
+		strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
+		NameStr(slotname)[NAMEDATALEN-1] = '\0';
+		active_pid = s->active_pid;
+		if (active_pid == 0)
+		{
+			MyReplicationSlot = s;
+			s->active_pid = MyProcPid;
+		}
+		SpinLockRelease(&s->mutex);
+
+		/*
+		 * We might fail here if the slot was active. Even though we hold an
+		 * exclusive lock on the database object a logical slot for that DB can
+		 * still be active if it's being dropped by a backend connected to
+		 * another DB or is otherwise acquired.
+		 *
+		 * It's an unlikely race that'll only arise from concurrent user action,
+		 * so we'll just bail out.
+		 */
+		if (active_pid)
+			elog(ERROR, "replication slot %s is in use by pid %d",
+			 	 NameStr(slotname), active_pid);
+
+		/*
+		 * To avoid largely duplicating ReplicationSlotDropAcquired() or
+		 * complicating it with already_locked flags for ProcArrayLock,
+		 * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
+		 * just release our ReplicationSlotControlLock to drop the slot.
+		 *
+		 * For safety we'll restart our scan from the beginning each
+		 * time we release the lock.
+		 */
+		LWLockRelease(ReplicationSlotControlLock);
+		ReplicationSlotDropAcquired();
+		goto restart;
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* recompute limits once after all slots are dropped */
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+}
+
 
 /*
  * Check whether the server's configuration supports using replication
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 62cacdb..9a2dbd7 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -177,6 +177,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 66d5e4a..bf9b50a 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 5;
+use Test::More tests => 16;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -54,7 +54,7 @@ my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logi
 is($stdout_sql, $expected, 'got expected output from SQL decoding session');
 
 my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
-diag "waiting to replay $endpos";
+print "waiting to replay $endpos\n";
 
 my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
 chomp($stdout_recv);
@@ -64,5 +64,41 @@ $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpo
 chomp($stdout_recv);
 is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
 
+$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
+	'replaying logical slot from another database fails');
+
+$node_master->safe_psql('otherdb', qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]);
+
+# make sure you can't drop a database while one of its logical slots is active
+my $pg_recvlogical = IPC::Run::start(['pg_recvlogical', '-d', $node_master->connstr('otherdb'), '-S', 'otherdb_slot', '-f', '-', '--start']);
+$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)");
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 3,
+	'dropping a DB with active logical slots fails');
+$pg_recvlogical->kill_kill;
+isnt($node_master->slot('otherdb_slot')->{'catalog_xmin'}, '',
+	'logical slot still exists');
+
+$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)");
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 0,
+	'dropping a DB with inactive logical slots succeeds');
+is($node_master->slot('otherdb_slot')->{'catalog_xmin'}, '',
+	'logical slot was actually dropped with DB');
+
+# Restarting a node with wal_level < logical that has existing logical
+# slots must succeed, but decoding from those slots must fail.
+$node_master->safe_psql('postgres', 'ALTER SYSTEM SET wal_level = replica');
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'logical', 'wal_level is still logical before restart');
+$node_master->restart;
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'replica', 'wal_level is replica');
+isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
+	'restored slot catalog_xmin is nonzero');
+is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
+	'reading from slot with wal_level < logical fails');
+is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
+	'can drop logical slot while wal_level = replica');
+is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+
 # done with the node
 $node_master->stop;
diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl
index 4561a06..b618132 100644
--- a/src/test/recovery/t/010_logical_decoding_timelines.pl
+++ b/src/test/recovery/t/010_logical_decoding_timelines.pl
@@ -15,12 +15,15 @@
 # This module uses the first approach to show that timeline following
 # on a logical slot works.
 #
+# (For convenience, it also tests some recovery-related operations
+# on logical slots).
+#
 use strict;
 use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 10;
+use Test::More tests => 13;
 use RecursiveCopy;
 use File::Copy;
 use IPC::Run ();
@@ -50,6 +53,16 @@ $node_master->safe_psql('postgres',
 $node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
 $node_master->safe_psql('postgres',
 	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+# We also want to verify that DROP DATABASE on a standby with a logical
+# slot works. This isn't strictly related to timeline following, but
+# the only way to get a logical slot on a standby right now is to use
+# the same physical copy trick, so:
+$node_master->safe_psql('postgres', 'CREATE DATABASE dropme;');
+$node_master->safe_psql('dropme',
+"SELECT pg_create_logical_replication_slot('dropme_slot', 'test_decoding');"
+);
+
 $node_master->safe_psql('postgres', 'CHECKPOINT;');
 
 my $backup_name = 'b1';
@@ -68,6 +81,17 @@ $node_replica->append_conf(
 
 $node_replica->start;
 
+# If we drop 'dropme' on the master, the standby should drop the
+# db and associated slot once it has replayed the DROP DATABASE record.
+is($node_master->psql('postgres', 'DROP DATABASE dropme'), 0,
+	'dropped DB with logical slot OK on master');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert'));
+is($node_replica->safe_psql('postgres', q[SELECT 1 FROM pg_database WHERE datname = 'dropme']), '',
+	'dropped DB dropme on standby');
+is($node_replica->slot('dropme_slot')->{'catalog_xmin'}, '',
+	'logical slot was actually dropped on standby');
+
+# Back to testing failover...
 $node_master->safe_psql('postgres',
 "SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
 );
@@ -99,10 +123,13 @@ isnt($phys_slot->{'catalog_xmin'}, '',
 cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
 	   'xmin on physical slot must not be lower than catalog_xmin');
 
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+
 # Boom, crash
 $node_master->stop('immediate');
 
 $node_replica->promote;
+print "waiting for replica to come up\n";
 $node_replica->poll_query_until('postgres',
 	"SELECT NOT pg_is_in_recovery();");
 
@@ -154,5 +181,4 @@ $stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup',
 chomp($stdout);
 is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
 
-# We don't need the standby anymore
 $node_replica->teardown_node();
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to