From 38490cd763850d8aee9e0efda597fe61711527be Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Mon, 8 Aug 2022 14:14:44 +0300
Subject: [PATCH v8] Allow logical replication to copy table in binary

If binary option is enabled in a subscription and copy_format is set to binary, then copy tables in
binary format during table synchronization.

The patch introduces a new parameter, copy_format, to CREATE SUBSCRIPTION to allow to choose
the format used in initial table synchronization.

Without this patch, table are copied in text format even if the
subscription is created with binary option. When binary format is
beneficial to use, allowing the subscription to copy tables in binary in
table sync phase may reduce the time spent on copy depending on column types.

Discussion: https://postgr.es/m/CAGPVpCQvAziCLknEnygY0v1-KBtg%2BOm-9JHJYZOnNPKFJPompw%40mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                  |  12 +-
 doc/src/sgml/logical-replication.sgml       |   9 +-
 doc/src/sgml/ref/alter_subscription.sgml    |   4 +-
 doc/src/sgml/ref/create_subscription.sgml   |  17 ++
 src/backend/catalog/pg_subscription.c       |   1 +
 src/backend/commands/subscriptioncmds.c     |  84 +++++-
 src/backend/replication/logical/tablesync.c |  13 +-
 src/bin/pg_dump/pg_dump.c                   |  17 +-
 src/bin/pg_dump/pg_dump.h                   |   1 +
 src/bin/psql/describe.c                     |   8 +-
 src/bin/psql/tab-complete.c                 |   5 +-
 src/include/catalog/pg_subscription.h       |  13 +
 src/include/commands/subscriptioncmds.h     |   1 +
 src/test/regress/expected/subscription.out  | 192 ++++++++------
 src/test/regress/sql/subscription.sql       |  26 ++
 src/test/subscription/t/002_types.pl        | 269 +++++++++++++++-----
 src/test/subscription/t/014_binary.pl       |  82 +++++-
 17 files changed, 604 insertions(+), 150 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1e4048054..14e8cad531 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7907,7 +7907,6 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para>
       <para>
        If true, the subscription will request that the publisher send data
-       in binary format
       </para></entry>
      </row>
 
@@ -8002,6 +8001,17 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        origin.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subcopyformat</structfield> <type>bool</type>
+      </para>
+      <para>
+       Specifies the format for copying the initial data in a subscription.
+       <literal>t</literal> indicates text, <literal>b</literal> indicates
+       binary format.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660c87..9540917ea3 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -241,10 +241,11 @@
    types of the columns do not need to match, as long as the text
    representation of the data can be converted to the target type.  For
    example, you can replicate from a column of type <type>integer</type> to a
-   column of type <type>bigint</type>.  The target table can also have
-   additional columns not provided by the published table.  Any such columns
-   will be filled with the default value as specified in the definition of the
-   target table.
+   column of type <type>bigint</type>.  Replication in binary format is type
+   specific and does not allow to replicate data between different types according
+   to its restrictions.  The target table can also have additional columns not provided
+   by the published table.  Any such columns will be filled with the default
+   value as specified in the definition of the target table.
   </para>
 
   <sect2 id="logical-replication-subscription-slot">
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 964fcbb8ff..c560d71f91 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -213,8 +213,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
       <literal>binary</literal>, <literal>streaming</literal>,
-      <literal>disable_on_error</literal>, and
-      <literal>origin</literal>.
+      <literal>disable_on_error</literal>, <literal>origin</literal>
+      and <literal>copy_format</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 51c45f17c7..f01b762d81 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -349,6 +349,23 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+      <varlistentry>
+        <term><literal>copy_format</literal> (<type>string</type>)</term>
+        <listitem>
+         <para>
+          Specifies the format in which pre-existing data on the publisher will
+          copied to the subscriber. Supported formats are
+          <literal>text</literal> and <literal>binary</literal>. The default is
+          <literal>text</literal>.
+          <literal>binary</literal> format can be selected only if
+          <literal>binary</literal> is set to <literal>true</literal>. Also,
+          the binary format is very data type specific, it will not allow copying
+          between different column types as opposed to text format. Note that
+          data will not be copied if <literal>copy_data = false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae311c3..da078acefe 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
+	sub->copyformat = subform->subcopyformat;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..4e4023623d 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,7 @@
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
 #define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_COPY_FORMAT			0x00002000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -90,6 +91,7 @@ typedef struct SubOpts
 	bool		disableonerr;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	char	    copy_format;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->disableonerr = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_COPY_FORMAT))
