On 17/12/16 18:34, Steve Singer wrote: > On 12/16/2016 07:49 AM, Petr Jelinek wrote: >> Hi, >> >> attached is version 13 of the patch. >> >> I merged in changes from PeterE. And did following changes: >> - fixed the ownership error messages for both provider and subscriber >> - added ability to send invalidation message to invalidate whole >> relcache and use it in publication code >> - added the post creation/alter/drop hooks >> - removed parts of docs that refer to initial sync (which does not exist >> yet) >> - added timeout handling/retry, etc to apply/launcher based on the GUCs >> that exist for wal receiver (they could use renaming though) >> - improved feedback behavior >> - apply worker now uses owner of the subscription as connection user >> - more tests >> - check for max_replication_slots in launcher >> - clarify the update 'K' sub-message description in protocol > > A few things I've noticed so far > > If I shutdown the publisher I see the following in the log > > 2016-12-17 11:33:49.548 EST [1891] LOG: worker process: ?)G? (PID 1987) > exited with exit code 1 > > but then if I shutdown the subscriber postmaster and restart it switches to > 2016-12-17 11:43:09.628 EST [2373] LOG: worker process: ???? (PID 2393) > exited with exit code 1 > > Not sure where the 'G' was coming from (other times I have seen an 'I' > here or other random characters) >
Uninitialized bgw_name for apply worker. Rather silly bug. Fixed. > > I don't think we are cleaning up subscriptions on a drop database > > If I do the following > > 1) Create a subscription in a new database > 2) Stop the publisher > 3) Drop the database on the subscriber > > test=# create subscription mysuba connection 'host=localhost dbname=test > port=5440' publication mypub; > test=# \c b > b=# drop database test; > DROP DATABASE > b=# select * FROM pg_subscription ; > subdbid | subname | subowner | subenabled | subconninfo | > subslotname | subpublications > ---------+---------+----------+------------+--------------------------------------+-------------+----------------- > > 16384 | mysuba | 10 | t | host=localhost dbname=test > port=5440 | mysuba | {mypub} > Good one. I added check that prevents dropping database when there is subscription defined for it. I think we can't cascade here as subscription may or may not hold resources (slot) in another instance/database so preventing the drop is best we can do. > > Also I don't think I can now drop mysuba > b=# drop subscription mysuba; > ERROR: subscription "mysuba" does not exist > Yeah subscriptions are per database. I don't want to make v14 just for these 2 changes as that would make life harder for anybody code-reviewing the v13 so attached is diff with above fixes that applies on top of v13. -- Petr Jelinek http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 8be9f39..2c58cc6 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -107,7 +107,41 @@ GetSubscription(Oid subid, bool missing_ok) } /* - * Free memory allocated by subscription struct. */ + * Return number of subscriptions defined in given database. + * Used by dropdb() to check if database can indeed be dropped. + */ +int +CountDBSubscriptions(Oid dbid) +{ + int nsubs = 0; + Relation rel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); + + ScanKeyInit(&scankey, + Anum_pg_subscription_subdbid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(dbid)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, 1, &scankey); + + while (HeapTupleIsValid(tup = systable_getnext(scan))) + nsubs++; + + systable_endscan(scan); + + heap_close(rel, NoLock); + + return nsubs; +} + +/* + * Free memory allocated by subscription struct. + */ void FreeSubscription(Subscription *sub) { diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 0919ad8..45d152c 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -37,6 +37,7 @@ #include "catalog/pg_authid.h" #include "catalog/pg_database.h" #include "catalog/pg_db_role_setting.h" +#include "catalog/pg_subscription.h" #include "catalog/pg_tablespace.h" #include "commands/comment.h" #include "commands/dbcommands.h" @@ -790,6 +791,7 @@ dropdb(const char *dbname, bool missing_ok) int npreparedxacts; int nslots, nslots_active; + int nsubscriptions; /* * Look up the target database's OID, and get exclusive lock on it. We @@ -875,6 +877,21 @@ dropdb(const char *dbname, bool missing_ok) errdetail_busy_db(notherbackends, npreparedxacts))); /* + * Check if there are subscriptions defined in the target database. + * + * We can't drop them automatically because they might be holding + * resources in other databases/instances. + */ + if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("database \"%s\" is being used by logical replication subscription", + dbname), + errdetail_plural("There is %d subscription.", + "There are %d subscriptions.", + nsubscriptions, nsubscriptions))); + + /* * Remove the database's tuple from pg_database. */ tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id)); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index fe30dda..bd865ef 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -370,6 +370,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt) InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0); + ApplyLauncherWakeupAtCommit(); + return myself; } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 0f04cb3..783d97e 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -280,6 +280,8 @@ logicalrep_worker_launch(Oid dbid, Oid subid, Oid userid) BGWORKER_BACKEND_DATABASE_CONNECTION; bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; bgw.bgw_main = ApplyWorkerMain; + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication worker %u", subid); bgw.bgw_restart_time = BGW_NEVER_RESTART; bgw.bgw_notify_pid = MyProcPid; diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index f6a3bac..057b36e 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -77,4 +77,6 @@ extern Subscription *GetSubscription(Oid subid, bool missing_ok); extern void FreeSubscription(Subscription *sub); extern Oid get_subscription_oid(const char *subname, bool missing_ok); +extern int CountDBSubscriptions(Oid dbid); + #endif /* PG_SUBSCRIPTION_H */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers