I looked at this patchset and it seemed natural to apply 0008 next
(adding work_mem to subscriptions).  Attached is Dilip's latest version,
plus my review changes.  This will break the patch tester's logic; sorry
about that.

Which part of this change actually sets the process's
logical_decoding_work_mem to the given value?  I was unable to figure
that out.  Is it missing, or am I just stupid?

Changes:
* the patch adds the logical_decoding_work_mem SGML, but that has
  already been applied (cec2edfa7859); removed the duplicate.

* the parse_subscription_options() comment says that it will raise an
  error if the option list specifies an option for which the caller did
  not pass a pointer.  It does not really implement that behavior (an
  existing problem): instead, if the pointer is not passed, the option
  is ignored.  Moreover, this new patch continued to fail to handle
  things as the comment says.  I decided to implement the documented
  behavior for the new option, which makes it inconsistent with how the
  other options are implemented.  I think we should fix the other
  options to behave as the comment says, because it's a more convenient
  API; if we instead opted to update the code comment to match the
  code, each caller would have to be checked to verify that the correct
  options are passed, which is pointless and error prone.

* the parse_subscription_options API is a mess.  I reordered the
  arguments a little bit and changed the argument layout in callers so
  that related arguments are grouped more sensibly.  I also added
  comments to make the argument lists easier to read.  I think the API
  could be fixed properly by using an ad-hoc struct to pass values in
  and out.  I didn't get around to doing that; it seems an unrelated
  potential improvement.

* doing our own range checking in pgoutput and subscriptioncmds.c
  seems pointless and likely to get out of sync with guc.c.  It is
  simpler to call set_config_option() to verify that the argument is in
  range.  (Note a further problem in the patch series: the range check
  in subscriptioncmds.c is only added in patch 0009.)

* parsing integers using scanint8() seemed weird (its error messages
  do not correspond to what we want).  After a couple of false starts, I
  decided to rely on guc.c's set_config_option() followed by parse_int().
  That also has the benefit that the value can be given with units.

* psql \dRs+ should display the work_mem; the patch failed to do that,
  so I added it.  Unit display is done by pg_size_pretty(), which might
  be different from what guc.c does, but I think it works OK.  It's the
  first place where we use pg_size_pretty() to show a memory limit,
  however.
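
For illustration only (this is not in the attached patches), the ad-hoc
struct idea from the parse_subscription_options items above could look
roughly like the standalone sketch below.  Everything in it is
hypothetical: SubOptsSketch, OptSketch, the SUBOPT_* names and
parse_opts_sketch() are made up, options are reduced to name/value
string pairs instead of a DefElem list, and errors are returned as codes
instead of being raised with ereport().  The caller says which options
it accepts in a bitmask, and a known option outside that set is rejected
rather than silently ignored.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical flags: one bit per option a caller may accept. */
#define SUBOPT_ENABLED   (1u << 0)
#define SUBOPT_COPY_DATA (1u << 1)
#define SUBOPT_WORK_MEM  (1u << 2)

/* Hypothetical result codes standing in for ereport(ERROR, ...). */
typedef enum
{
	SUBOPT_OK,
	SUBOPT_ERR_UNRECOGNIZED,	/* no such option at all */
	SUBOPT_ERR_NOT_VALID_HERE,	/* known option, but caller rejects it */
	SUBOPT_ERR_REDUNDANT		/* option given more than once */
} SubOptResult;

/* Simplified stand-in for a DefElem: just a name and a string value. */
typedef struct
{
	const char *name;
	const char *value;
} OptSketch;

/* One struct carries every output instead of a long pointer list. */
typedef struct
{
	unsigned	specified;		/* bitmask of options seen in the list */
	bool		enabled;
	bool		copy_data;
	int			work_mem_kb;
} SubOptsSketch;

/* Map an option name to its flag; 0 means the name is unknown. */
static unsigned
option_bit(const char *name)
{
	if (strcmp(name, "enabled") == 0)
		return SUBOPT_ENABLED;
	if (strcmp(name, "copy_data") == 0)
		return SUBOPT_COPY_DATA;
	if (strcmp(name, "work_mem") == 0)
		return SUBOPT_WORK_MEM;
	return 0;
}

static SubOptResult
parse_opts_sketch(const OptSketch *options, size_t noptions,
				  unsigned supported, SubOptsSketch *out)
{
	assert(out != NULL);

	/* Defaults are set before the list is scanned. */
	memset(out, 0, sizeof(*out));
	out->copy_data = true;

	for (size_t i = 0; i < noptions; i++)
	{
		unsigned	bit = option_bit(options[i].name);

		if (bit == 0)
			return SUBOPT_ERR_UNRECOGNIZED;
		if ((supported & bit) == 0)
			return SUBOPT_ERR_NOT_VALID_HERE;
		if ((out->specified & bit) != 0)
			return SUBOPT_ERR_REDUNDANT;
		out->specified |= bit;

		/* Value parsing is deliberately naive in this sketch. */
		if (bit == SUBOPT_ENABLED)
			out->enabled = (strcmp(options[i].value, "true") == 0);
		else if (bit == SUBOPT_COPY_DATA)
			out->copy_data = (strcmp(options[i].value, "true") == 0);
		else
			out->work_mem_kb = atoi(options[i].value);
	}

	return SUBOPT_OK;
}
```

With something along these lines, a caller would pass, say,
SUBOPT_WORK_MEM as the accepted set and then test the specified bitmask
in the result, instead of passing a row of NULLs for everything it does
not care about.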

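The value handling discussed in the list above (accept an optional unit,
normalize to kB, reject anything out of range) can be sketched
standalone as follows.  This is a simplified stand-in and not guc.c's
parse_int(): parse_mem_kb_sketch() is a hypothetical name, only the kB,
MB and GB suffixes are handled, and the bounds (at least 64 kB, at most
INT_MAX) are the ones used by the range check in the original pgoutput.c
hunk.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Lower bound taken from the "parsed < 64" check in the original patch. */
#define MIN_WORK_MEM_KB 64

/*
 * Parse a memory size such as "128", "64MB" or "1 GB" into kilobytes.
 * A bare number is taken to be in kB.  Returns false on bad syntax, an
 * unknown unit, or a value outside [MIN_WORK_MEM_KB, INT_MAX].
 */
static bool
parse_mem_kb_sketch(const char *value, int *result_kb)
{
	char	   *end;
	long long	num;
	long long	multiplier;

	assert(value != NULL && result_kb != NULL);

	errno = 0;
	num = strtoll(value, &end, 10);
	if (end == value || errno == ERANGE || num < 0)
		return false;

	/* Allow spaces between the number and the unit. */
	while (*end == ' ')
		end++;

	if (*end == '\0' || strcmp(end, "kB") == 0)
		multiplier = 1;
	else if (strcmp(end, "MB") == 0)
		multiplier = 1024;
	else if (strcmp(end, "GB") == 0)
		multiplier = 1024 * 1024;
	else
		return false;

	/* Reject values that would not fit in an int once converted to kB. */
	if (num > INT_MAX / multiplier)
		return false;
	num *= multiplier;

	if (num < MIN_WORK_MEM_KB)
		return false;

	*result_kb = (int) num;
	return true;
}
```

For example, "64MB" comes out as 65536 (kB), while "32" and "4096GB" are
both rejected as out of range.  The real code gets all of this, plus
proper error messages, for free from set_config_option().
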
-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From a31b4ebd90dd7a4c94a35f2b3452258078c30e37 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 22 Jan 2020 12:44:13 -0300
Subject: [PATCH 1/2] Dilip's original

---
 doc/src/sgml/config.sgml                      | 21 +++++++++
 doc/src/sgml/ref/create_subscription.sgml     | 12 +++++
 src/backend/catalog/pg_subscription.c         |  1 +
 src/backend/commands/subscriptioncmds.c       | 44 ++++++++++++++++---
 .../libpqwalreceiver/libpqwalreceiver.c       |  3 ++
 src/backend/replication/logical/worker.c      |  1 +
 src/backend/replication/pgoutput/pgoutput.c   | 30 ++++++++++++-
 src/include/catalog/pg_subscription.h         |  3 ++
 src/include/replication/walreceiver.h         |  1 +
 9 files changed, 108 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3ccacd528b..163cc77d1d 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1751,6 +1751,27 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-logical-decoding-work-mem" xreflabel="logical_decoding_work_mem">
+      <term><varname>logical_decoding_work_mem</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>logical_decoding_work_mem</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies the maximum amount of memory to be used by logical decoding,
+        before some of the decoded changes are either written to local disk.
+        This limits the amount of memory used by logical streaming replication
+        connections. It defaults to 64 megabytes (<literal>64MB</literal>).
+        Since each replication connection only uses a single buffer of this size,
+        and an installation normally doesn't have many such connections
+        concurrently (as limited by <varname>max_wal_senders</varname>), it's
+        safe to set this value significantly higher than <varname>work_mem</varname>,
+        reducing the amount of decoded changes written to disk.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-max-stack-depth" xreflabel="max_stack_depth">
       <term><varname>max_stack_depth</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 1a90c244fb..91790b0c95 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -206,6 +206,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>work_mem</literal> (<type>integer</type>)</term>
+        <listitem>
+         <para>
+          Limits the amount of memory used to decode changes on the
+          publisher.  If not specified, the publisher will use the default
+          specified by <varname>logical_decoding_work_mem</varname>. When
+          needed, additional data are spilled to disk.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
      </para>
     </listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index f77a83bd2e..5cd1daa238 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -65,6 +65,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
+	sub->workmem = subform->subworkmem;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9bfe142ada..c50e854e96 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -58,7 +58,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 						   bool *enabled, bool *create_slot,
 						   bool *slot_name_given, char **slot_name,
 						   bool *copy_data, char **synchronous_commit,
-						   bool *refresh)
+						   bool *refresh, int *logical_wm,
+						   bool *logical_wm_given)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -89,6 +90,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 		*synchronous_commit = NULL;
 	if (refresh)
 		*refresh = true;
+	if (logical_wm)
+		*logical_wm_given = false;
 
 	/* Parse options */
 	foreach(lc, options)
@@ -174,6 +177,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 			refresh_given = true;
 			*refresh = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "work_mem") == 0 && logical_wm)
+		{
+			if (*logical_wm_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+
+			*logical_wm_given = true;
+			*logical_wm = defGetInt32(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -317,6 +330,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		enabled_given;
 	bool		enabled;
 	bool		copy_data;
+	int			logical_wm;
+	bool		logical_wm_given;
 	char	   *synchronous_commit;
 	char	   *conninfo;
 	char	   *slotname;
@@ -333,7 +348,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	parse_subscription_options(stmt->options, &connect, &enabled_given,
 							   &enabled, &create_slot, &slotname_given,
 							   &slotname, &copy_data, &synchronous_commit,
-							   NULL);
+							   NULL, &logical_wm, &logical_wm_given);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -411,6 +426,12 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	values[Anum_pg_subscription_subpublications - 1] =
 		publicationListToArray(publications);
 
+	if (logical_wm_given)
+		values[Anum_pg_subscription_subworkmem - 1] =
+			Int32GetDatum(logical_wm);
+	else
+		nulls[Anum_pg_subscription_subworkmem - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -668,10 +689,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				char	   *slotname;
 				bool		slotname_given;
 				char	   *synchronous_commit;
+				int			logical_wm;
+				bool		logical_wm_given;
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, &slotname_given, &slotname,
-										   NULL, &synchronous_commit, NULL);
+										   NULL, &synchronous_commit, NULL,
+										   &logical_wm, &logical_wm_given);
 
 				if (slotname_given)
 				{
@@ -696,6 +720,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					replaces[Anum_pg_subscription_subsynccommit - 1] = true;
 				}
 
+				if (logical_wm_given)
+				{
+					values[Anum_pg_subscription_subworkmem - 1] =
+						Int32GetDatum(logical_wm);
+					replaces[Anum_pg_subscription_subworkmem - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -707,7 +738,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL,
 										   &enabled_given, &enabled, NULL,
-										   NULL, NULL, NULL, NULL, NULL);
+										   NULL, NULL, NULL, NULL, NULL,
+										   NULL, NULL);
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -745,7 +777,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, &refresh);
+										   NULL, &refresh, NULL, NULL);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
@@ -782,7 +814,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, NULL);
+										   NULL, NULL, NULL, NULL);
 
 				AlterSubscription_refresh(sub, copy_data);
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e4fd1f9bb6..896ddab2b1 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -408,6 +408,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		appendStringInfo(&cmd, "proto_version '%u'",
 						 options->proto.logical.proto_version);
 
+		appendStringInfo(&cmd, ", work_mem '%d'",
+						 options->proto.logical.work_mem);
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7a5471f95c..48b960c4c9 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1745,6 +1745,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.slotname = myslotname;
 	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
 	options.proto.logical.publication_names = MySubscription->publications;
+	options.proto.logical.work_mem = MySubscription->workmem;
 
 	/* Start normal logical streaming replication. */
 	walrcv_startstreaming(wrconn, &options);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 752508213a..536722b32f 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -18,6 +18,7 @@
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/guc.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
@@ -87,11 +88,12 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
-						List **publication_names)
+						List **publication_names, int *logical_decoding_work_mem)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
+	bool		work_mem_given = false;
 
 	foreach(lc, options)
 	{
@@ -137,6 +139,29 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						(errcode(ERRCODE_INVALID_NAME),
 						 errmsg("invalid publication_names syntax")));
 		}
+		else if (strcmp(defel->defname, "work_mem") == 0)
+		{
+			int64	parsed;
+
+			if (work_mem_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			work_mem_given = true;
+
+			if (!scanint8(strVal(defel->arg), true, &parsed))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid work_mem")));
+
+			if (parsed > PG_INT32_MAX || parsed < 64)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("work_mem \"%s\" out of range",
+								strVal(defel->arg))));
+
+			*logical_decoding_work_mem = (int)parsed;
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -171,7 +196,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		/* Parse the params and ERROR if we see any we don't recognize */
 		parse_output_parameters(ctx->output_plugin_options,
 								&data->protocol_version,
-								&data->publication_names);
+								&data->publication_names,
+								&logical_decoding_work_mem);
 
 		/* Check if we support requested protocol */
 		if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0a756d42d8..3394379f86 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -48,6 +48,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
+	int32		subworkmem;		/* Memory to use to decode changes. */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -73,6 +75,7 @@ typedef struct Subscription
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
+	int			workmem;		/* Memory to decode changes. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e08afc6548..4c7acfb7d3 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -169,6 +169,7 @@ typedef struct
 		{
 			uint32		proto_version;	/* Logical protocol version */
 			List	   *publication_names;	/* String list of publications */
+			int			work_mem;	/* Memory limit to use for decoding */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
-- 
2.20.1

From 848ad7383cede7600ae3fca07440e3f2441ac934 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 22 Jan 2020 12:51:28 -0300
Subject: [PATCH 2/2] =?UTF-8?q?Changes=20by=20=C3=81lvaro?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 doc/src/sgml/config.sgml                    | 21 -------
 src/backend/commands/subscriptioncmds.c     | 62 +++++++++++++--------
 src/backend/replication/pgoutput/pgoutput.c | 23 +++-----
 src/bin/psql/describe.c                     |  4 +-
 src/include/catalog/pg_subscription.h       |  1 -
 5 files changed, 52 insertions(+), 59 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 163cc77d1d..3ccacd528b 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1751,27 +1751,6 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
-     <varlistentry id="guc-logical-decoding-work-mem" xreflabel="logical_decoding_work_mem">
-      <term><varname>logical_decoding_work_mem</varname> (<type>integer</type>)
-      <indexterm>
-       <primary><varname>logical_decoding_work_mem</varname> configuration parameter</primary>
-      </indexterm>
-      </term>
-      <listitem>
-       <para>
-        Specifies the maximum amount of memory to be used by logical decoding,
-        before some of the decoded changes are either written to local disk.
-        This limits the amount of memory used by logical streaming replication
-        connections. It defaults to 64 megabytes (<literal>64MB</literal>).
-        Since each replication connection only uses a single buffer of this size,
-        and an installation normally doesn't have many such connections
-        concurrently (as limited by <varname>max_wal_senders</varname>), it's
-        safe to set this value significantly higher than <varname>work_mem</varname>,
-        reducing the amount of decoded changes written to disk.
-       </para>
-      </listitem>
-     </varlistentry>
-
      <varlistentry id="guc-max-stack-depth" xreflabel="max_stack_depth">
       <term><varname>max_stack_depth</varname> (<type>integer</type>)
       <indexterm>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c50e854e96..7920e75bfa 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -54,12 +54,13 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
  * accommodate that.
  */
 static void
-parse_subscription_options(List *options, bool *connect, bool *enabled_given,
-						   bool *enabled, bool *create_slot,
+parse_subscription_options(List *options, bool *connect,
+						   bool *enabled_given, bool *enabled,
+						   bool *create_slot,
 						   bool *slot_name_given, char **slot_name,
+						   bool *logical_wm_given, int *logical_wm,
 						   bool *copy_data, char **synchronous_commit,
-						   bool *refresh, int *logical_wm,
-						   bool *logical_wm_given)
+						   bool *refresh)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -177,15 +178,25 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 			refresh_given = true;
 			*refresh = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "work_mem") == 0 && logical_wm)
+		else if (strcmp(defel->defname, "work_mem") == 0)
 		{
+			if (!logical_wm)
+				elog(ERROR, "option \"work_mem\" not valid in this context");
+
 			if (*logical_wm_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
+			/* Test if the value is valid for logical_decoding_work_mem */
+			(void) set_config_option("logical_decoding_work_mem", defGetString(defel),
+									 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
+									 false, 0, false);
+			if (!parse_int(defGetString(defel), logical_wm,
+						   GUC_UNIT_KB, NULL))
+				elog(ERROR, "parse_int failed");	/* shouldn't happen */
 			*logical_wm_given = true;
-			*logical_wm = defGetInt32(defel);
+
 		}
 		else
 			ereport(ERROR,
@@ -345,10 +356,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	 *
 	 * Connection and publication should not be specified here.
 	 */
-	parse_subscription_options(stmt->options, &connect, &enabled_given,
-							   &enabled, &create_slot, &slotname_given,
-							   &slotname, &copy_data, &synchronous_commit,
-							   NULL, &logical_wm, &logical_wm_given);
+	parse_subscription_options(stmt->options, &connect,
+							   &enabled_given, &enabled,
+							   &create_slot, &slotname_given, &slotname,
+							   &logical_wm_given, &logical_wm,
+							   &copy_data, &synchronous_commit, NULL);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -692,10 +704,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				int			logical_wm;
 				bool		logical_wm_given;
 
-				parse_subscription_options(stmt->options, NULL, NULL, NULL,
+				parse_subscription_options(stmt->options, NULL,
+										   NULL, NULL,	/* enabled */
 										   NULL, &slotname_given, &slotname,
-										   NULL, &synchronous_commit, NULL,
-										   &logical_wm, &logical_wm_given);
+										   &logical_wm_given, &logical_wm,
+										   NULL, &synchronous_commit, NULL);
 
 				if (slotname_given)
 				{
@@ -737,9 +750,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 							enabled_given;
 
 				parse_subscription_options(stmt->options, NULL,
-										   &enabled_given, &enabled, NULL,
-										   NULL, NULL, NULL, NULL, NULL,
-										   NULL, NULL);
+										   &enabled_given, &enabled,
+										   NULL, NULL, NULL,	/* slot */
+										   NULL, NULL,	/* logical wm */
+										   NULL, NULL, NULL);
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -775,9 +789,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				bool		copy_data;
 				bool		refresh;
 
-				parse_subscription_options(stmt->options, NULL, NULL, NULL,
-										   NULL, NULL, NULL, &copy_data,
-										   NULL, &refresh, NULL, NULL);
+				parse_subscription_options(stmt->options, NULL,
+										   NULL, NULL,	/* enabled */
+										   NULL, NULL, NULL,	/* slot */
+										   NULL, NULL,	/* logical wm */
+										   &copy_data, NULL, &refresh);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
@@ -812,9 +828,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 							(errcode(ERRCODE_SYNTAX_ERROR),
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
-				parse_subscription_options(stmt->options, NULL, NULL, NULL,
-										   NULL, NULL, NULL, &copy_data,
-										   NULL, NULL, NULL, NULL);
+				parse_subscription_options(stmt->options, NULL,
+										   NULL, NULL,	/* enabled */
+										   NULL, NULL, NULL,	/* slot */
+										   NULL, NULL,	/* logical wm */
+										   &copy_data, NULL, NULL);
 
 				AlterSubscription_refresh(sub, copy_data);
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 536722b32f..d243d90821 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "catalog/pg_publication.h"
+#include "commands/defrem.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
@@ -141,26 +142,20 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 		}
 		else if (strcmp(defel->defname, "work_mem") == 0)
 		{
-			int64	parsed;
-
 			if (work_mem_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 			work_mem_given = true;
+			/* Test if the value is valid for logical_decoding_work_mem */
+			(void) set_config_option("logical_decoding_work_mem", defGetString(defel),
+									 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
+									 false, 0, false);
 
-			if (!scanint8(strVal(defel->arg), true, &parsed))
-				ereport(ERROR,
-						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("invalid work_mem")));
-
-			if (parsed > PG_INT32_MAX || parsed < 64)
-				ereport(ERROR,
-						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("work_mem \"%s\" out of range",
-								strVal(defel->arg))));
-
-			*logical_decoding_work_mem = (int)parsed;
+			/* by here it must be valid, so this shouldn't fail */
+			if (!parse_int(defGetString(defel), logical_decoding_work_mem,
+						   GUC_UNIT_KB, NULL))
+				elog(ERROR, "parse_int failed");	/* shouldn't happen */
 		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index f3c7eb96fa..956ad41f56 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5933,7 +5933,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false};
+	false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -5961,8 +5961,10 @@ describeSubscriptions(const char *pattern, bool verbose)
 	{
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
+						  ",  pg_catalog.pg_size_pretty(subworkmem::bigint * 1024) AS \"%s\"\n"
 						  ",  subconninfo AS \"%s\"\n",
 						  gettext_noop("Synchronous commit"),
+						  gettext_noop("Working Memory"),
 						  gettext_noop("Conninfo"));
 	}
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 3394379f86..eef585b0e5 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -47,7 +47,6 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
-
 	int32		subworkmem;		/* Memory to use to decode changes. */
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
-- 
2.20.1