+		opts->copy_format = LOGICALREP_COPY_AS_TEXT;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -324,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_COPY_FORMAT) &&
+				 strcmp(defel->defname, "copy_format") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_COPY_FORMAT))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_COPY_FORMAT;
+			opts->copy_format = defGetCopyFormat(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -404,6 +417,17 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 								"slot_name = NONE", "create_slot = false")));
 		}
 	}
+
+	if (IsSet(opts->specified_opts, SUBOPT_BINARY) &&
+		!opts->binary &&
+		opts->copy_format == LOGICALREP_COPY_AS_BINARY)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+					errmsg("%s and %s are mutually exclusive options",
+						"binary = false", "copy_format = binary")));
+	}
+
 }
 
 /*
@@ -560,7 +584,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN |
+					  SUBOPT_COPY_FORMAT);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -649,6 +674,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		publicationListToArray(publications);
 	values[Anum_pg_subscription_suborigin - 1] =
 		CStringGetTextDatum(opts.origin);
+	values[Anum_pg_subscription_subcopyformat - 1] = CharGetDatum(opts.copy_format);
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -1054,7 +1080,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN | SUBOPT_COPY_FORMAT);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1091,6 +1117,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_BINARY))
 				{
+					/* Do not allow binary = false with copy_format = binary */
+					if (!opts.binary &&
+						sub->copyformat == LOGICALREP_COPY_AS_BINARY &&
+						!IsSet(opts.specified_opts, SUBOPT_COPY_FORMAT))
+						ereport(ERROR,
+								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								 errmsg("cannot set %s for a subscription with %s",
+										"binary = false", "copy_format = binary")));
+
 					values[Anum_pg_subscription_subbinary - 1] =
 						BoolGetDatum(opts.binary);
 					replaces[Anum_pg_subscription_subbinary - 1] = true;
@@ -1118,6 +1153,21 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_suborigin - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_COPY_FORMAT))
+				{
+					/* Do not allow binary = false with copy_format = binary */
+					if (opts.copy_format == LOGICALREP_COPY_AS_BINARY &&
+						!sub->binary &&
+						!IsSet(opts.specified_opts, SUBOPT_BINARY))
+						ereport(ERROR,
+								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								 errmsg("cannot set %s for a subscription with %s",
+										"copy_format = binary", "binary = false")));
+					values[Anum_pg_subscription_subcopyformat - 1] =
+						CharGetDatum(opts.copy_format);
+					replaces[Anum_pg_subscription_subcopyformat - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -2195,3 +2245,33 @@ defGetStreamingMode(DefElem *def)
 					def->defname)));
 	return LOGICALREP_STREAM_OFF;	/* keep compiler quiet */
 }
+
+/*
+ * Extract the copy format value from a DefElem.
+ */
+char
+defGetCopyFormat(DefElem *def)
+{
+	char	   *sval;
+
+	/*
+	 * If no parameter value given, set it to text format.
+	 */
+	if (!def->arg)
+		return LOGICALREP_COPY_AS_TEXT;
+
+	/*
+	 * Currently supported formats are "text" and "binary".
+	 */
+	sval = defGetString(def);
+	if (pg_strcasecmp(sval, "text") == 0)
+		return LOGICALREP_COPY_AS_TEXT;
+	if (pg_strcasecmp(sval, "binary") == 0)
+		return LOGICALREP_COPY_AS_BINARY;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_SYNTAX_ERROR),
+			 errmsg("%s value must be either \"text\" or \"binary\"",
+					def->defname)));
+	return LOGICALREP_COPY_AS_TEXT;	/* keep compiler quiet */
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba..62c866c9eb 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -101,6 +101,7 @@
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
@@ -1090,6 +1091,7 @@ copy_table(Relation rel)
 	CopyFromState cstate;
 	List	   *attnamelist;
 	ParseState *pstate;
+	List 	   *options = NIL;
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
@@ -1168,6 +1170,15 @@ copy_table(Relation rel)
 
 		appendStringInfoString(&cmd, ") TO STDOUT");
 	}
+
+	/* If the publisher is v16 or later, specify the format to copy data. */
+	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
+	{
+		char *format = MySubscription->copyformat == LOGICALREP_COPY_AS_BINARY ? "binary" : "text";
+		appendStringInfo(&cmd, "  WITH (FORMAT %s)", format);
+		options = lappend(options, makeDefElem("format", (Node *) makeString(format), -1));
+	}
+
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
@@ -1184,7 +1195,7 @@ copy_table(Relation rel)
 										 NULL, false, false);
 
 	attnamelist = make_copy_attnamelist(relmapentry);
