Hi all, While looking at some of the code related to subscriptions in psql, coming down to make LOGICALREP_STREAM_* available for client code, I've also noticed quite a few inconsistencies and mistakes with how pg_subscription is handled in pg_dump, that have accumulated over the years as of what looks like set of copy-pastos for most of them: - pg_dump.c includes pg_subscription.h and not pg_subscription_d.h. This is a mistake as the former should only be included in the backend code. - SubscriptionInfo has accumulated dust over the years, using declarations types that don't map with its catalog. To keep it short here, all the fields use (char *) leading to more DIY logic in the code (see two_phase_disabled[]), while most of the fields are booleans or char values. Switching to char values allows direct comparisons with the contents of pg_subscription_d.h, leading to more consistent code. - Inconsistent position of fields between SubscriptionInfo and the catalog pg_subscription.
EXPOSE_TO_CLIENT_CODE has been added to pg_subscription.h so as values for substream, twophasestate and origin can be used directly in psql and pg_dump, switching these to use pg_subscription_d.h as this is client code. While all that addressed, I am finishing with the patch attached. Thoughts or comments? -- Michael
From 097de22e34dec9aab560cad85af48e07d446b432 Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@paquier.xyz> Date: Fri, 29 Nov 2024 13:02:29 +0900 Subject: [PATCH] Refactor subscription code for psql and pg_dump --- src/include/catalog/pg_subscription.h | 44 +++++++++++++------------ src/bin/pg_dump/pg_dump.c | 47 +++++++++++++-------------- src/bin/pg_dump/pg_dump.h | 16 ++++----- src/bin/psql/describe.c | 7 ++-- 4 files changed, 58 insertions(+), 56 deletions(-) diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index b25f3fea56..beaff6578a 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -23,26 +23,6 @@ #include "lib/stringinfo.h" #include "nodes/pg_list.h" -/* - * two_phase tri-state values. See comments atop worker.c to know more about - * these states. - */ -#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd' -#define LOGICALREP_TWOPHASE_STATE_PENDING 'p' -#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e' - -/* - * The subscription will request the publisher to only send changes that do not - * have any origin. - */ -#define LOGICALREP_ORIGIN_NONE "none" - -/* - * The subscription will request the publisher to send changes regardless - * of their origin. - */ -#define LOGICALREP_ORIGIN_ANY "any" - /* ---------------- * pg_subscription definition. cpp turns this into * typedef struct FormData_pg_subscription @@ -159,6 +139,28 @@ typedef struct Subscription * specified origin */ } Subscription; +#ifdef EXPOSE_TO_CLIENT_CODE + +/* + * two_phase tri-state values. See comments atop worker.c to know more about + * these states. + */ +#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd' +#define LOGICALREP_TWOPHASE_STATE_PENDING 'p' +#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e' + +/* + * The subscription will request the publisher to only send changes that do not + * have any origin. + */ +#define LOGICALREP_ORIGIN_NONE "none" + +/* + * The subscription will request the publisher to send changes regardless + * of their origin. + */ +#define LOGICALREP_ORIGIN_ANY "any" + /* Disallow streaming in-progress transactions. */ #define LOGICALREP_STREAM_OFF 'f' @@ -174,6 +176,8 @@ typedef struct Subscription */ #define LOGICALREP_STREAM_PARALLEL 'p' +#endif /* EXPOSE_TO_CLIENT_CODE */ + extern Subscription *GetSubscription(Oid subid, bool missing_ok); extern void FreeSubscription(Subscription *sub); extern void DisableSubscription(Oid subid); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index add7f16c90..ec0cdf4ed7 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -50,7 +50,7 @@ #include "catalog/pg_default_acl_d.h" #include "catalog/pg_largeobject_d.h" #include "catalog/pg_proc_d.h" -#include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_d.h" #include "catalog/pg_type_d.h" #include "common/connect.h" #include "common/int.h" @@ -4968,20 +4968,20 @@ getSubscriptions(Archive *fout) i_oid = PQfnumber(res, "oid"); i_subname = PQfnumber(res, "subname"); i_subowner = PQfnumber(res, "subowner"); + i_subenabled = PQfnumber(res, "subenabled"); i_subbinary = PQfnumber(res, "subbinary"); i_substream = PQfnumber(res, "substream"); i_subtwophasestate = PQfnumber(res, "subtwophasestate"); i_subdisableonerr = PQfnumber(res, "subdisableonerr"); i_subpasswordrequired = PQfnumber(res, "subpasswordrequired"); i_subrunasowner = PQfnumber(res, "subrunasowner"); + i_subfailover = PQfnumber(res, "subfailover"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subpublications = PQfnumber(res, "subpublications"); i_suborigin = PQfnumber(res, "suborigin"); i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn"); - i_subenabled = PQfnumber(res, "subenabled"); - i_subfailover = PQfnumber(res, "subfailover"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4995,18 +4995,20 @@ getSubscriptions(Archive *fout) subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname)); subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner)); + subinfo[i].subenabled = + (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0); subinfo[i].subbinary = - pg_strdup(PQgetvalue(res, i, i_subbinary)); - subinfo[i].substream = - pg_strdup(PQgetvalue(res, i, i_substream)); - subinfo[i].subtwophasestate = - pg_strdup(PQgetvalue(res, i, i_subtwophasestate)); + (strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0); + subinfo[i].substream = *(PQgetvalue(res, i, i_substream)); + subinfo[i].subtwophasestate = *(PQgetvalue(res, i, i_subtwophasestate)); subinfo[i].subdisableonerr = - pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); + (strcmp(PQgetvalue(res, i, i_subdisableonerr), "t") == 0); subinfo[i].subpasswordrequired = - pg_strdup(PQgetvalue(res, i, i_subpasswordrequired)); + (strcmp(PQgetvalue(res, i, i_subpasswordrequired), "t") == 0); subinfo[i].subrunasowner = - pg_strdup(PQgetvalue(res, i, i_subrunasowner)); + (strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0); + subinfo[i].subfailover = + (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) @@ -5024,10 +5026,6 @@ getSubscriptions(Archive *fout) else subinfo[i].suboriginremotelsn = pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn)); - subinfo[i].subenabled = - pg_strdup(PQgetvalue(res, i, i_subenabled)); - subinfo[i].subfailover = - pg_strdup(PQgetvalue(res, i, i_subfailover)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -5208,7 +5206,6 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) char **pubnames = NULL; int npubnames = 0; int i; - char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'}; /* Do nothing if not dumping schema */ if (!dopt->dumpSchema) @@ -5245,29 +5242,29 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) else appendPQExpBufferStr(query, "NONE"); - if (strcmp(subinfo->subbinary, "t") == 0) + if (subinfo->subbinary) appendPQExpBufferStr(query, ", binary = true"); - if (strcmp(subinfo->substream, "t") == 0) + if (subinfo->substream == LOGICALREP_STREAM_ON) appendPQExpBufferStr(query, ", streaming = on"); - else if (strcmp(subinfo->substream, "p") == 0) + else if (subinfo->substream == LOGICALREP_STREAM_PARALLEL) appendPQExpBufferStr(query, ", streaming = parallel"); else appendPQExpBufferStr(query, ", streaming = off"); - if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0) + if (subinfo->subtwophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) appendPQExpBufferStr(query, ", two_phase = on"); - if (strcmp(subinfo->subdisableonerr, "t") == 0) + if (subinfo->subdisableonerr) appendPQExpBufferStr(query, ", disable_on_error = true"); - if (strcmp(subinfo->subpasswordrequired, "t") != 0) + if (!subinfo->subpasswordrequired) appendPQExpBuffer(query, ", password_required = false"); - if (strcmp(subinfo->subrunasowner, "t") == 0) + if (subinfo->subrunasowner) appendPQExpBufferStr(query, ", run_as_owner = true"); - if (strcmp(subinfo->subfailover, "t") == 0) + if (subinfo->subfailover) appendPQExpBufferStr(query, ", failover = true"); if (strcmp(subinfo->subsynccommit, "off") != 0) @@ -5303,7 +5300,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn); } - if (strcmp(subinfo->subenabled, "t") == 0) + if (subinfo->subenabled) { /* * Enable the subscription to allow the replication to continue diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index d65f558565..2e55a0e3bb 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -664,20 +664,20 @@ typedef struct _SubscriptionInfo { DumpableObject dobj; const char *rolname; - char *subenabled; - char *subbinary; - char *substream; - char *subtwophasestate; - char *subdisableonerr; - char *subpasswordrequired; - char *subrunasowner; + bool subenabled; + bool subbinary; + char substream; + char subtwophasestate; + bool subdisableonerr; + bool subpasswordrequired; + bool subrunasowner; + bool subfailover; char *subconninfo; char *subslotname; char *subsynccommit; char *subpublications; char *suborigin; char *suboriginremotelsn; - char *subfailover; } SubscriptionInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 0aa39906a1..2657abdc72 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -25,6 +25,7 @@ #include "catalog/pg_default_acl_d.h" #include "catalog/pg_proc_d.h" #include "catalog/pg_statistic_ext_d.h" +#include "catalog/pg_subscription_d.h" #include "catalog/pg_type_d.h" #include "common.h" #include "common/logging.h" @@ -6679,9 +6680,9 @@ describeSubscriptions(const char *pattern, bool verbose) if (pset.sversion >= 160000) appendPQExpBuffer(&buf, ", (CASE substream\n" - " WHEN 'f' THEN 'off'\n" - " WHEN 't' THEN 'on'\n" - " WHEN 'p' THEN 'parallel'\n" + " WHEN " CppAsString2(LOGICALREP_STREAM_OFF) " THEN 'off'\n" + " WHEN " CppAsString2(LOGICALREP_STREAM_ON) " THEN 'on'\n" + " WHEN " CppAsString2(LOGICALREP_STREAM_PARALLEL) " THEN 'parallel'\n" " END) AS \"%s\"\n", gettext_noop("Streaming")); else -- 2.45.2
signature.asc
Description: PGP signature