(I was away for a while; now I have some time for this again.)

On 22/06/17 04:43, Peter Eisentraut wrote:
> The alternative is that we use the LockSharedObject() approach that was
> already alluded to, like in the attached patch. Both approaches would
> work equally fine AFAICT.
I agree, but I think we need a bigger overhaul of the locking/management
in general, so here is a patch that makes many more changes. It does
several important things (in no particular order):

- Splits SetSubscriptionRelState into AddSubscriptionRelState and
  UpdateSubscriptionRelState for the reasons stated upthread: it's
  cleaner, there is no half-broken upsert logic, and each action has
  proper error checking.
- Does LockSharedObject in ALTER SUBSCRIPTION, DROP SUBSCRIPTION (this
  one is preexisting, but I mention it for context),
  AddSubscriptionRelState, UpdateSubscriptionRelState, and
  logicalrep_worker_launch. This means we use granular per-object locks
  to deal with concurrency.
- Because of the above, the AccessExclusiveLock on pg_subscription is no
  longer needed; a normal RowExclusiveLock is used now.
- logicalrep_worker_stop is also simplified thanks to the proper locking.
- There is a new interface, logicalrep_worker_stop_at_commit, which is
  used by ALTER SUBSCRIPTION ... REFRESH PUBLICATION and by the
  transactional variant of DROP SUBSCRIPTION to kill workers only at the
  end of the transaction.
- Locking/reading of subscription info is unified between the DROP and
  ALTER SUBSCRIPTION commands.
- DROP SUBSCRIPTION will kill all workers associated with the
  subscription, not just the apply worker.
- The sync worker checks during startup whether the relation is still
  subscribed.
- The sync worker will exit if the apply worker shuts down while the
  sync worker is waiting for it.
- The log messages about workers of removed or disabled subscriptions
  are now more consistent between worker startup and normal runtime.
- Some code deduplication and stylistic changes/simplification in
  related areas.
- Fixed catcache's IndexScanOK() handling of the subscription catalog.

It's a bit bigger patch, but it solves issues from multiple threads
around the handling of ALTER/DROP SUBSCRIPTION.
A lot of the locking that I added is normally done transparently by
dependency handling, but subscriptions and subscription relation states
don't use it much, because recording them in pg_depend was deemed to
bloat it needlessly during the original patch review (that's probably
also why this slipped through).

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From d7038474012769c9c3b50231af76dd7796fe593f Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Sat, 24 Jun 2017 19:38:21 +0200
Subject: [PATCH] Rework subscription worker and relation status handling

---
 src/backend/catalog/pg_subscription.c       | 137 +++++++------
 src/backend/commands/subscriptioncmds.c     |  98 +++++-----
 src/backend/replication/logical/launcher.c  | 293 +++++++++++++++-------------
 src/backend/replication/logical/tablesync.c |  97 +++++----
 src/backend/replication/logical/worker.c    |  23 ++-
 src/backend/utils/cache/catcache.c          |   6 +-
 src/include/catalog/pg_subscription_rel.h   |   6 +-
 src/include/replication/worker_internal.h   |   6 +-
 8 files changed, 367 insertions(+), 299 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c69c461..b643e54 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -28,6 +28,8 @@
 
 #include "nodes/makefuncs.h"
 
+#include "storage/lmgr.h"
+
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -225,84 +227,101 @@ textarray_to_stringlist(ArrayType *textarray)
 }
 
 /*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing. This can be used to avoid inserting a new record that was deleted
- * by someone else. Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances. But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
  */
 Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn)
 {
 	Relation	rel;
 	HeapTuple	tup;
-	Oid			subrelid = InvalidOid;
+	Oid			subrelid;
 	bool		nulls[Natts_pg_subscription_rel];
 	Datum		values[Natts_pg_subscription_rel];
 
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	/* Try finding existing mapping. */
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
 							  ObjectIdGetDatum(relid),
 							  ObjectIdGetDatum(subid));
+	if (HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u already exists",
+			 relid, subid);
 
-	/*
-	 * If the record for given table does not exist yet create new record,
-	 * otherwise update the existing one.
-	 */
-	if (!HeapTupleIsValid(tup) && !update_only)
-	{
-		/* Form the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
-		values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
-		tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
-		/* Insert tuple into catalog. */
-		subrelid = CatalogTupleInsert(rel, tup);
-
-		heap_freetuple(tup);
-	}
-	else if (HeapTupleIsValid(tup))
-	{
-		bool		replaces[Natts_pg_subscription_rel];
+	/* Form the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
-		/* Update the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		memset(replaces, false, sizeof(replaces));
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
-		replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	/* Insert tuple into catalog. */
+	subrelid = CatalogTupleInsert(rel, tup);
 
-		replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	heap_freetuple(tup);
 
-		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
-								replaces);
+	/* Cleanup. */
+	heap_close(rel, NoLock);
 
-		/* Update the catalog. */
-		CatalogTupleUpdate(rel, &tup->t_self, tup);
+	return subrelid;
+}
 
-		subrelid = HeapTupleGetOid(tup);
-	}
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subrelid;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	/* Try finding existing mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	subrelid = HeapTupleGetOid(tup);
 
 	/* Cleanup. */
 	heap_close(rel, NoLock);
@@ -377,6 +396,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	HeapTuple	tup;
 	int			nkeys = 0;
 
+	Assert(OidIsValid(subid) || OidIsValid(relid));
+
 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	if (OidIsValid(subid))
@@ -400,9 +421,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	/* Do the search and delete what we found. */
 	scan = heap_beginscan_catalog(rel, nkeys, skey);
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
-	{
 		CatalogTupleDelete(rel, &tup->t_self);
-	}
 
 	heap_endscan(scan);
 	heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9cbd36f..3dc1f4c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 			CheckSubscriptionRelkind(get_rel_relkind(relid),
 									 rv->schemaname, rv->relname);
 
-			SetSubscriptionRelState(subid, relid, table_state,
-									InvalidXLogRecPtr, false);
+			AddSubscriptionRelState(subid, relid, table_state,
+									InvalidXLogRecPtr);
 		}
 
 		/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		if (!bsearch(&relid, subrel_local_oids,
 					 list_length(subrel_states), sizeof(Oid), oid_cmp))
 		{
-			SetSubscriptionRelState(sub->oid, relid,
-									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-									InvalidXLogRecPtr, false);
+			AddSubscriptionRelState(sub->oid, relid,
+									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+									InvalidXLogRecPtr);
 			ereport(NOTICE,
 					(errmsg("added subscription for table %s.%s",
 							quote_identifier(rv->schemaname),
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 			RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop(sub->oid, relid);
+			logicalrep_worker_stop_at_commit(sub->oid, relid);
 
 			namespace = get_namespace_name(get_rel_namespace(relid));
 			ereport(NOTICE,
@@ -636,14 +636,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				 errmsg("subscription \"%s\" does not exist",
 						stmt->subname)));
 
+	subid = HeapTupleGetOid(tup);
+
 	/* must be owner */
-	if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+	if (!pg_subscription_ownercheck(subid, GetUserId()))
 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
 					   stmt->subname);
 
-	subid = HeapTupleGetOid(tup);
 	sub = GetSubscription(subid, false);
 
+	/* Lock the subscription so nobody else can do anything with it. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
 	/* Form a new tuple. */
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
@@ -811,14 +815,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ObjectAddress myself;
 	HeapTuple	tup;
 	Oid			subid;
-	Datum		datum;
-	bool		isnull;
-	char	   *subname;
-	char	   *conninfo;
-	char	   *slotname;
+	List	   *subworkers;
+	ListCell   *lc;
 	char		originname[NAMEDATALEN];
-	char	   *err = NULL;
 	RepOriginId originid;
+	char	   *err = NULL;
+	Subscription *sub;
 	WalReceiverConn *wrconn = NULL;
 	StringInfoData cmd;
 
@@ -826,7 +828,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * Lock pg_subscription with AccessExclusiveLock to ensure that the
 	 * launcher doesn't restart new worker during dropping the subscription
 	 */
-	rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
 	tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
 						  CStringGetDatum(stmt->subname));
@@ -858,31 +860,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	/* DROP hook for the subscription being removed */
 	InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
 
-	/*
-	 * Lock the subscription so nobody else can do anything with it (including
-	 * the replication workers).
-	 */
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+	sub = GetSubscription(subid, false);
 
-	/* Get subname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subname, &isnull);
-	Assert(!isnull);
-	subname = pstrdup(NameStr(*DatumGetName(datum)));
-
-	/* Get conninfo */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subconninfo, &isnull);
-	Assert(!isnull);
-	conninfo = TextDatumGetCString(datum);
-
-	/* Get slotname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subslotname, &isnull);
-	if (!isnull)
-		slotname = pstrdup(NameStr(*DatumGetName(datum)));
-	else
-		slotname = NULL;
+	/* Lock the subscription so nobody else can do anything with it. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
 
 	/*
 	 * Since dropping a replication slot is not transactional, the replication
@@ -894,7 +875,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * of a subscription that is associated with a replication slot", but we
 	 * don't have the proper facilities for that.
 	 */
-	if (slotname)
+	if (sub->slotname)
 		PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
 
 
@@ -906,15 +887,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	ReleaseSysCache(tup);
 
+	/*
+	 * If we are dropping slot, stop all the subscription workers immediately
+	 * so that the slot is accessible, otherwise just schedule the stop at the
+	 * end of the transaction.
+	 *
+	 * New workers won't be started because we hold exclusive lock on the
+	 * subscription till the end of transaction.
+	 */
+	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	subworkers = logicalrep_sub_workers_find(subid, false);
+	LWLockRelease(LogicalRepWorkerLock);
+	foreach (lc, subworkers)
+	{
+		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		if (sub->slotname)
+			logicalrep_worker_stop(w->subid, w->relid);
+		else
+			logicalrep_worker_stop_at_commit(w->subid, w->relid);
+	}
+	list_free(subworkers);
+
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
 	RemoveSubscriptionRel(subid, InvalidOid);
 
-	/* Kill the apply worker so that the slot becomes accessible. */
-	logicalrep_worker_stop(subid, InvalidOid);
-
 	/* Remove the origin tracking if exists. */
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
 	originid = replorigin_by_name(originname, true);
@@ -925,7 +924,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * If there is no slot associated with the subscription, we can finish
 	 * here.
 	 */
-	if (!slotname)
+	if (!sub->slotname)
 	{
 		heap_close(rel, NoLock);
 		return;
@@ -938,13 +937,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	load_file("libpqwalreceiver", false);
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
+	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s",
+					 quote_identifier(sub->slotname));
 
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
+	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
 	if (wrconn == NULL)
 		ereport(ERROR,
 				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
+						"drop the replication slot \"%s\"", sub->slotname),
 				 errdetail("The error was: %s", err),
 				 errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
 						 "to disassociate the subscription from the slot.")));
@@ -958,12 +958,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 		if (res->status != WALRCV_OK_COMMAND)
 			ereport(ERROR,
 					(errmsg("could not drop the replication slot \"%s\" on publisher",
-							slotname),
+							sub->slotname),
 					 errdetail("The error was: %s", res->err)));
 		else
 			ereport(NOTICE,
 					(errmsg("dropped replication slot \"%s\" on publisher",
-							slotname)));
+							sub->slotname)));
 
 		walrcv_clear_result(res);
 	}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 86a2b14..410ad18 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -42,12 +42,14 @@
 #include "replication/worker_internal.h"
 
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
 
 #include "tcop/tcopprot.h"
 
+#include "utils/inval.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
@@ -73,6 +75,14 @@ typedef struct LogicalRepCtxStruct
 
 LogicalRepCtxStruct *LogicalRepCtx;
 
+typedef struct LogicalRepWorkerId
+{
+	Oid			subid;
+	Oid			relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
@@ -86,12 +96,11 @@ static bool on_commit_launcher_wakeup = false;
 
 Datum		pg_stat_get_subscription(PG_FUNCTION_ARGS);
 
-
 /*
  * Load the list of subscriptions.
  *
- * Only the fields interesting for worker start/stop functions are filled for
- * each subscription.
+ * Only the fields interesting for worker start are filled for each
+ * subscription.
  */
 static List *
 get_subscription_list(void)
@@ -100,19 +109,13 @@ get_subscription_list(void)
 	Relation	rel;
 	HeapScanDesc scan;
 	HeapTuple	tup;
-	MemoryContext resultcxt;
-
-	/* This is the context that we will allocate our output data in */
-	resultcxt = CurrentMemoryContext;
 
 	/*
-	 * Start a transaction so we can access pg_database, and get a snapshot.
 	 * We don't have a use for the snapshot itself, but we're interested in
 	 * the secondary effect that it sets RecentGlobalXmin. (This is critical
 	 * for anything that reads heap pages, because HOT may decide to prune
 	 * them even if the process doesn't attempt to modify any tuples.)
 	 */
-	StartTransactionCommand();
 	(void) GetTransactionSnapshot();
 
 	rel = heap_open(SubscriptionRelationId, AccessShareLock);
@@ -121,34 +124,17 @@ get_subscription_list(void)
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
 	{
 		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
-		Subscription *sub;
-		MemoryContext oldcxt;
 
-		/*
-		 * Allocate our results in the caller's context, not the
-		 * transaction's. We do this inside the loop, and restore the original
-		 * context at the end, so that leaky things like heap_getnext() are
-		 * not called in a potentially long-lived context.
-		 */
-		oldcxt = MemoryContextSwitchTo(resultcxt);
-
-		sub = (Subscription *) palloc0(sizeof(Subscription));
-		sub->oid = HeapTupleGetOid(tup);
-		sub->dbid = subform->subdbid;
-		sub->owner = subform->subowner;
-		sub->enabled = subform->subenabled;
-		sub->name = pstrdup(NameStr(subform->subname));
-		/* We don't fill fields we are not interested in. */
-
-		res = lappend(res, sub);
-		MemoryContextSwitchTo(oldcxt);
+		/* We only care about enabled subscriptions. */
+		if (!subform->subenabled)
+			continue;
+
+		res = lappend_oid(res, HeapTupleGetOid(tup));
 	}
 
 	heap_endscan(scan);
 	heap_close(rel, AccessShareLock);
 
-	CommitTransactionCommand();
-
 	return res;
 }
 
@@ -250,23 +236,68 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 }
 
 /*
- * Start new apply background worker.
+ * Similar to logicalrep_worker_find(), but returns a list of all workers
+ * for the subscription instead of just one.
+ */
+List *
+logicalrep_sub_workers_find(Oid subid, bool only_running)
+{
+	int			i;
+	List	   *res = NIL;
+
+	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+	/* Search for attached workers for a given subscription id. */
+	for (i = 0; i < max_logical_replication_workers; i++)
+	{
+		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+		if (w->in_use && w->subid == subid && (!only_running || w->proc))
+			res = lappend(res, w);
+	}
+
+	return res;
+}
+
+/*
+ * Start new logical replication background worker.
*/ void -logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, - Oid relid) +logicalrep_worker_launch(Oid subid, Oid relid) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; int i; int slot = 0; - LogicalRepWorker *worker = NULL; - int nsyncworkers; + List *subworkers; + ListCell *lc; TimestampTz now; + int nsyncworkers = 0; + Subscription *sub; + LogicalRepWorker *worker = NULL; ereport(DEBUG1, - (errmsg("starting logical replication worker for subscription \"%s\"", - subname))); + (errmsg("starting logical replication worker for subscription %u", + subid))); + + /* Block any concurrent DDL on the subscription. */ + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + + /* + * Subscription might have been dropped in meantime, make sure our cache + * is up to date. + */ + AcceptInvalidationMessages(); + + /* Get info about subscription. */ + sub = GetSubscription(subid, true); + if (!sub) + { + ereport(DEBUG1, + (errmsg("subscription %u not found, not starting worker for it", + subid))); + return; + } /* Report this after the initial starting message for consistency. 
*/ if (max_replication_slots == 0) @@ -294,7 +325,14 @@ retry: } } - nsyncworkers = logicalrep_sync_worker_count(subid); + subworkers = logicalrep_sub_workers_find(subid, false); + foreach (lc, subworkers) + { + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + if (w->relid != InvalidOid) + nsyncworkers ++; + } + list_free(subworkers); now = GetCurrentTimestamp(); @@ -340,6 +378,7 @@ retry: if (nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); + UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); return; } @@ -350,6 +389,7 @@ retry: if (worker == NULL) { LWLockRelease(LogicalRepWorkerLock); + UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); ereport(WARNING, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("out of logical replication worker slots"), @@ -362,8 +402,8 @@ retry: worker->in_use = true; worker->generation++; worker->proc = NULL; - worker->dbid = dbid; - worker->userid = userid; + worker->dbid = sub->dbid; + worker->userid = sub->owner; worker->subid = subid; worker->relid = relid; worker->relstate = SUBREL_STATE_UNKNOWN; @@ -374,8 +414,6 @@ retry: worker->reply_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->reply_time); - LWLockRelease(LogicalRepWorkerLock); - /* Register the new dynamic worker. */ memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | @@ -394,8 +432,13 @@ retry: bgw.bgw_notify_pid = MyProcPid; bgw.bgw_main_arg = Int32GetDatum(slot); + /* Try to register the worker and cleanup in case of failure. */ if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) { + logicalrep_worker_cleanup(worker); + LWLockRelease(LogicalRepWorkerLock); + UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + ereport(WARNING, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("out of background worker slots"), @@ -403,13 +446,24 @@ retry: return; } + /* Done with the worker array. 
*/ + LWLockRelease(LogicalRepWorkerLock); + /* Now wait until it attaches. */ WaitForReplicationWorkerAttach(worker, bgw_handle); + + /* + * Worker either started or died, in any case we are done with the + * subscription. + */ + UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); } /* * Stop the logical replication worker and wait until it detaches from the * slot. + * + * Callers of this function better have exclusive lock on the subscription. */ void logicalrep_worker_stop(Oid subid, Oid relid) @@ -417,7 +471,8 @@ logicalrep_worker_stop(Oid subid, Oid relid) LogicalRepWorker *worker; uint16 generation; - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + /* Exclusive is needed for logicalrep_worker_cleanup(). */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); worker = logicalrep_worker_find(subid, relid, false); @@ -428,56 +483,16 @@ logicalrep_worker_stop(Oid subid, Oid relid) return; } + /* If there is worker but it's not running, clean it up. */ + if (!worker->proc) + logicalrep_worker_cleanup(worker); + /* * Remember which generation was our worker so we can check if what we see * is still the same one. */ generation = worker->generation; - /* - * If we found worker but it does not have proc set it is starting up, - * wait for it to finish and then kill it. - */ - while (worker->in_use && !worker->proc) - { - int rc; - - LWLockRelease(LogicalRepWorkerLock); - - /* Wait for signal. */ - rc = WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, - 1000L, WAIT_EVENT_BGWORKER_STARTUP); - - /* emergency bailout if postmaster has died */ - if (rc & WL_POSTMASTER_DEATH) - proc_exit(1); - - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); - } - - /* Check worker status. 
*/ - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - - /* - * Check whether the worker slot is no longer used, which would mean - * that the worker has exited, or whether the worker generation is - * different, meaning that a different worker has taken the slot. - */ - if (!worker->in_use || worker->generation != generation) - { - LWLockRelease(LogicalRepWorkerLock); - return; - } - - /* Worker has assigned proc, so it has started. */ - if (worker->proc) - break; - } - /* Now terminate the worker ... */ kill(worker->proc->pid, SIGTERM); LWLockRelease(LogicalRepWorkerLock); @@ -497,7 +512,10 @@ logicalrep_worker_stop(Oid subid, Oid relid) CHECK_FOR_INTERRUPTS(); - /* Wait for more work. */ + /* + * We need timeout because we generally don't get notified via latch + * about the worker attach. + */ rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_BGWORKER_SHUTDOWN); @@ -515,6 +533,22 @@ logicalrep_worker_stop(Oid subid, Oid relid) } /* + * Request worker to be stopped on commit. + */ +void +logicalrep_worker_stop_at_commit(Oid subid, Oid relid) +{ + LogicalRepWorkerId *wid; + wid = MemoryContextAlloc(TopTransactionContext, + sizeof(LogicalRepWorkerId)); + wid->subid = subid; + wid->relid = relid; + + on_commit_stop_workers = lappend(on_commit_stop_workers, wid); +} + + +/* * Wake up (using latch) the logical replication worker. */ void @@ -648,30 +682,6 @@ logicalrep_launcher_sighup(SIGNAL_ARGS) } /* - * Count the number of registered (not necessarily running) sync workers - * for a subscription. - */ -int -logicalrep_sync_worker_count(Oid subid) -{ - int i; - int res = 0; - - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - - /* Search for attached worker for a given subscription id. 
*/ - for (i = 0; i < max_logical_replication_workers; i++) - { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - - if (w->subid == subid && OidIsValid(w->relid)) - res++; - } - - return res; -} - -/* * ApplyLauncherShmemSize * Compute space needed for replication launcher shared memory */ @@ -754,9 +764,25 @@ ApplyLauncherShmemInit(void) void AtEOXact_ApplyLauncher(bool isCommit) { - if (isCommit && on_commit_launcher_wakeup) - ApplyLauncherWakeup(); + ListCell *lc; + + if (isCommit) + { + foreach (lc, on_commit_stop_workers) + { + LogicalRepWorkerId *wid = lfirst(lc); + logicalrep_worker_stop(wid->subid, wid->relid); + } + if (on_commit_launcher_wakeup) + ApplyLauncherWakeup(); + } + + /* + * No need to pfree on_commit_stop_workers, it's been allocated in + * transaction memory context which is going to be cleaned soon. + */ + on_commit_stop_workers = NIL; on_commit_launcher_wakeup = false; } @@ -814,8 +840,6 @@ ApplyLauncherMain(Datum main_arg) int rc; List *sublist; ListCell *lc; - MemoryContext subctx; - MemoryContext oldctx; TimestampTz now; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; @@ -827,41 +851,38 @@ ApplyLauncherMain(Datum main_arg) if (TimestampDifferenceExceeds(last_start_time, now, wal_retrieve_retry_interval)) { - /* Use temporary context for the database list and worker info. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - oldctx = MemoryContextSwitchTo(subctx); - - /* search for subscriptions to start or stop. */ + /* + * Start new transaction so that we can take locks and snapshots. + * + * Any allocations will also be made inside the transaction memory + * context. + */ + StartTransactionCommand(); + + /* Search for subscriptions to start. */ sublist = get_subscription_list(); - /* Start the missing workers for enabled subscriptions. */ + /* Start the missing workers. 
*/ foreach(lc, sublist) { - Subscription *sub = (Subscription *) lfirst(lc); + Oid subid = lfirst_oid(lc); LogicalRepWorker *w; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); + w = logicalrep_worker_find(subid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); - if (sub->enabled && w == NULL) + if (w == NULL) { last_start_time = now; wait_time = wal_retrieve_retry_interval; - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid); + /* Start the worker. */ + logicalrep_worker_launch(subid, InvalidOid); } } - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); + CommitTransactionCommand(); } else { diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 3ef12df..11f4977 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -215,7 +215,7 @@ wait_for_relation_state_change(Oid relid, char expected_state) * Returns false if the apply worker has disappeared or the table state has been * reset. 
  */
-static bool
+static void
 wait_for_worker_state_change(char expected_state)
 {
 	int			rc;
@@ -232,10 +232,13 @@ wait_for_worker_state_change(char expected_state)
 										InvalidOid, false);
 		LWLockRelease(LogicalRepWorkerLock);
 		if (!worker)
-			return false;
+			ereport(FATAL,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("terminating logical replication synchronization "
+							"worker due to subscription apply worker exit")));
 
 		if (MyLogicalRepWorker->relstate == expected_state)
-			return true;
+			return;
 
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
@@ -247,8 +250,6 @@ wait_for_worker_state_change(char expected_state)
 
 		ResetLatch(MyLatch);
 	}
-
-	return false;
 }
 
 /*
@@ -285,11 +286,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-		SetSubscriptionRelState(MyLogicalRepWorker->subid,
-								MyLogicalRepWorker->relid,
-								MyLogicalRepWorker->relstate,
-								MyLogicalRepWorker->relstate_lsn,
-								true);
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->relstate,
+								   MyLogicalRepWorker->relstate_lsn);
 
 		walrcv_endstreaming(wrconn, &tli);
 		finish_sync_worker();
@@ -332,6 +332,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	ListCell   *lc;
 	bool		started_tx = false;
 
+#define ensure_transaction() \
+	if (!started_tx) \
+	{\
+		StartTransactionCommand(); \
+		started_tx = true; \
+	}
+
 	Assert(!IsTransactionState());
 
 	/* We need up-to-date sync state info for subscription tables here. */
@@ -346,8 +353,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		list_free_deep(table_states);
 		table_states = NIL;
 
-		StartTransactionCommand();
-		started_tx = true;
+		ensure_transaction();
 
 		/* Fetch all non-ready tables. */
 		rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -409,14 +415,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				{
 					rstate->state = SUBREL_STATE_READY;
 					rstate->lsn = current_lsn;
-					if (!started_tx)
-					{
-						StartTransactionCommand();
-						started_tx = true;
-					}
-					SetSubscriptionRelState(MyLogicalRepWorker->subid,
-											rstate->relid, rstate->state,
-											rstate->lsn, true);
+
+					ensure_transaction();
+					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+											   rstate->relid, rstate->state,
+											   rstate->lsn);
 				}
 			}
 			else
@@ -435,13 +438,26 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					SpinLockRelease(&syncworker->relmutex);
 				}
 				else
+				{
+					List	   *subworkers;
+					ListCell   *lc;
 
 					/*
 					 * If there is no sync worker for this table yet, count
 					 * running sync workers for this subscription, while we have
 					 * the lock, for later.
 					 */
-					nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+					subworkers =
+						logicalrep_sub_workers_find(MyLogicalRepWorker->subid,
+													false);
+					foreach (lc, subworkers)
+					{
+						LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+						if (w->relid != InvalidOid)
+							nsyncworkers ++;
+					}
+					list_free(subworkers);
+				}
 				LWLockRelease(LogicalRepWorkerLock);
 
 				/*
@@ -467,11 +483,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					 * Enter busy loop and wait for synchronization worker to
 					 * reach expected state (or die trying).
*/ - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } + ensure_transaction(); wait_for_relation_state_change(rstate->relid, SUBREL_STATE_SYNCDONE); } @@ -493,10 +505,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) TimestampDifferenceExceeds(hentry->last_start_time, now, wal_retrieve_retry_interval)) { - logicalrep_worker_launch(MyLogicalRepWorker->dbid, - MySubscription->oid, - MySubscription->name, - MyLogicalRepWorker->userid, + ensure_transaction(); + logicalrep_worker_launch(MySubscription->oid, rstate->relid); hentry->last_start_time = now; } @@ -798,6 +808,15 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, &relstate_lsn, true); + if (relstate == SUBREL_STATE_UNKNOWN) + { + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", " + "table \"%s\" will stop because the table is no longer subscribed", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid)))); + proc_exit(0); + } CommitTransactionCommand(); SpinLockAcquire(&MyLogicalRepWorker->relmutex); @@ -844,11 +863,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Update the state and make it visible to others. */ StartTransactionCommand(); - SetSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn, - true); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); CommitTransactionCommand(); pgstat_report_stat(false); @@ -933,11 +951,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * Update the new state in catalog. No need to bother * with the shmem state as we are exiting for good. 
*/ - SetSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - SUBREL_STATE_SYNCDONE, - *origin_startpos, - true); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + SUBREL_STATE_SYNCDONE, + *origin_startpos); finish_sync_worker(); } break; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 898c497..085dd8c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1516,24 +1516,31 @@ ApplyWorkerMain(Datum main_arg) ALLOCSET_DEFAULT_SIZES); StartTransactionCommand(); oldctx = MemoryContextSwitchTo(ApplyContext); - MySubscription = GetSubscription(MyLogicalRepWorker->subid, false); + MySubscription = GetSubscription(MyLogicalRepWorker->subid, true); + if (!MySubscription) + { + ereport(LOG, + (errmsg("logical replication apply worker for subscription %u will " + "stop because the subscription was removed", + MyLogicalRepWorker->subid))); + proc_exit(0); + } MySubscriptionValid = true; MemoryContextSwitchTo(oldctx); - /* Setup synchronous commit according to the user's wishes */ - SetConfigOption("synchronous_commit", MySubscription->synccommit, - PGC_BACKEND, PGC_S_OVERRIDE); - if (!MySubscription->enabled) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will not " - "start because the subscription was disabled during startup", + (errmsg("logical replication apply worker for subscription \"%s\" will " + "stop because the subscription was disabled", MySubscription->name))); - proc_exit(0); } + /* Setup synchronous commit according to the user's wishes */ + SetConfigOption("synchronous_commit", MySubscription->synccommit, + PGC_BACKEND, PGC_S_OVERRIDE); + /* Keep us informed about subscription changes. 
*/ CacheRegisterSyscacheCallback(SUBSCRIPTIONOID, subscription_change_cb, diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c index e7e8e3b..639b4eb 100644 --- a/src/backend/utils/cache/catcache.c +++ b/src/backend/utils/cache/catcache.c @@ -1052,10 +1052,12 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey) case AUTHNAME: case AUTHOID: case AUTHMEMMEMROLE: + case SUBSCRIPTIONOID: + case SUBSCRIPTIONNAME: /* - * Protect authentication lookups occurring before relcache has - * collected entries for shared indexes. + * Protect authentication and subscription lookups occurring + * before relcache has collected entries for shared indexes. */ if (!criticalSharedRelcachesBuilt) return false; diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 991ca9d..c5b0b9c 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -70,8 +70,10 @@ typedef struct SubscriptionRelState char state; } SubscriptionRelState; -extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn, bool update_only); +extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn); +extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, bool missing_ok); extern void RemoveSubscriptionRel(Oid subid, Oid relid); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 494a3a3..add7841 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -71,13 +71,13 @@ extern bool in_remote_transaction; extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); -extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, - Oid 
userid, Oid relid); +extern void logicalrep_worker_launch(Oid subid, Oid relid); extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); -extern int logicalrep_sync_worker_count(Oid subid); +extern List *logicalrep_sub_workers_find(Oid subid, bool only_running); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); void process_syncing_tables(XLogRecPtr current_lsn); -- 2.7.4
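Several hunks in process_syncing_tables_for_apply() replace the repeated `if (!started_tx) { StartTransactionCommand(); started_tx = true; }` blocks with the new `ensure_transaction()` macro. This is a lazy-start pattern: a transaction is opened only when an iteration actually needs catalog access, and it is committed once at the end.

Below is a minimal standalone sketch of that pattern, not backend code. The following are illustrative assumptions:
- `process_items()` is a hypothetical caller.
- The counting stubs stand in for `StartTransactionCommand()` and `CommitTransactionCommand()`.
- The `do { } while (0)` wrapper is an addition.

The macro as written in the patch expands to a bare `if`. With that form, an unbraced `if (x) ensure_transaction(); else ...` does not compile: the semicolon after the macro becomes an empty statement that separates the `else` from its `if`.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical stand-ins for the backend's StartTransactionCommand() and
 * CommitTransactionCommand(); here they only count how often they are called.
 */
static int	tx_started = 0;
static int	tx_committed = 0;

static void
StartTransactionCommand(void)
{
	tx_started++;
}

static void
CommitTransactionCommand(void)
{
	tx_committed++;
}

/*
 * Start a transaction on first use only.  The do { } while (0) wrapper (an
 * assumption, not part of the patch) makes the macro a single statement, so
 * it is safe in an unbraced if/else.  It expects a local bool named
 * started_tx in the calling scope, as in the patch.
 */
#define ensure_transaction() \
	do { \
		if (!started_tx) \
		{ \
			StartTransactionCommand(); \
			started_tx = true; \
		} \
	} while (0)

/*
 * Hypothetical loop in the style of process_syncing_tables_for_apply():
 * only items flagged in needs_update require catalog access, and all of
 * them share one transaction that is committed once at the end.
 */
static void
process_items(const bool *needs_update, int n)
{
	bool		started_tx = false;

	/* Analogous to Assert(!IsTransactionState()): nothing open on entry. */
	assert(tx_started == tx_committed);

	for (int i = 0; i < n; i++)
	{
		if (needs_update[i])
			ensure_transaction();	/* compiles only with the do/while form */
		else
			continue;

		/* ... catalog update for item i would go here ... */
	}

	if (started_tx)
		CommitTransactionCommand();
}
```

For example, calling process_items() with {false, true, false, true, true} starts and commits exactly one transaction. A call in which no item needs an update starts none.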
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)