Hi hackers, While working on avoiding unnecessary wakeups in logical/worker.c (as was done for walreceiver.c in 05a7be9), I noticed that the tests began taking much longer. This seems to be caused by the reduced frequency of calls to maybe_reread_subscription() in LogicalRepApplyLoop(). Presently, LogicalRepApplyLoop() only waits for up to a second, so the subscription info is re-read by workers relatively frequently. If LogicalRepApplyLoop() sleeps for longer, the subscription info may not be re-read for a much longer time.
I think the fix for this problem can be considered independently, as relying on frequent wakeups seems less than ideal, and the attached patch seems to provide a small improvement even before applying the avoid-unnecessary-wakeups patch. On my machine, the attached patch improved 'check-world -j8' run time by ~12 seconds (from 3min 8sec to 2min 56sec) and src/test/subscription test time by ~17 seconds (from 139 seconds to 122 seconds). I put the new logic in launcher.c, but it might make more sense to put it in logical/worker.c. I think that might require some new #includes in a couple of files, but otherwise, the patch would likely look about the same. Thoughts? -- Nathan Bossart Amazon Web Services: https://aws.amazon.com
>From 80452ebbadfe80f237ebf5c7cdf8fa5eb2a61e35 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Mon, 21 Nov 2022 16:01:01 -0800 Subject: [PATCH v1 1/1] wake up logical workers after ALTER SUBSCRIPTION --- src/backend/access/transam/xact.c | 2 ++ src/backend/commands/subscriptioncmds.c | 2 ++ src/backend/replication/logical/launcher.c | 37 ++++++++++++++++++++++ src/include/replication/logicallauncher.h | 3 ++ 4 files changed, 44 insertions(+) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 8086b857b9..175ba770e6 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2360,6 +2360,7 @@ CommitTransaction(void) AtEOXact_PgStat(true, is_parallel_worker); AtEOXact_Snapshot(true, false); AtEOXact_ApplyLauncher(true); + AtEOXact_LogicalRepWorkers(true); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; @@ -2860,6 +2861,7 @@ AbortTransaction(void) AtEOXact_HashTables(false); AtEOXact_PgStat(false, is_parallel_worker); AtEOXact_ApplyLauncher(false); + AtEOXact_LogicalRepWorkers(false); pgstat_report_xact_timestamp(0); } diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index d673557ea4..ff0fe5e27b 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1031,6 +1031,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, form = (Form_pg_subscription) GETSTRUCT(tup); subid = form->oid; + LogicalRepWorkersWakeupAtCommit(subid); + /* must be owner */ if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION, diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 73594c698a..af56220e3e 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -75,6 +75,7 @@ static void 
logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); +static Oid on_commit_wakeup_workers_subid = InvalidOid; static bool on_commit_launcher_wakeup = false; @@ -759,6 +760,42 @@ ApplyLauncherShmemInit(void) } } +/* + * Wakeup the stored subscription's workers on commit if requested. + */ +void +AtEOXact_LogicalRepWorkers(bool isCommit) +{ + if (isCommit && OidIsValid(on_commit_wakeup_workers_subid)) + { + List *workers; + ListCell *worker; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + workers = logicalrep_workers_find(on_commit_wakeup_workers_subid, true); + foreach(worker, workers) + logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker)); + + LWLockRelease(LogicalRepWorkerLock); + } + + on_commit_wakeup_workers_subid = InvalidOid; +} + +/* + * Request wakeup of the workers for the given subscription ID on commit of the + * transaction. + * + * This is used to ensure that the workers reread the subscription info as soon + * as possible. + */ +void +LogicalRepWorkersWakeupAtCommit(Oid subid) +{ + on_commit_wakeup_workers_subid = subid; +} + /* * Wakeup the launcher on commit if requested. */ diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index f1e2821e25..02cd3bdd4c 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -24,6 +24,9 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherWakeupAtCommit(void); extern void AtEOXact_ApplyLauncher(bool isCommit); +extern void LogicalRepWorkersWakeupAtCommit(Oid subid); +extern void AtEOXact_LogicalRepWorkers(bool isCommit); + extern bool IsLogicalLauncher(void); #endif /* LOGICALLAUNCHER_H */ -- 2.25.1