On 17/12/16 18:34, Steve Singer wrote:
> On 12/16/2016 07:49 AM, Petr Jelinek wrote:
>> Hi,
>>
>> attached is version 13 of the patch.
>>
>> I merged in changes from PeterE. And did following changes:
>> - fixed the ownership error messages for both provider and subscriber
>> - added ability to send invalidation message to invalidate whole
>> relcache and use it in publication code
>> - added the post creation/alter/drop hooks
>> - removed parts of docs that refer to initial sync (which does not exist
>> yet)
>> - added timeout handling/retry, etc to apply/launcher based on the GUCs
>> that exist for wal receiver (they could use renaming though)
>> - improved feedback behavior
>> - apply worker now uses owner of the subscription as connection user
>> - more tests
>> - check for max_replication_slots in launcher
>> - clarify the update 'K' sub-message description in protocol
> 
> A few things I've noticed so far
> 
> If I shutdown the publisher I see the following in the log
> 
> 2016-12-17 11:33:49.548 EST [1891] LOG:  worker process: ?)G? (PID 1987)
> exited with exit code 1
> 
> but then if I shutdown the subscriber postmaster and restart it switches to
> 2016-12-17 11:43:09.628 EST [2373] LOG:  worker process: ???? (PID 2393)
> exited with exit code 1
> 
> Not sure where the 'G' was coming from (other times I have seen an 'I'
> here or other random characters)
> 

Uninitialized bgw_name for apply worker. Rather silly bug. Fixed.

> 
> I don't think we are cleaning up subscriptions on a drop database
> 
> If I do the following
> 
> 1) Create a subscription in a new database
> 2) Stop the publisher
> 3) Drop the database on the subscriber
> 
> test=# create subscription mysuba connection 'host=localhost dbname=test
> port=5440' publication mypub;
> test=# \c b
> b=# drop database test;
> DROP DATABASE
> b=# select * FROM pg_subscription ;
>  subdbid | subname | subowner | subenabled | subconninfo              |
> subslotname | subpublications
> ---------+---------+----------+------------+--------------------------------------+-------------+-----------------
> 
>    16384 | mysuba  |       10 | t          | host=localhost dbname=test
> port=5440 | mysuba      | {mypub}
> 

Good one. I added check that prevents dropping database when there is
subscription defined for it. I think we can't cascade here as
subscription may or may not hold resources (slot) in another
instance/database so preventing the drop is best we can do.

> 
> Also I don't think I can now drop mysuba
> b=# drop subscription mysuba;
> ERROR:  subscription "mysuba" does not exist
> 

Yeah subscriptions are per database.

I don't want to make v14 just for these 2 changes as that would make
life harder for anybody code-reviewing the v13 so attached is diff with
above fixes that applies on top of v13.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 8be9f39..2c58cc6 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -107,7 +107,41 @@ GetSubscription(Oid subid, bool missing_ok)
 }
 
 /*
- * Free memory allocated by subscription struct. */
+ * Return number of subscriptions defined in given database.
+ * Used by dropdb() to check if database can indeed be dropped.
+ */
+int
+CountDBSubscriptions(Oid dbid)
+{
+	int				nsubs = 0;
+	Relation		rel;
+	ScanKeyData		scankey;
+	SysScanDesc		scan;
+	HeapTuple		tup;
+
+	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
+
+	ScanKeyInit(&scankey,
+				Anum_pg_subscription_subdbid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(dbid));
+
+	scan = systable_beginscan(rel, InvalidOid, false,
+							  NULL, 1, &scankey);
+
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+		nsubs++;
+
+	systable_endscan(scan);
+
+	heap_close(rel, NoLock);
+
+	return nsubs;
+}
+
+/*
+ * Free memory allocated by subscription struct.
+ */
 void
 FreeSubscription(Subscription *sub)
 {
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 0919ad8..45d152c 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -37,6 +37,7 @@
 #include "catalog/pg_authid.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_subscription.h"
 #include "catalog/pg_tablespace.h"
 #include "commands/comment.h"
 #include "commands/dbcommands.h"
@@ -790,6 +791,7 @@ dropdb(const char *dbname, bool missing_ok)
 	int			npreparedxacts;
 	int			nslots,
 				nslots_active;
+	int			nsubscriptions;
 
 	/*
 	 * Look up the target database's OID, and get exclusive lock on it. We
@@ -875,6 +877,21 @@ dropdb(const char *dbname, bool missing_ok)
 				 errdetail_busy_db(notherbackends, npreparedxacts)));
 
 	/*
+	 * Check if there are subscriptions defined in the target database.
+	 *
+	 * We can't drop them automatically because they might be holding
+	 * resources in other databases/instances.
+	 */
+	if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_IN_USE),
+				 errmsg("database \"%s\" is being used by logical replication subscription",
+						dbname),
+				 errdetail_plural("There is %d subscription.",
+								  "There are %d subscriptions.",
+								  nsubscriptions, nsubscriptions)));
+
+	/*
 	 * Remove the database's tuple from pg_database.
 	 */
 	tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index fe30dda..bd865ef 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -370,6 +370,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt)
 
 	InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
 
+	ApplyLauncherWakeupAtCommit();
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 0f04cb3..783d97e 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -280,6 +280,8 @@ logicalrep_worker_launch(Oid dbid, Oid subid, Oid userid)
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	bgw.bgw_main = ApplyWorkerMain;
+	snprintf(bgw.bgw_name, BGW_MAXLEN,
+			 "logical replication worker %u", subid);
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
 	bgw.bgw_notify_pid = MyProcPid;
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index f6a3bac..057b36e 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -77,4 +77,6 @@ extern Subscription *GetSubscription(Oid subid, bool missing_ok);
 extern void FreeSubscription(Subscription *sub);
 extern Oid get_subscription_oid(const char *subname, bool missing_ok);
 
+extern int CountDBSubscriptions(Oid dbid);
+
 #endif   /* PG_SUBSCRIPTION_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to