-	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
+	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
 
 	/* Do the copy */
 	(void) CopyFrom(cstate);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 527c7651ab..9b97aa06e5 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4494,6 +4494,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_subcopyformat;
 	int			i,
 				ntups;
 
@@ -4546,9 +4547,15 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	if (fout->remoteVersion >= 160000)
-		appendPQExpBufferStr(query, " s.suborigin\n");
+	{
+		appendPQExpBufferStr(query, " s.suborigin,\n");
+		appendPQExpBufferStr(query, " s.subcopyformat\n");
+	}
 	else
-		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+	{
+		appendPQExpBuffer(query, " '%s' AS suborigin,\n", LOGICALREP_ORIGIN_ANY);
+		appendPQExpBuffer(query, " '%c' AS subcopyformat\n", LOGICALREP_COPY_AS_TEXT);
+	}
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
@@ -4576,6 +4583,7 @@ getSubscriptions(Archive *fout)
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
+	i_subcopyformat = PQfnumber(res, "subcopyformat");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4606,6 +4614,8 @@ getSubscriptions(Archive *fout)
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+		subinfo[i].subcopyformat =
+			pg_strdup(PQgetvalue(res, i, i_subcopyformat));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4687,6 +4697,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
+	if (strcmp(subinfo->subcopyformat, "t") != 0)
+		appendPQExpBuffer(query, ", copy_format = %s", subinfo->subcopyformat);
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index e7cbd8d7ed..f6b7dbcd97 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -662,6 +662,7 @@ typedef struct _SubscriptionInfo
 	char	   *suborigin;
 	char	   *subsynccommit;
 	char	   *subpublications;
+	char	   *subcopyformat;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c8a0bb7b3a..8e9523aba4 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6472,7 +6472,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6528,9 +6528,15 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Disable on error"));
 
 		if (pset.sversion >= 160000)
+		{
 			appendPQExpBuffer(&buf,
 							  ", suborigin AS \"%s\"\n",
 							  gettext_noop("Origin"));
+			/* Copy format is only supported in v16 and higher */
+			appendPQExpBuffer(&buf,
+							  ", subcopyformat AS \"%s\"\n",
+							  gettext_noop("Copy Format"));
+		}
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 5e1882eaea..acc178780c 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1926,7 +1926,7 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
-					  "streaming", "synchronous_commit");
+					  "streaming", "synchronous_commit", "copy_format");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3269,7 +3269,8 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "disable_on_error", "enabled", "origin", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "streaming", "synchronous_commit", "two_phase",
+					  "copy_format");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a1705d..fe74cf2cce 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -88,6 +88,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subdisableonerr;	/* True if a worker error should cause the
 									 * subscription to be disabled */
 
+	char		subcopyformat BKI_DEFAULT(LOGICALREP_COPY_AS_TEXT); /* Copy format */
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -137,6 +138,8 @@ typedef struct Subscription
 	List	   *publications;	/* List of publication names to subscribe to */
 	char	   *origin;			/* Only publish data originating from the
 								 * specified origin */
+	char		copyformat;		/* Copy format for subscriptions with binary
+								 * option enabled */
 } Subscription;
 
 /* Disallow streaming in-progress transactions. */
@@ -154,6 +157,16 @@ typedef struct Subscription
  */
 #define LOGICALREP_STREAM_PARALLEL 'p'
 
+/*
+ * Copy data as text. This is the default.
+ */
+#define LOGICALREP_COPY_AS_TEXT 't'
+
+/*
+ * Copy data as binary if the binary options is enabled in the subscription.
+ */
+#define LOGICALREP_COPY_AS_BINARY 'b'
+
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
 extern void FreeSubscription(Subscription *sub);
 extern void DisableSubscription(Oid subid);
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 214dc6c29e..7937defea5 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -27,5 +27,6 @@ extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
 
 extern char defGetStreamingMode(DefElem *def);
+extern char defGetCopyFormat(DefElem *def);
 
 #endif							/* SUBSCRIPTIONCMDS_H */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 3f99b14394..045f34e710 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -163,10 +163,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345  | t
 (1 row)
 
 -- ok - with lsn = NONE
@@ -175,10 +175,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0      | t
 (1 row)
 
 BEGIN;
@@ -210,10 +210,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                               List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                      List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN | Copy Format 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------+-------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0      | t
 (1 row)
 
 -- rename back to keep the rest simple
