Hi hackers,

While working on avoiding unnecessary wakeups in logical/worker.c (as was
done for walreceiver.c in 05a7be9), I noticed that the tests began taking
much longer.  This seems to be caused by the reduced frequency of calls to
maybe_reread_subscription() in LogicalRepApplyLoop().  Presently,
LogicalRepApplyLoop() only waits for up to a second, so the subscription
info is re-read by workers relatively frequently.  If LogicalRepApplyLoop()
sleeps for longer, workers may not re-read the subscription info until much
later, so changes made by ALTER SUBSCRIPTION take longer to be noticed.

I think the fix for this problem can be considered independently, as
relying on frequent wakeups seems less than ideal, and the patch seems to
provide a small improvement even before applying the
avoid-unnecessary-wakeups patch.  On my machine, the attached patch
improved 'check-world -j8' run time by ~12 seconds (from 3 min 8 sec to
2 min 56 sec) and src/test/subscription test time by ~17 seconds (from 139
seconds to 122 seconds).

I put the new logic in launcher.c, but it might make more sense to put it
in logical/worker.c.  I think that might require some new #includes in a
couple of files, but otherwise, the patch would likely look about the same.

Thoughts?

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 80452ebbadfe80f237ebf5c7cdf8fa5eb2a61e35 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v1 1/1] wake up logical workers after ALTER SUBSCRIPTION

---
 src/backend/access/transam/xact.c          |  2 ++
 src/backend/commands/subscriptioncmds.c    |  2 ++
 src/backend/replication/logical/launcher.c | 37 ++++++++++++++++++++++
 src/include/replication/logicallauncher.h  |  3 ++
 4 files changed, 44 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8086b857b9..175ba770e6 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2360,6 +2360,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2861,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..ff0fe5e27b 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1031,6 +1031,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	form = (Form_pg_subscription) GETSTRUCT(tup);
 	subid = form->oid;
 
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	/* must be owner */
 	if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 73594c698a..af56220e3e 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -75,6 +75,7 @@ static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 
+static Oid on_commit_wakeup_workers_subid = InvalidOid;
 static bool on_commit_launcher_wakeup = false;
 
 
@@ -759,6 +760,42 @@ ApplyLauncherShmemInit(void)
 	}
 }
 
+/*
+ * Wake the stored subscription's workers on commit; always reset the request.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && OidIsValid(on_commit_wakeup_workers_subid))
+	{
+		List	   *workers;
+		ListCell   *worker;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+		workers = logicalrep_workers_find(on_commit_wakeup_workers_subid, true);
+		foreach(worker, workers)
+			logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker));
+
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	on_commit_wakeup_workers_subid = InvalidOid;
+}
+
+/*
+ * Request wakeup of the workers for the given subscription ID on commit of the
+ * transaction, so that they reread the subscription info as soon as possible.
+ *
+ * Only one subscription ID is remembered: a later call in the same transaction
+ * replaces the earlier request rather than adding to it.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+	on_commit_wakeup_workers_subid = subid;
+}
+
 /*
  * Wakeup the launcher on commit if requested.
  */
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index f1e2821e25..02cd3bdd4c 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -24,6 +24,9 @@ extern void ApplyLauncherShmemInit(void);
 extern void ApplyLauncherWakeupAtCommit(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
 extern bool IsLogicalLauncher(void);
 
 #endif							/* LOGICALLAUNCHER_H */
-- 
2.25.1

Reply via email to