Hi all,

While looking at some of the code related to subscriptions in psql,
coming down to make LOGICALREP_STREAM_* available for client code,
I've also noticed quite a few inconsistencies and mistakes with how
pg_subscription is handled in pg_dump, that have accumulated over the
years as of what looks like set of copy-pastos for most of them:
- pg_dump.c includes pg_subscription.h and not pg_subscription_d.h.
This is a mistake as the former should only be included in the backend
code.
- SubscriptionInfo has accumulated dust over the years, using
declarations types that don't map with its catalog.  To keep it short
here, all the fields use (char *) leading to more DIY logic in the
code (see two_phase_disabled[]), while most of the fields are booleans
or char values.  Switching to char values allows direct comparisons
with the contents of pg_subscription_d.h, leading to more consistent
code.
- Inconsistent position of fields between SubscriptionInfo and the
catalog pg_subscription.

EXPOSE_TO_CLIENT_CODE has been added to pg_subscription.h so as values
for substream, twophasestate and origin can be used directly in psql
and pg_dump, switching these to use pg_subscription_d.h as this is
client code.

While all that addressed, I am finishing with the patch attached.

Thoughts or comments?
--
Michael
From 097de22e34dec9aab560cad85af48e07d446b432 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Fri, 29 Nov 2024 13:02:29 +0900
Subject: [PATCH] Refactor subscription code for psql and pg_dump

---
 src/include/catalog/pg_subscription.h | 44 +++++++++++++------------
 src/bin/pg_dump/pg_dump.c             | 47 +++++++++++++--------------
 src/bin/pg_dump/pg_dump.h             | 16 ++++-----
 src/bin/psql/describe.c               |  7 ++--
 4 files changed, 58 insertions(+), 56 deletions(-)

diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b25f3fea56..beaff6578a 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -23,26 +23,6 @@
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
 
-/*
- * two_phase tri-state values. See comments atop worker.c to know more about
- * these states.
- */
-#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
-#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
-#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
-
-/*
- * The subscription will request the publisher to only send changes that do not
- * have any origin.
- */
-#define LOGICALREP_ORIGIN_NONE "none"
-
-/*
- * The subscription will request the publisher to send changes regardless
- * of their origin.
- */
-#define LOGICALREP_ORIGIN_ANY "any"
-
 /* ----------------
  *		pg_subscription definition. cpp turns this into
  *		typedef struct FormData_pg_subscription
@@ -159,6 +139,28 @@ typedef struct Subscription
 								 * specified origin */
 } Subscription;
 
+#ifdef EXPOSE_TO_CLIENT_CODE
+
+/*
+ * two_phase tri-state values. See comments atop worker.c to know more about
+ * these states.
+ */
+#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
+#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
+#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
+
+/*
+ * The subscription will request the publisher to only send changes that do not
+ * have any origin.
+ */
+#define LOGICALREP_ORIGIN_NONE "none"
+
+/*
+ * The subscription will request the publisher to send changes regardless
+ * of their origin.
+ */
+#define LOGICALREP_ORIGIN_ANY "any"
+
 /* Disallow streaming in-progress transactions. */
 #define LOGICALREP_STREAM_OFF 'f'
 
@@ -174,6 +176,8 @@ typedef struct Subscription
  */
 #define LOGICALREP_STREAM_PARALLEL 'p'
 
+#endif							/* EXPOSE_TO_CLIENT_CODE */
+
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
 extern void FreeSubscription(Subscription *sub);
 extern void DisableSubscription(Oid subid);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index add7f16c90..ec0cdf4ed7 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -50,7 +50,7 @@
 #include "catalog/pg_default_acl_d.h"
 #include "catalog/pg_largeobject_d.h"
 #include "catalog/pg_proc_d.h"
-#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_d.h"
 #include "catalog/pg_type_d.h"
 #include "common/connect.h"
 #include "common/int.h"
@@ -4968,20 +4968,20 @@ getSubscriptions(Archive *fout)
 	i_oid = PQfnumber(res, "oid");
 	i_subname = PQfnumber(res, "subname");
 	i_subowner = PQfnumber(res, "subowner");
+	i_subenabled = PQfnumber(res, "subenabled");
 	i_subbinary = PQfnumber(res, "subbinary");
 	i_substream = PQfnumber(res, "substream");
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
 	i_subrunasowner = PQfnumber(res, "subrunasowner");
+	i_subfailover = PQfnumber(res, "subfailover");
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
 	i_subpublications = PQfnumber(res, "subpublications");
 	i_suborigin = PQfnumber(res, "suborigin");
 	i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
-	i_subenabled = PQfnumber(res, "subenabled");
-	i_subfailover = PQfnumber(res, "subfailover");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4995,18 +4995,20 @@ getSubscriptions(Archive *fout)
 		subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
 		subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
 