@@ -247,19 +247,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -271,27 +271,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 -- fail - publication already exists
@@ -306,10 +306,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                        List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 -- fail - publication used more than once
@@ -324,10 +324,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -363,10 +363,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -375,10 +375,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -388,10 +388,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -404,22 +404,70 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
+-- fail - cannot set binary = false and copy_format = binary in a subscription 
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (binary = false, copy_format = binary, connect = false, slot_name = NONE);
+ERROR:  binary = false and copy_format = binary are mutually exclusive options
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (binary = true, copy_format = binary, connect = false, slot_name = NONE);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+\dRs+
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | b
+(1 row)
+
+-- alter copy_format to text
+ALTER SUBSCRIPTION regress_testsub SET (copy_format = text);
+\dRs+
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
+(1 row)
+
+-- alter copy_format back to binary 
+ALTER SUBSCRIPTION regress_testsub SET (copy_format = binary);
+\dRs+
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | b
+(1 row)
+
+-- fail - cannot set binary = false if copy_format = binary
+ALTER SUBSCRIPTION regress_testsub SET (binary = false);
+ERROR:  cannot set binary = false for a subscription with copy_format = binary
+-- now it works
+ALTER SUBSCRIPTION regress_testsub SET (binary = false, copy_format = text);
+\dRs+
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
+(1 row)
+
+-- fail - cannot set copy_format = binary for a subscription with binary = false
+ALTER SUBSCRIPTION regress_testsub SET (copy_format = binary);
+ERROR:  cannot set copy_format = binary for a subscription with binary = false
+DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 7281f5fee2..875f3af6bf 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -286,6 +286,32 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail - cannot set binary = false and copy_format = binary in a subscription
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (binary = false, copy_format = binary, connect = false, slot_name = NONE);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (binary = true, copy_format = binary, connect = false, slot_name = NONE);
+\dRs+
+
+-- alter copy_format to text
+ALTER SUBSCRIPTION regress_testsub SET (copy_format = text);
+\dRs+
+
+-- alter copy_format back to binary
+ALTER SUBSCRIPTION regress_testsub SET (copy_format = binary);
+\dRs+
+
+-- fail - cannot set binary = false if copy_format = binary
+ALTER SUBSCRIPTION regress_testsub SET (binary = false);
+
+-- now it works
+ALTER SUBSCRIPTION regress_testsub SET (binary = false, copy_format = text);
+\dRs+
+
+-- fail - cannot set copy_format = binary for a subscription with binary = false
+ALTER SUBSCRIPTION regress_testsub SET (copy_format = binary);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl
index 6b5853b80b..02db58dc3b 100644
--- a/src/test/subscription/t/002_types.pl
+++ b/src/test/subscription/t/002_types.pl
@@ -19,6 +19,11 @@ my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->start;
 
