On Sat, 7 Oct 2023 at 08:12, Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Tue, Oct 3, 2023 at 12:12 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > Thanks for the comments, the attached v6 version patch has the changes
> > for the same.
> >
>
> Few comments:
> =============
> 1.
> /* Is the use of a password mandatory? */
>   must_use_password = MySubscription->passwordrequired &&
> - !superuser_arg(MySubscription->owner);
> + !MySubscription->ownersuperuser;
>
> - /* Note that the superuser_arg call can access the DB */
>   CommitTransactionCommand();
>
> We can call CommitTransactionCommand() before the above check now. It
> was done afterward to invoke superuser_arg(), so, if that requirement
> is changed, we no longer need to keep the transaction open for a
> longer time. Please check other places for similar changes.

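Modified. CommitTransactionCommand() is now called before the
must_use_password check, both in LogicalRepSyncTableStart() and in the
apply worker startup code.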

> 2.
> + ereport(LOG,
> + errmsg("logical replication worker for subscription \"%s\" will
> restart because the subscription owner has become a non-superuser",
>
> How about something on the below lines?
> logical replication worker for subscription \"%s\" will restart
> because superuser privileges have been revoked for the subscription
> owner
> OR
> logical replication worker for subscription \"%s\" will restart
> because the subscription owner's superuser privileges have been
> revoked

Modified; the message now uses the second suggested wording.

> 3.
> - /* Keep us informed about subscription changes. */
> + /*
> + * Keep us informed about subscription changes or pg_authid rows.
> + * (superuser can become non-superuser.)
> + */
>
> Let's slightly change the comment to: "Keep us informed about
> subscription or role changes. Note that role's superuser privilege can
> be revoked."

Modified as suggested.

The attached v7 patch includes the changes for all of the above comments.

Regards,
Vignesh
From 58aaaacf2b372229e8aa0af81b316c23d89658d7 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Sun, 8 Oct 2023 07:14:46 +0530
Subject: [PATCH v7] Restart the apply worker if the subscription owner's
 superuser privileges have been revoked.

Restart the apply worker if the subscription owner's superuser privileges
have been revoked. This is required so that the subscription connection
string gets revalidated, which identifies cases where the password option
is not specified in the connection string of a non-superuser owner.
---
 src/backend/catalog/pg_subscription.c       |  3 ++
 src/backend/commands/subscriptioncmds.c     |  4 +--
 src/backend/replication/logical/tablesync.c |  6 ++--
 src/backend/replication/logical/worker.c    | 33 +++++++++++++++++----
 src/include/catalog/pg_subscription.h       |  1 +
 src/test/subscription/t/027_nosuperuser.pl  | 24 +++++++++++++++
 6 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d07f88ce28..d6a978f136 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -108,6 +108,9 @@ GetSubscription(Oid subid, bool missing_ok)
 								   Anum_pg_subscription_suborigin);
 	sub->origin = TextDatumGetCString(datum);
 
+	/* Is the subscription owner a superuser? */
+	sub->ownersuperuser = superuser_arg(sub->owner);
+
 	ReleaseSysCache(tup);
 
 	return sub;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6fe111e98d..edc82c11be 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -869,7 +869,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	load_file("libpqwalreceiver", false);
 
 	/* Try to connect to the publisher. */
-	must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
+	must_use_password = sub->passwordrequired && !sub->ownersuperuser;
 	wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
 							sub->name, &err);
 	if (!wrconn)
@@ -1249,7 +1249,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			load_file("libpqwalreceiver", false);
 			/* Check the connection info string. */
 			walrcv_check_conninfo(stmt->conninfo,
-								  sub->passwordrequired && !superuser_arg(sub->owner));
+								  sub->passwordrequired && !sub->ownersuperuser);
 
 			values[Anum_pg_subscription_subconninfo - 1] =
 				CStringGetTextDatum(stmt->conninfo);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d461654ab..c758c3cb55 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1262,13 +1262,11 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
 									   MyLogicalRepWorker->relid,
 									   &relstate_lsn);
+	CommitTransactionCommand();
 
 	/* Is the use of a password mandatory? */
 	must_use_password = MySubscription->passwordrequired &&
-		!superuser_arg(MySubscription->owner);
-
-	/* Note that the superuser_arg call can access the DB */
-	CommitTransactionCommand();
+		!MySubscription->ownersuperuser;
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = relstate;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 832b1cf764..fc94915828 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3958,6 +3958,24 @@ maybe_reread_subscription(void)
 		apply_worker_exit();
 	}
 
+	/*
+	 * Exit if the subscription owner's superuser privileges have been
+	 * revoked.
+	 */
+	if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
+	{
+		if (am_parallel_apply_worker())
+			ereport(LOG,
+					errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
+						   MySubscription->name));
+		else
+			ereport(LOG,
+					errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
+						   MySubscription->name));
+
+		apply_worker_exit();
+	}
+
 	/* Check for other changes that should never happen too. */
 	if (newsub->dbid != MySubscription->dbid)
 	{
@@ -4500,11 +4518,18 @@ InitializeApplyWorker(void)
 	SetConfigOption("synchronous_commit", MySubscription->synccommit,
 					PGC_BACKEND, PGC_S_OVERRIDE);
 
-	/* Keep us informed about subscription changes. */
+	/*
+	 * Keep us informed about subscription or role changes. Note that the
+	 * role's superuser privilege can be revoked.
+	 */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
 								  (Datum) 0);
 
+	CacheRegisterSyscacheCallback(AUTHOID,
+								  subscription_change_cb,
+								  (Datum) 0);
+
 	if (am_tablesync_worker())
 		ereport(LOG,
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
@@ -4599,13 +4624,11 @@ ApplyWorkerMain(Datum main_arg)
 		replorigin_session_setup(originid, 0);
 		replorigin_session_origin = originid;
 		origin_startpos = replorigin_session_get_progress(false);
+		CommitTransactionCommand();
 
 		/* Is the use of a password mandatory? */
 		must_use_password = MySubscription->passwordrequired &&
-			!superuser_arg(MySubscription->owner);
-
-		/* Note that the superuser_arg call can access the DB */
-		CommitTransactionCommand();
+			!MySubscription->ownersuperuser;
 
 		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
 												must_use_password,
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 1d40eebc78..1e16b69917 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -127,6 +127,7 @@ typedef struct Subscription
 								 * skipped */
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
+	bool		ownersuperuser; /* Is the subscription owner a superuser? */
 	bool		enabled;		/* Indicates if the subscription is enabled */
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index d7a7e3ef5b..642baa5d7c 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -104,6 +104,7 @@ for my $node ($node_publisher, $node_subscriber)
   CREATE ROLE regress_admin SUPERUSER LOGIN;
   CREATE ROLE regress_alice NOSUPERUSER LOGIN;
   GRANT CREATE ON DATABASE postgres TO regress_alice;
+  GRANT PG_CREATE_SUBSCRIPTION TO regress_alice;
   SET SESSION AUTHORIZATION regress_alice;
   CREATE SCHEMA alice;
   GRANT USAGE ON SCHEMA alice TO regress_admin;
@@ -303,4 +304,27 @@ GRANT SELECT ON alice.unpartitioned TO regress_alice;
 expect_replication("alice.unpartitioned", 3, 17, 21,
 	"restoring SELECT permission permits replication to continue");
 
+# The apply worker should get restarted after the superuser privileges are
+# revoked for subscription owner alice.
+grant_superuser("regress_alice");
+$node_subscriber->safe_psql(
+	'postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+CREATE SUBSCRIPTION regression_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
+));
+
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher,
+	'regression_sub');
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+revoke_superuser("regress_alice");
+
+# After the user becomes non-superuser the apply worker should be restarted.
+$node_subscriber->wait_for_log(
+	qr/LOG: ( [A-Z0-9]+:)? logical replication worker for subscription \"regression_sub\" will restart because the subscription owner's superuser privileges have been revoked/,
+	$offset);
+
 done_testing();
-- 
2.34.1

From 23145742e442837b03afd77015f1319010b1a2df Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Fri, 22 Sep 2023 15:12:23 +0530
Subject: [PATCH v7] Restart the apply worker if the subscription owner's
 superuser privileges have been revoked.

Restart the apply worker if the subscription owner's superuser privileges
have been revoked. This is required so that the subscription connection
string gets revalidated, which identifies cases where the password option
is not specified in the connection string of a non-superuser owner.
---
 src/backend/catalog/pg_subscription.c       |  3 ++
 src/backend/commands/subscriptioncmds.c     |  4 +--
 src/backend/replication/logical/tablesync.c |  6 ++--
 src/backend/replication/logical/worker.c    | 33 +++++++++++++++++----
 src/include/catalog/pg_subscription.h       |  1 +
 src/test/subscription/t/027_nosuperuser.pl  | 24 +++++++++++++++
 6 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d07f88ce28..d6a978f136 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -108,6 +108,9 @@ GetSubscription(Oid subid, bool missing_ok)
 								   Anum_pg_subscription_suborigin);
 	sub->origin = TextDatumGetCString(datum);
 
+	/* Is the subscription owner a superuser? */
+	sub->ownersuperuser = superuser_arg(sub->owner);
+
 	ReleaseSysCache(tup);
 
 	return sub;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6fe111e98d..edc82c11be 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -869,7 +869,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	load_file("libpqwalreceiver", false);
 
 	/* Try to connect to the publisher. */
-	must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
+	must_use_password = sub->passwordrequired && !sub->ownersuperuser;
 	wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
 							sub->name, &err);
 	if (!wrconn)
@@ -1249,7 +1249,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			load_file("libpqwalreceiver", false);
 			/* Check the connection info string. */
 			walrcv_check_conninfo(stmt->conninfo,
-								  sub->passwordrequired && !superuser_arg(sub->owner));
+								  sub->passwordrequired && !sub->ownersuperuser);
 
 			values[Anum_pg_subscription_subconninfo - 1] =
 				CStringGetTextDatum(stmt->conninfo);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e2cee92cf2..37a0abe2f4 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1275,13 +1275,11 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
 									   MyLogicalRepWorker->relid,
 									   &relstate_lsn);
+	CommitTransactionCommand();
 
 	/* Is the use of a password mandatory? */
 	must_use_password = MySubscription->passwordrequired &&
-		!superuser_arg(MySubscription->owner);
-
-	/* Note that the superuser_arg call can access the DB */
-	CommitTransactionCommand();
+		!MySubscription->ownersuperuser;
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = relstate;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 597947410f..54c14495be 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3966,6 +3966,24 @@ maybe_reread_subscription(void)
 		apply_worker_exit();
 	}
 
+	/*
+	 * Exit if the subscription owner's superuser privileges have been
+	 * revoked.
+	 */
+	if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
+	{
+		if (am_parallel_apply_worker())
+			ereport(LOG,
+					errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
+						   MySubscription->name));
+		else
+			ereport(LOG,
+					errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
+						   MySubscription->name));
+
+		apply_worker_exit();
+	}
+
 	/* Check for other changes that should never happen too. */
 	if (newsub->dbid != MySubscription->dbid)
 	{
@@ -4492,13 +4510,11 @@ run_apply_worker()
 	replorigin_session_setup(originid, 0);
 	replorigin_session_origin = originid;
 	origin_startpos = replorigin_session_get_progress(false);
+	CommitTransactionCommand();
 
 	/* Is the use of a password mandatory? */
 	must_use_password = MySubscription->passwordrequired &&
-		!superuser_arg(MySubscription->owner);
-
-	/* Note that the superuser_arg call can access the DB */
-	CommitTransactionCommand();
+		!MySubscription->ownersuperuser;
 
 	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
 											must_use_password,
@@ -4621,11 +4637,18 @@ InitializeLogRepWorker(void)
 	SetConfigOption("synchronous_commit", MySubscription->synccommit,
 					PGC_BACKEND, PGC_S_OVERRIDE);
 
-	/* Keep us informed about subscription changes. */
+	/*
+	 * Keep us informed about subscription or role changes. Note that the
+	 * role's superuser privilege can be revoked.
+	 */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
 								  (Datum) 0);
 
+	CacheRegisterSyscacheCallback(AUTHOID,
+								  subscription_change_cb,
+								  (Datum) 0);
+
 	if (am_tablesync_worker())
 		ereport(LOG,
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index be36c4a820..e0b91eacd2 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -127,6 +127,7 @@ typedef struct Subscription
 								 * skipped */
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
+	bool		ownersuperuser; /* Is the subscription owner a superuser? */
 	bool		enabled;		/* Indicates if the subscription is enabled */
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index d7a7e3ef5b..642baa5d7c 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -104,6 +104,7 @@ for my $node ($node_publisher, $node_subscriber)
   CREATE ROLE regress_admin SUPERUSER LOGIN;
   CREATE ROLE regress_alice NOSUPERUSER LOGIN;
   GRANT CREATE ON DATABASE postgres TO regress_alice;
+  GRANT PG_CREATE_SUBSCRIPTION TO regress_alice;
   SET SESSION AUTHORIZATION regress_alice;
   CREATE SCHEMA alice;
   GRANT USAGE ON SCHEMA alice TO regress_admin;
@@ -303,4 +304,27 @@ GRANT SELECT ON alice.unpartitioned TO regress_alice;
 expect_replication("alice.unpartitioned", 3, 17, 21,
 	"restoring SELECT permission permits replication to continue");
 
+# The apply worker should get restarted after the superuser privileges are
+# revoked for subscription owner alice.
+grant_superuser("regress_alice");
+$node_subscriber->safe_psql(
+	'postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+CREATE SUBSCRIPTION regression_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
+));
+
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher,
+	'regression_sub');
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+revoke_superuser("regress_alice");
+
+# After the user becomes non-superuser the apply worker should be restarted.
+$node_subscriber->wait_for_log(
+	qr/LOG: ( [A-Z0-9]+:)? logical replication worker for subscription \"regression_sub\" will restart because the subscription owner's superuser privileges have been revoked/,
+	$offset);
+
 done_testing();
-- 
2.34.1

Reply via email to