+		subinfo[i].subenabled =
+			(strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
 		subinfo[i].subbinary =
-			pg_strdup(PQgetvalue(res, i, i_subbinary));
-		subinfo[i].substream =
-			pg_strdup(PQgetvalue(res, i, i_substream));
-		subinfo[i].subtwophasestate =
-			pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
+			(strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0);
+		subinfo[i].substream = *(PQgetvalue(res, i, i_substream));
+		subinfo[i].subtwophasestate = *(PQgetvalue(res, i, i_subtwophasestate));
 		subinfo[i].subdisableonerr =
-			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
+			(strcmp(PQgetvalue(res, i, i_subdisableonerr), "t") == 0);
 		subinfo[i].subpasswordrequired =
-			pg_strdup(PQgetvalue(res, i, i_subpasswordrequired));
+			(strcmp(PQgetvalue(res, i, i_subpasswordrequired), "t") == 0);
 		subinfo[i].subrunasowner =
-			pg_strdup(PQgetvalue(res, i, i_subrunasowner));
+			(strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0);
+		subinfo[i].subfailover =
+			(strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0);
 		subinfo[i].subconninfo =
 			pg_strdup(PQgetvalue(res, i, i_subconninfo));
 		if (PQgetisnull(res, i, i_subslotname))
@@ -5024,10 +5026,6 @@ getSubscriptions(Archive *fout)
 		else
 			subinfo[i].suboriginremotelsn =
 				pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
-		subinfo[i].subenabled =
-			pg_strdup(PQgetvalue(res, i, i_subenabled));
-		subinfo[i].subfailover =
-			pg_strdup(PQgetvalue(res, i, i_subfailover));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -5208,7 +5206,6 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	char	  **pubnames = NULL;
 	int			npubnames = 0;
 	int			i;
-	char		two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};
 
 	/* Do nothing if not dumping schema */
 	if (!dopt->dumpSchema)
@@ -5245,29 +5242,29 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	else
 		appendPQExpBufferStr(query, "NONE");
 
-	if (strcmp(subinfo->subbinary, "t") == 0)
+	if (subinfo->subbinary)
 		appendPQExpBufferStr(query, ", binary = true");
 
-	if (strcmp(subinfo->substream, "t") == 0)
+	if (subinfo->substream == LOGICALREP_STREAM_ON)
 		appendPQExpBufferStr(query, ", streaming = on");
-	else if (strcmp(subinfo->substream, "p") == 0)
+	else if (subinfo->substream == LOGICALREP_STREAM_PARALLEL)
 		appendPQExpBufferStr(query, ", streaming = parallel");
 	else
 		appendPQExpBufferStr(query, ", streaming = off");
 
-	if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
+	if (subinfo->subtwophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED)
 		appendPQExpBufferStr(query, ", two_phase = on");
 
-	if (strcmp(subinfo->subdisableonerr, "t") == 0)
+	if (subinfo->subdisableonerr)
 		appendPQExpBufferStr(query, ", disable_on_error = true");
 
-	if (strcmp(subinfo->subpasswordrequired, "t") != 0)
+	if (!subinfo->subpasswordrequired)
 		appendPQExpBuffer(query, ", password_required = false");
 
-	if (strcmp(subinfo->subrunasowner, "t") == 0)
+	if (subinfo->subrunasowner)
 		appendPQExpBufferStr(query, ", run_as_owner = true");
 
-	if (strcmp(subinfo->subfailover, "t") == 0)
+	if (subinfo->subfailover)
 		appendPQExpBufferStr(query, ", failover = true");
 
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
@@ -5303,7 +5300,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 			appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn);
 		}
 
-		if (strcmp(subinfo->subenabled, "t") == 0)
+		if (subinfo->subenabled)
 		{
 			/*
 			 * Enable the subscription to allow the replication to continue
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index d65f558565..2e55a0e3bb 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -664,20 +664,20 @@ typedef struct _SubscriptionInfo
 {
 	DumpableObject dobj;
 	const char *rolname;
-	char	   *subenabled;
-	char	   *subbinary;
-	char	   *substream;
-	char	   *subtwophasestate;
-	char	   *subdisableonerr;
-	char	   *subpasswordrequired;
-	char	   *subrunasowner;
+	bool		subenabled;
+	bool		subbinary;
+	char		substream;
+	char		subtwophasestate;
+	bool		subdisableonerr;
+	bool		subpasswordrequired;
+	bool		subrunasowner;
+	bool		subfailover;
 	char	   *subconninfo;
 	char	   *subslotname;
 	char	   *subsynccommit;
 	char	   *subpublications;
 	char	   *suborigin;
 	char	   *suboriginremotelsn;
-	char	   *subfailover;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 0aa39906a1..2657abdc72 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_default_acl_d.h"
 #include "catalog/pg_proc_d.h"
 #include "catalog/pg_statistic_ext_d.h"
+#include "catalog/pg_subscription_d.h"
 #include "catalog/pg_type_d.h"
 #include "common.h"
 #include "common/logging.h"
@@ -6679,9 +6680,9 @@ describeSubscriptions(const char *pattern, bool verbose)
 			if (pset.sversion >= 160000)
 				appendPQExpBuffer(&buf,
 								  ", (CASE substream\n"
-								  "    WHEN 'f' THEN 'off'\n"
-								  "    WHEN 't' THEN 'on'\n"
-								  "    WHEN 'p' THEN 'parallel'\n"
+								  "    WHEN " CppAsString2(LOGICALREP_STREAM_OFF) " THEN 'off'\n"
+								  "    WHEN " CppAsString2(LOGICALREP_STREAM_ON) " THEN 'on'\n"
+								  "    WHEN " CppAsString2(LOGICALREP_STREAM_PARALLEL) " THEN 'parallel'\n"
 								  "   END) AS \"%s\"\n",
 								  gettext_noop("Streaming"));
 			else
-- 
2.45.2

Attachment: signature.asc
Description: PGP signature

Reply via email to