+# Create binary subscriber node
+my $node_subscriber_binary = PostgreSQL::Test::Cluster->new('subscriber_binary');
+$node_subscriber_binary->init(allows_streaming => 'logical');
+$node_subscriber_binary->start;
+
 # Create some preexisting content on publisher
 my $ddl = qq(
 	CREATE EXTENSION hstore WITH SCHEMA public;
@@ -104,6 +109,100 @@ my $ddl = qq(
 # Setup structure on both nodes
 $node_publisher->safe_psql('postgres', $ddl);
 $node_subscriber->safe_psql('postgres', $ddl);
+$node_subscriber_binary->safe_psql('postgres', $ddl);
+
+# Insert initial test data
+$node_publisher->safe_psql(
+	'postgres', qq(
+	-- test_tbl_one_array_col
+	INSERT INTO tst_one_array (a, b) VALUES
+		(1, '{1, 2, 3}'),
+		(2, '{2, 3, 1}');
+
+	-- test_tbl_arrays
+	INSERT INTO tst_arrays (a, b, c, d) VALUES
+		('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'),
+		('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}');
+
+	-- test_tbl_single_enum
+	INSERT INTO tst_one_enum (a, b) VALUES
+		(1, 'a'),
+		(2, 'b');
+
+	-- test_tbl_enums
+	INSERT INTO tst_enums (a, b) VALUES
+		('a', '{b, c}'),
+		('b', '{c, a}');
+
+	-- test_tbl_single_composites
+	INSERT INTO tst_one_comp (a, b) VALUES
+		(1, ROW(1.0, 'a', 1)),
+		(2, ROW(2.0, 'b', 2));
+
+	-- test_tbl_composites
+	INSERT INTO tst_comps (a, b) VALUES
+		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]),
+		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]);
+
+	-- test_tbl_composite_with_enums
+	INSERT INTO tst_comp_enum (a, b) VALUES
+		(1, ROW(1.0, 'a', 1)),
+		(2, ROW(2.0, 'b', 2));
+
+	-- test_tbl_composite_with_enums_array
+	INSERT INTO tst_comp_enum_array (a, b) VALUES
+		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]),
+		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]);
+
+	-- test_tbl_composite_with_single_enums_array_in_composite
+	INSERT INTO tst_comp_one_enum_array (a, b) VALUES
+		(1, ROW(1.0, '{a, b, c}', 1)),
+		(2, ROW(2.0, '{a, b, c}', 2));
+
+	-- test_tbl_composite_with_enums_array_in_composite
+	INSERT INTO tst_comp_enum_what (a, b) VALUES
+		(ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]),
+		(ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]);
+
+	-- test_tbl_mixed_composites
+	INSERT INTO tst_comp_mix_array (a, b) VALUES
+		(ROW(
+			ROW(1,'a',1),
+			ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t],
+			'a',
+			'{a,b,NULL,c}'),
+		ARRAY[
+			ROW(
+				ROW(1,'a',1),
+				ARRAY[
+					ROW(1,'a',1)::tst_comp_basic_t,
+					ROW(2,'b',2)::tst_comp_basic_t,
+					NULL
+					],
+				'a',
+				'{a,b,c}'
+				)::tst_comp_mix_t
+			]
+		);
+
+	-- test_tbl_range
+	INSERT INTO tst_range (a, b) VALUES
+		(1, '[1, 10]'),
+		(2, '[2, 20]');
+
+	-- test_tbl_range_array
+	INSERT INTO tst_range_array (a, b, c) VALUES
+		(1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'),
+		(2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}');
+
+	-- tst_hstore
+	INSERT INTO tst_hstore (a, b) VALUES
+		(1, '"a"=>"1"'),
+		(2, '"zzz"=>"foo"');
+
+	-- tst_dom_constr
+	INSERT INTO tst_dom_constr VALUES (10);
+));
 
 # Setup logical replication
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
@@ -113,89 +212,126 @@ $node_publisher->safe_psql('postgres',
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
 );
+$node_subscriber_binary->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_binary CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_binary_slot, binary = true, copy_format = 'binary')"
+);
 
 # Wait for initial sync to finish as well
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+$node_subscriber_binary->wait_for_subscription_sync($node_publisher, 'tap_sub_binary');
 
-# Insert initial test data
+my $sync_check =  qq(
+	SET timezone = '+2';
+	SELECT a, b FROM tst_one_array ORDER BY a;
+	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
+	SELECT a, b FROM tst_one_enum ORDER BY a;
+	SELECT a, b FROM tst_enums ORDER BY a;
+	SELECT a, b FROM tst_one_comp ORDER BY a;
+	SELECT a, b FROM tst_comps ORDER BY a;
+	SELECT a, b FROM tst_comp_enum ORDER BY a;
+	SELECT a, b FROM tst_comp_enum_array ORDER BY a;
+	SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
+	SELECT a, b FROM tst_comp_enum_what ORDER BY a;
+	SELECT a, b FROM tst_comp_mix_array ORDER BY a;
+	SELECT a, b FROM tst_range ORDER BY a;
+	SELECT a, b, c FROM tst_range_array ORDER BY a;
+	SELECT a, b FROM tst_hstore ORDER BY a;
+);
+
+# Check the synced data on subscribers
+my $result = $node_subscriber->safe_psql('postgres', $sync_check);
+my $result_binary = $node_subscriber_binary->safe_psql('postgres', $sync_check);
+
+my $sync_result = '1|{1,2,3}
+2|{2,3,1}
+{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{"1 day","2 days","3 days"}
+{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00}
+1|a
+2|b
+a|{b,c}
+b|{c,a}
+1|(1,a,1)
+2|(2,b,2)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+1|(1,a,1)
+2|(2,b,2)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+1|(1,"{a,b,c}",1)
+2|(2,"{a,b,c}",2)
+(1,"{a,b,c}",1)|{"(1,\"{a,b,c}\",1)"}
+(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
+("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
+1|[1,11)
+2|[2,21)
+1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+1|"a"=>"1"
+2|"zzz"=>"foo"';
+
+is( $result, $sync_result, 'check initial sync on subscriber');
+is( $result_binary, $sync_result, 'check initial sync on subscriber in binary');
+
+# Insert test data for update to apply changes
 $node_publisher->safe_psql(
 	'postgres', qq(
 	-- test_tbl_one_array_col
 	INSERT INTO tst_one_array (a, b) VALUES
-		(1, '{1, 2, 3}'),
-		(2, '{2, 3, 1}'),
 		(3, '{3, 2, 1}'),
 		(4, '{4, 3, 2}'),
 		(5, '{5, NULL, 3}');
 
 	-- test_tbl_arrays
 	INSERT INTO tst_arrays (a, b, c, d) VALUES
-		('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'),
-		('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}'),
 		('{3, 1, 2}', '{"c", "a", "b"}', '{3.3, 1.1, 2.2}', '{"3 years", "1 year", "2 years"}'),
 		('{4, 1, 2}', '{"d", "a", "b"}', '{4.4, 1.1, 2.2}', '{"4 years", "1 year", "2 years"}'),
 		('{5, NULL, NULL}', '{"e", NULL, "b"}', '{5.5, 1.1, NULL}', '{"5 years", NULL, NULL}');
 
 	-- test_tbl_single_enum
 	INSERT INTO tst_one_enum (a, b) VALUES
-		(1, 'a'),
-		(2, 'b'),
 		(3, 'c'),
 		(4, 'd'),
 		(5, NULL);
 
 	-- test_tbl_enums
 	INSERT INTO tst_enums (a, b) VALUES
-		('a', '{b, c}'),
-		('b', '{c, a}'),
 		('c', '{b, a}'),
 		('d', '{c, b}'),
 		('e', '{d, NULL}');
 
 	-- test_tbl_single_composites
 	INSERT INTO tst_one_comp (a, b) VALUES
-		(1, ROW(1.0, 'a', 1)),
-		(2, ROW(2.0, 'b', 2)),
 		(3, ROW(3.0, 'c', 3)),
 		(4, ROW(4.0, 'd', 4)),
 		(5, ROW(NULL, NULL, 5));
 
 	-- test_tbl_composites
 	INSERT INTO tst_comps (a, b) VALUES
-		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]),
-		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]),
 		(ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]),
 		(ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]),
 		(ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]);
 
 	-- test_tbl_composite_with_enums
 	INSERT INTO tst_comp_enum (a, b) VALUES
-		(1, ROW(1.0, 'a', 1)),
-		(2, ROW(2.0, 'b', 2)),
 		(3, ROW(3.0, 'c', 3)),
 		(4, ROW(4.0, 'd', 4)),
 		(5, ROW(NULL, 'e', NULL));
 
 	-- test_tbl_composite_with_enums_array
 	INSERT INTO tst_comp_enum_array (a, b) VALUES
-		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]),
-		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]),
 		(ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]),
 		(ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]),
 		(ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]);
 
 	-- test_tbl_composite_with_single_enums_array_in_composite
 	INSERT INTO tst_comp_one_enum_array (a, b) VALUES
-		(1, ROW(1.0, '{a, b, c}', 1)),
-		(2, ROW(2.0, '{a, b, c}', 2)),
 		(3, ROW(3.0, '{a, b, c}', 3)),
 		(4, ROW(4.0, '{c, b, d}', 4)),
 		(5, ROW(5.0, '{NULL, e, NULL}', 5));
 
 	-- test_tbl_composite_with_enums_array_in_composite
 	INSERT INTO tst_comp_enum_what (a, b) VALUES
-		(ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]),
-		(ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]),
 		(ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]),
 		(ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]),
 		(ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]);
@@ -203,10 +339,10 @@ $node_publisher->safe_psql(
 	-- test_tbl_mixed_composites
 	INSERT INTO tst_comp_mix_array (a, b) VALUES
 		(ROW(
-			ROW(1,'a',1),
-			ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t],
-			'a',
-			'{a,b,NULL,c}'),
+			ROW(2,'b',2),
+			ARRAY[ROW(2,'b',2)::tst_comp_basic_t, ROW(3,'c',3)::tst_comp_basic_t],
+			'b',
+			'{b,c,NULL,d}'),
 		ARRAY[
 			ROW(
 				ROW(1,'a',1),
@@ -223,36 +359,29 @@ $node_publisher->safe_psql(
 
 	-- test_tbl_range
 	INSERT INTO tst_range (a, b) VALUES
-		(1, '[1, 10]'),
-		(2, '[2, 20]'),
 		(3, '[3, 30]'),
 		(4, '[4, 40]'),
 		(5, '[5, 50]');
 
 	-- test_tbl_range_array
 	INSERT INTO tst_range_array (a, b, c) VALUES
-		(1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'),
-		(2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}'),
 		(3, tstzrange('Fri Aug 01 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[3,4]"}'),
 		(4, tstzrange('Thu Jul 31 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[4,5]", NULL, "[40,50]"}'),
 		(5, NULL, NULL);
 
 	-- tst_hstore
 	INSERT INTO tst_hstore (a, b) VALUES
-		(1, '"a"=>"1"'),
-		(2, '"zzz"=>"foo"'),
 		(3, '"123"=>"321"'),
 		(4, '"yellow horse"=>"moaned"');
 
 	-- tst_dom_constr
-	INSERT INTO tst_dom_constr VALUES (10);
+	INSERT INTO tst_dom_constr VALUES (11);
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-my $result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $initial_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -268,9 +397,13 @@ my $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+);
+
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $initial_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $initial_check);
 
-is( $result, '1|{1,2,3}
+my $initial_result = '1|{1,2,3}
 2|{2,3,1}
 3|{3,2,1}
 4|{4,3,2}
@@ -321,6 +454,7 @@ e|{d,NULL}
 (4,"{c,b,d}",4)|{"(4,\"{c,b,d}\",4)"}
 (5,"{c,NULL,b}",)|{"(5,\"{c,e,b}\",1)"}
 ("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 1|[1,11)
 2|[2,21)
 3|[3,31)
@@ -334,8 +468,10 @@ e|{d,NULL}
 1|"a"=>"1"
 2|"zzz"=>"foo"
 3|"123"=>"321"
-4|"yellow horse"=>"moaned"',
-	'check replicated inserts on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $initial_result, 'check replicated inserts on subscriber');
+is( $result_binary, $initial_result, 'check replicated inserts on subscriber in binary');
 
 # Run batch of updates
 $node_publisher->safe_psql(
@@ -370,10 +506,9 @@ $node_publisher->safe_psql(
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-$result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $update_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -389,9 +524,13 @@ $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+);
+
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $update_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $update_check);
 
-is( $result, '1|{4,5,6}
+my $update_result = '1|{4,5,6}
 2|{2,3,1}
 3|{3,2,1}
 4|{4,5,6,1}
@@ -442,6 +581,7 @@ e|{e,d}
 (4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
 (5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
 ("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")",NULL}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 1|[100,1001)
 2|[2,21)
 3|[3,31)
@@ -455,8 +595,10 @@ e|{e,d}
 1|"updated"=>"value"
 2|"updated"=>"value"
 3|"also"=>"updated"
-4|"yellow horse"=>"moaned"',
-	'check replicated updates on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $update_result, 'check replicated updates on subscriber');
+is( $result_binary, $update_result, 'check replicated updates on subscriber in binary');
 
 # Run batch of deletes
 $node_publisher->safe_psql(
@@ -490,10 +632,9 @@ $node_publisher->safe_psql(
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-$result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $delete_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -509,9 +650,13 @@ $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+);
+
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $delete_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $delete_check);
 
-is( $result, '3|{3,2,1}
+my $delete_result = '3|{3,2,1}
 4|{4,5,6,1}
 5|{4,5,6,1}
 {3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"}
@@ -540,26 +685,34 @@ e|{e,d}
 (2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
 (4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
 (5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
 3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"}
 2|"updated"=>"value"
 3|"also"=>"updated"
-4|"yellow horse"=>"moaned"',
-	'check replicated deletes on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $delete_result, 'check replicated deletes on subscriber');
+is( $result_binary, $delete_result, 'check replicated deletes on subscriber in binary');
 
 # Test a domain with a constraint backed by a SQL-language function,
 # which needs an active snapshot in order to operate.
 $node_publisher->safe_psql('postgres',
-	"INSERT INTO tst_dom_constr VALUES (11)");
+	"INSERT INTO tst_dom_constr VALUES (12)");
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
+
+my $domain_check = qq(SELECT sum(a) FROM tst_dom_constr);
+
+$result = $node_subscriber->safe_psql('postgres', $domain_check);
+$result_binary = $node_subscriber->safe_psql('postgres', $domain_check);
 
-$result =
-  $node_subscriber->safe_psql('postgres',
-	"SELECT sum(a) FROM tst_dom_constr");
-is($result, '21', 'sql-function constraint on domain');
+is( $result, '33', 'sql-function constraint on domain');
+is( $result_binary, '33', 'sql-function constraint on domain');
 
 $node_subscriber->stop('fast');
+$node_subscriber_binary->stop('fast');
 $node_publisher->stop('fast');
 
 done_testing();
diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl
index e53e23da3e..4fee421733 100644
--- a/src/test/subscription/t/014_binary.pl
+++ b/src/test/subscription/t/014_binary.pl
@@ -36,6 +36,16 @@ my $ddl = qq(
 $node_publisher->safe_psql('postgres', $ddl);
 $node_subscriber->safe_psql('postgres', $ddl);
 
+# Insert some content and make sure it's synced to subscriber
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO public.test_arrays (a, b, c) VALUES
+		('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}');
+
+	INSERT INTO public.test_numerical (a, b, c, d) VALUES
+		(1, 1.2, 1.3, 10);
+	));
+
 # Configure logical replication
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION tpub FOR ALL TABLES");
@@ -48,27 +58,35 @@ $node_subscriber->safe_psql('postgres',
 # Ensure nodes are in sync with each other
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
 
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, b, c, d FROM test_numerical ORDER BY a;
+	SELECT a, b, c FROM test_arrays ORDER BY a;");
+
+is( $result, '1|1.2|1.3|10
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}', 'check syned data on subscriber');
+
 # Insert some content and make sure it's replicated across
 $node_publisher->safe_psql(
 	'postgres', qq(
 	INSERT INTO public.test_arrays (a, b, c) VALUES
-		('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
 		('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
 
 	INSERT INTO public.test_numerical (a, b, c, d) VALUES
-		(1, 1.2, 1.3, 10),
 		(2, 2.2, 2.3, 20),
 		(3, 3.2, 3.3, 30);
 	));
 
 $node_publisher->wait_for_catchup('tsub');
 
-my $result = $node_subscriber->safe_psql('postgres',
-	"SELECT a, b, c, d FROM test_numerical ORDER BY a");
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, b, c, d FROM test_numerical ORDER BY a;
+	SELECT a, b, c FROM test_arrays ORDER BY a;");
 
 is( $result, '1|1.2|1.3|10
 2|2.2|2.3|20
-3|3.2|3.3|30', 'check replicated data on subscriber');
+3|3.2|3.3|30
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}
+{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check replicated data on subscriber');
 
 # Test updates as well
 $node_publisher->safe_psql(
@@ -130,6 +148,60 @@ is( $result, '{1,2,3}|{42,1.2,1.3}|
 {2,3,1}|{1.2,1.3,1.1}|{two,three,one}
 {3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber');
 
+# Test whether subscriber fails in case of column type mismatch
+$ddl = qq(
+	CREATE TABLE public.test_cross_type (a SMALLINT);
+	INSERT INTO public.test_cross_type VALUES (1);
+);
+$node_publisher->safe_psql('postgres', $ddl);
+
+$ddl = qq(
+	CREATE TABLE public.test_cross_type (a INTEGER);
+);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Change the format of inital copy to binary
+$node_subscriber->safe_psql('postgres', 'ALTER SUBSCRIPTION tsub SET (copy_format = binary)');
+$node_subscriber->safe_psql('postgres', 'ALTER SUBSCRIPTION tsub REFRESH PUBLICATION');
+
+# Cannot copy initial data in binary format because of type mismatch
+$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? insufficient data left in message/);
+
+# Reset the format of initial copy back to text
+$node_subscriber->safe_psql('postgres', 'ALTER SUBSCRIPTION tsub SET (copy_format = text)');
+
+# It should be able to sync the table now
+$node_subscriber->wait_for_subscription_sync($node_publisher,'tsub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a FROM test_cross_type ORDER BY a");
+
+is( $result, '1', 'check initial sync of data with type mismatch');
+
+# Change the format of inital copy to binary again to test the apply phase
+$node_subscriber->safe_psql('postgres', 'ALTER SUBSCRIPTION tsub SET (copy_format = binary)');
+
+# Insert a new row
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO public.test_cross_type (a) VALUES
+		(2);
+	));
+
+# Cannot apply due to type mismatch
+$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? insufficient data left in message/);
+
+# Set binary =false and copy_format = text
+$node_subscriber->safe_psql('postgres', 'ALTER SUBSCRIPTION tsub SET (binary = false, copy_format = text)');
+
+$node_publisher->wait_for_catchup('tsub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a FROM public.test_cross_type ORDER BY a");
+
+is( $result, '1
+2', 'check replication of data with type mismatch');
+
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
-- 
2.25.1

