Hmm, I messed up the patch file I sent.  Here's the complete patch.

-- 
Álvaro Herrera         PostgreSQL Developer  —  https://www.EnterpriseDB.com/
"Doing what he did amounts to sticking his fingers under the hood of the
implementation; if he gets his fingers burnt, it's his problem."  (Tom Lane)
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index bb4ef5e5e2..c86055b93c 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ]  [ ( <replaceable class="parameter">column_name</replaceable>, [, ... ] ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -110,6 +110,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
       name to explicitly indicate that descendant tables are included.
+      Optionally, a column list can be specified.  See <xref
+      linkend="sql-createpublication"/> for details.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index d805e8e77a..73a23cbb02 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable>, [, ... ] ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -78,6 +78,15 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       publication, so they are never explicitly added to the publication.
      </para>
 
+     <para>
+      When a column list is specified, only the listed columns are replicated;
+      any other columns are ignored for the purpose of replication through
+      this publication.  If no column list is specified, all columns of the
+      table are replicated through this publication, including any columns
+      added later.  If a column list is specified, it must include the replica
+      identity columns.
+     </para>
+
      <para>
       Only persistent base tables and partitioned tables can be part of a
       publication.  Temporary tables, unlogged tables, foreign tables,
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index fe9c714257..a88d12e8ae 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -1472,7 +1472,7 @@ doDeletion(const ObjectAddress *object, int flags)
 			break;
 
 		case OCLASS_PUBLICATION_REL:
-			RemovePublicationRelById(object->objectId);
+			RemovePublicationRelById(object->objectId, object->objectSubId);
 			break;
 
 		case OCLASS_PUBLICATION:
@@ -2754,8 +2754,12 @@ free_object_addresses(ObjectAddresses *addrs)
 ObjectClass
 getObjectClass(const ObjectAddress *object)
 {
-	/* only pg_class entries can have nonzero objectSubId */
+	/*
+	 * only pg_class and pg_publication_rel entries can have nonzero
+	 * objectSubId
+	 */
 	if (object->classId != RelationRelationId &&
+		object->classId != PublicationRelRelationId &&
 		object->objectSubId != 0)
 		elog(ERROR, "invalid non-zero objectSubId for object class %u",
 			 object->classId);
diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c
index 2bae3fbb17..5eed248dcb 100644
--- a/src/backend/catalog/objectaddress.c
+++ b/src/backend/catalog/objectaddress.c
@@ -4019,6 +4019,7 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok)
 				/* translator: first %s is, e.g., "table %s" */
 				appendStringInfo(&buffer, _("publication of %s in publication %s"),
 								 rel.data, pubname);
+				/* FIXME add objectSubId support */
 				pfree(rel.data);
 				ReleaseSysCache(tup);
 				break;
@@ -5853,9 +5854,16 @@ getObjectIdentityParts(const ObjectAddress *object,
 
 				getRelationIdentity(&buffer, prform->prrelid, objname, false);
 				appendStringInfo(&buffer, " in publication %s", pubname);
+				if (object->objectSubId)	/* FIXME maybe get_attname */
+					appendStringInfo(&buffer, " column %d", object->objectSubId);
 
 				if (objargs)
+				{
 					*objargs = list_make1(pubname);
+					if (object->objectSubId)
+						*objargs = lappend(*objargs,
+										   psprintf("%d", object->objectSubId));
+				}
 
 				ReleaseSysCache(tup);
 				break;
diff --git a/src/backend/catalog/pg_depend.c b/src/backend/catalog/pg_depend.c
index 5f37bf6d10..dfcb450e61 100644
--- a/src/backend/catalog/pg_depend.c
+++ b/src/backend/catalog/pg_depend.c
@@ -658,6 +658,56 @@ isObjectPinned(const ObjectAddress *object)
  * Various special-purpose lookups and manipulations of pg_depend.
  */
 
+/*
+ * Find all objects of the given class that reference the specified object,
+ * and add them to the given ObjectAddresses.
+ */
+void
+findAndAddAddresses(ObjectAddresses *addrs, Oid classId,
+					Oid refclassId, Oid refobjectId, int32 refobjsubId)
+{
+	Relation	depRel;
+	ScanKeyData	key[3];
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	depRel = table_open(DependRelationId, AccessShareLock);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_depend_refclassid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(refclassId));
+	ScanKeyInit(&key[1],
+				Anum_pg_depend_refobjid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(refobjectId));
+	ScanKeyInit(&key[2],
+				Anum_pg_depend_refobjsubid,
+				BTEqualStrategyNumber, F_INT4EQ,
+				Int32GetDatum(refobjsubId));
+
+	scan = systable_beginscan(depRel, DependReferenceIndexId, true,
+							  NULL, 3, key);
+
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_depend depform = (Form_pg_depend) GETSTRUCT(tup);
+		ObjectAddress	object;
+
+		if (depform->classid != classId)
+			continue;
+
+		ObjectAddressSubSet(object, depform->classid, depform->objid,
+							depform->refobjsubid);
+
+		add_exact_object_address(&object, addrs);
+	}
+
+	systable_endscan(scan);
+
+	table_close(depRel, AccessShareLock);
+}
+
 
 /*
  * Find the extension containing the specified object, if any
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 62f10bcbd2..ae58adc8e5 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -46,12 +46,18 @@
 #include "utils/syscache.h"
 
 /*
- * Check if relation can be in given publication and throws appropriate
- * error if not.
+ * Check if relation can be in given publication and that the column
+ * filter is sensible, and throws appropriate error if not.
+ *
+ * targetcols is the bitmapset of column specified as column filter
+ * (shifted by FirstLowInvalidHeapAttributeNumber), or NULL if no column
+ * filter was specified.
  */
 static void
-check_publication_add_relation(Relation targetrel)
+check_publication_add_relation(Relation targetrel, Bitmapset *columns)
 {
+	bool		replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+
 	/* Must be a regular or partitioned table */
 	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
 		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
@@ -82,6 +88,40 @@ check_publication_add_relation(Relation targetrel)
 				 errmsg("cannot add relation \"%s\" to publication",
 						RelationGetRelationName(targetrel)),
 				 errdetail("This operation is not supported for unlogged tables.")));
+
+	/*
+	 * Enforce that the column filter can only leave out columns that aren't
+	 * forced to be sent.
+	 *
+	 * No column can be excluded if REPLICA IDENTITY is FULL (since all the
+	 * columns need to be sent regardless); and in other cases, the columns in
+	 * the REPLICA IDENTITY cannot be left out.
+	 */
+	if (columns != NULL)
+	{
+		if (replidentfull)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("invalid column list for publishing relation \"%s\"",
+						   RelationGetRelationName(targetrel)),
+					errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL."));
+		else
+		{
+			Bitmapset  *idattrs;
+
+			idattrs = RelationGetIndexAttrBitmap(targetrel,
+												 INDEX_ATTR_BITMAP_IDENTITY_KEY);
+			if (!bms_is_subset(idattrs, columns))
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+						errmsg("invalid column list for publishing relation \"%s\"",
+							   RelationGetRelationName(targetrel)),
+						errdetail("All columns in REPLICA IDENTITY must be present in the column list."));
+
+			if (idattrs)
+				pfree(idattrs);
+		}
+	}
 }
 
 /*
@@ -289,9 +329,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
+	Bitmapset  *attmap = NULL;
+	AttrNumber *attarray;
+	int			natts = 0;
+	int			attnum;
 	ObjectAddress myself,
 				referenced;
 	List	   *relids = NIL;
+	ListCell   *lc;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -305,6 +350,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	{
 		table_close(rel, RowExclusiveLock);
 
+		/* FIXME need to handle the case of different column list */
+
 		if (if_not_exists)
 			return InvalidObjectAddress;
 
@@ -314,7 +361,34 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel->relation);
+	attarray = palloc(sizeof(AttrNumber) * list_length(targetrel->columns));
+	foreach(lc, targetrel->columns)
+	{
+		char	   *colname = strVal(lfirst(lc));
+		AttrNumber	attnum = get_attnum(relid, colname);
+
+		if (attnum == InvalidAttrNumber)
+			ereport(ERROR,
+					errcode(ERRCODE_UNDEFINED_COLUMN),
+					errmsg("column \"%s\" of relation \"%s\" does not exist",
+						   colname, RelationGetRelationName(targetrel->relation)));
+		if (attnum < 0)
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+					errmsg("cannot reference system column \"%s\" in publication column list",
+						   colname));
+
+		if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, attmap))
+			ereport(ERROR,
+					errcode(ERRCODE_DUPLICATE_OBJECT),
+					errmsg("column \"%s\" specified twice in publication column list",
+						   colname));
+
+		attmap = bms_add_member(attmap, attnum - FirstLowInvalidHeapAttributeNumber);
+		attarray[natts++] = attnum;
+	}
+
+	check_publication_add_relation(targetrel->relation, attmap);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -327,6 +401,15 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 		ObjectIdGetDatum(pubid);
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
+	if (targetrel->columns)
+	{
+		int2vector *prattrs;
+
+		prattrs = buildint2vector(attarray, natts);
+		values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs);
+	}
+	else
+		nulls[Anum_pg_publication_rel_prattrs - 1] = true;
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -344,6 +427,21 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	ObjectAddressSet(referenced, RelationRelationId, relid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+	/*
+	 * If there's an explicit column list, make one dependency entry for each
+	 * column.  Note that the referencing side of the dependency is also
+	 * specific to one column, so that it can be dropped separately if the
+	 * column is dropped.
+	 */
+	while ((attnum = bms_first_member(attmap)) >= 0)
+	{
+		ObjectAddressSubSet(referenced, RelationRelationId, relid,
+							attnum + FirstLowInvalidHeapAttributeNumber);
+		myself.objectSubId = attnum + FirstLowInvalidHeapAttributeNumber;
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+	}
+	myself.objectSubId = 0;		/* need to undo this bit */
+
 	/* Close the table. */
 	table_close(rel, RowExclusiveLock);
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 404bb5d0c8..a070914bdd 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -561,7 +561,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 
 				pubrel = palloc(sizeof(PublicationRelInfo));
 				pubrel->relation = oldrel;
-
+				pubrel->columns = NIL;
 				delrels = lappend(delrels, pubrel);
 			}
 		}
@@ -757,10 +757,11 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
 }
 
 /*
- * Remove relation from publication by mapping OID.
+ * Remove relation from publication by mapping OID, or publication status
+ * of one column of that relation in the publication if an attnum is given.
  */
 void
-RemovePublicationRelById(Oid proid)
+RemovePublicationRelById(Oid proid, int32 attnum)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -790,7 +791,81 @@ RemovePublicationRelById(Oid proid)
 
 	InvalidatePublicationRels(relids);
 
-	CatalogTupleDelete(rel, &tup->t_self);
+	/*
+	 * If no column is given, simply delete the relation from the publication.
+	 *
+	 * If a column is given, what we do instead is to remove that column from
+	 * the column list.  The relation remains in the publication, with the
+	 * other columns.  However, dropping the last column is disallowed.
+	 */
+	if (attnum == 0)
+	{
+		CatalogTupleDelete(rel, &tup->t_self);
+	}
+	else
+	{
+		Datum		adatum;
+		ArrayType  *arr;
+		int			nelems;
+		int16	   *elems;
+		int16	   *newelems;
+		int2vector *newvec;
+		Datum		values[Natts_pg_publication_rel];
+		bool		nulls[Natts_pg_publication_rel];
+		bool		replace[Natts_pg_publication_rel];
+		HeapTuple	newtup;
+		int			i,
+					j;
+		bool		isnull;
+
+		/* Obtain the original column list */
+		adatum = SysCacheGetAttr(PUBLICATIONRELMAP,
+								 tup,
+								 Anum_pg_publication_rel_prattrs,
+								 &isnull);
+		if (isnull)			/* shouldn't happen */
+			elog(ERROR, "can't drop column from publication without a column list");
+		arr = DatumGetArrayTypeP(adatum);
+		nelems = ARR_DIMS(arr)[0];
+		elems = (int16 *) ARR_DATA_PTR(arr);
+
+		/* Construct a list excluding the given column */
+		newelems = palloc(sizeof(int16) * nelems - 1);
+		for (i = 0, j = 0; i < nelems - 1; i++)
+		{
+			if (elems[i] == attnum)
+				continue;
+			newelems[j++] = elems[i];
+		}
+
+		/*
+		 * If this is the last column used in the publication, disallow the
+		 * command. We could alternatively just drop the relation from the
+		 * publication.
+		 */
+		if (j == 0)
+		{
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("cannot drop the last column in publication \"%s\"",
+						   get_publication_name(pubrel->prpubid, false)),
+					errhint("Remove table \"%s\" from the publication first.",
+							get_rel_name(pubrel->prrelid)));
+		}
+
+		/* Build the updated tuple */
+		MemSet(values, 0, sizeof(values));
+		MemSet(nulls, false, sizeof(nulls));
+		MemSet(replace, false, sizeof(replace));
+		newvec = buildint2vector(newelems, j);
+		values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(newvec);
+		replace[Anum_pg_publication_rel_prattrs - 1] = true;
+
+		/* Execute the update */
+		newtup = heap_modify_tuple(tup, RelationGetDescr(rel),
+								   values, nulls, replace);
+		CatalogTupleUpdate(rel, &tup->t_self, newtup);
+	}
 
 	ReleaseSysCache(tup);
 
@@ -932,6 +1007,8 @@ OpenTableList(List *tables)
 
 		pub_rel = palloc(sizeof(PublicationRelInfo));
 		pub_rel->relation = rel;
+		pub_rel->columns = t->columns;
+
 		rels = lappend(rels, pub_rel);
 		relids = lappend_oid(relids, myrelid);
 
@@ -965,8 +1042,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = table_open(childrelid, NoLock);
+
 				pub_rel = palloc(sizeof(PublicationRelInfo));
 				pub_rel->relation = rel;
+				pub_rel->columns = t->columns;
+
 				rels = lappend(rels, pub_rel);
 				relids = lappend_oid(relids, childrelid);
 			}
@@ -1074,6 +1154,12 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 		Relation	rel = pubrel->relation;
 		Oid			relid = RelationGetRelid(rel);
 
+		if (pubrel->columns)
+			ereport(ERROR,
+					errcode(ERRCODE_SYNTAX_ERROR),
+					errmsg("column list may not be specified for relation \"%s\" in ALTER PUBLICATION ... SET/DROP command",
+						   RelationGetRelationName(pubrel->relation)));
+
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
 							   ObjectIdGetDatum(relid),
 							   ObjectIdGetDatum(pubid));
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 47b29001d5..7207dcf9c0 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -40,8 +40,9 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
-#include "catalog/pg_tablespace.h"
+#include "catalog/pg_publication_rel.h"
 #include "catalog/pg_statistic_ext.h"
+#include "catalog/pg_tablespace.h"
 #include "catalog/pg_trigger.h"
 #include "catalog/pg_type.h"
 #include "catalog/storage.h"
@@ -8420,6 +8421,13 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 
 	ReleaseSysCache(tuple);
 
+	/*
+	 * If the column is part of a replication column list, arrange to get that
+	 * removed too.
+	 */
+	findAndAddAddresses(addrs, PublicationRelRelationId,
+						RelationRelationId, RelationGetRelid(rel), attnum);
+
 	/*
 	 * Propagate to children as appropriate.  Unlike most other ALTER
 	 * routines, we have to do this one level of recursion at a time; we can't
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index df0b747883..0ff4c1ceac 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4833,6 +4833,7 @@ _copyPublicationTable(const PublicationTable *from)
 	PublicationTable *newnode = makeNode(PublicationTable);
 
 	COPY_NODE_FIELD(relation);
+	COPY_NODE_FIELD(columns);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index cb7ddd463c..d786a688ac 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2312,6 +2312,7 @@ static bool
 _equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
 {
 	COMPARE_NODE_FIELD(relation);
+	COMPARE_NODE_FIELD(columns);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3d4dd43e47..4dad6fedfb 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9742,12 +9742,13 @@ CreatePublicationStmt:
  * relation_expr here.
  */
 PublicationObjSpec:
-			TABLE relation_expr
+			TABLE relation_expr opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $2;
+					$$->pubtable->columns = $3;
 				}
 			| ALL TABLES IN_P SCHEMA ColId
 				{
@@ -9762,28 +9763,38 @@ PublicationObjSpec:
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA;
 					$$->location = @5;
 				}
-			| ColId
+			| ColId opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
-					$$->name = $1;
+					if ($2 != NULL)
+					{
+						$$->pubtable = makeNode(PublicationTable);
+						$$->pubtable->relation = makeRangeVar(NULL, $1, @1);
+						$$->pubtable->columns = $2;
+						$$->name = NULL;
+					}
+					else
+						$$->name = $1;
 					$$->location = @1;
 				}
-			| ColId indirection
+			| ColId indirection opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+					$$->pubtable->columns = $3;
 					$$->location = @1;
 				}
 			/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
-			| extended_relation_expr
+			| extended_relation_expr opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $1;
+					$$->pubtable->columns = $2;
 				}
 			| CURRENT_SCHEMA
 				{
@@ -17435,8 +17446,9 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 			{
 				/* convert it to PublicationTable */
 				PublicationTable *pubtable = makeNode(PublicationTable);
-				pubtable->relation = makeRangeVar(NULL, pubobj->name,
-												  pubobj->location);
+
+				pubtable->relation =
+					makeRangeVar(NULL, pubobj->name, pubobj->location);
 				pubobj->pubtable = pubtable;
 				pubobj->name = NULL;
 			}
@@ -17444,6 +17456,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 		else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_SCHEMA ||
 				 pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA)
 		{
+			/*
+			 * This can happen if a column list is specified in a continuation
+			 * for a schema entry; reject it.
+			 */
+			if (pubobj->pubtable)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("column specification not allowed for schemas"),
+						parser_errposition(pubobj->location));
+
 			/*
 			 * We can distinguish between the different type of schema
 			 * objects based on whether name and pubtable is set.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b639..15d8192238 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -29,9 +29,9 @@
 #define TRUNCATE_CASCADE		(1<<0)
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
-static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-								   HeapTuple tuple, bool binary);
+								   HeapTuple tuple, bool binary, Bitmapset *att_map);
 
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple newtuple, bool binary)
+						HeapTuple newtuple, bool binary, Bitmapset *att_map)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, att_map);
 }
 
 /*
@@ -442,7 +442,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+						HeapTuple oldtuple, HeapTuple newtuple, bool binary, Bitmapset *att_map)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -463,11 +463,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldtuple, binary);
+		logicalrep_write_tuple(out, rel, oldtuple, binary, att_map);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, att_map);
 }
 
 /*
@@ -536,7 +536,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldtuple, binary);
+	logicalrep_write_tuple(out, rel, oldtuple, binary, NULL);
 }
 
 /*
@@ -651,7 +651,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
  * Write relation description to the output stream.
  */
 void
-logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *att_map)
 {
 	char	   *relname;
 
@@ -673,7 +673,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 	pq_sendbyte(out, rel->rd_rel->relreplident);
 
 	/* send the attribute info */
-	logicalrep_write_attrs(out, rel);
+	logicalrep_write_attrs(out, rel, att_map);
 }
 
 /*
@@ -749,20 +749,42 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary,
+					   Bitmapset *att_map)
 {
 	TupleDesc	desc;
 	Datum		values[MaxTupleAttributeNumber];
 	bool		isnull[MaxTupleAttributeNumber];
 	int			i;
 	uint16		nliveatts = 0;
+	Bitmapset  *idattrs = NULL;
+	bool		replidentfull;
+	Form_pg_attribute att;
 
 	desc = RelationGetDescr(rel);
 
+	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+	if (!replidentfull)
+		idattrs = RelationGetIdentityKeyBitmap(rel);
+
 	for (i = 0; i < desc->natts; i++)
 	{
+		att = TupleDescAttr(desc, i);
 		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
 			continue;
+
+		/*
+		 * Do not increment count of attributes if not a part of column
+		 * filters except for replica identity columns or if replica identity
+		 * is full.
+		 */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   att_map) &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   idattrs) &&
+			!replidentfull)
+			continue;
 		nliveatts++;
 	}
 	pq_sendint16(out, nliveatts);
@@ -800,6 +822,19 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 			continue;
 		}
 
+		/*
+		 * Do not send attribute data if it is not a part of column filters,
+		 * except if it is a part of REPLICA IDENTITY or REPLICA IDENTITY is
+		 * full, send the data.
+		 */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   att_map) &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   idattrs) &&
+			!replidentfull)
+			continue;
+
 		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
 		if (!HeapTupleIsValid(typtup))
 			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
@@ -904,7 +939,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
  * Write relation attribute metadata to the stream.
  */
 static void
-logicalrep_write_attrs(StringInfo out, Relation rel)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map)
 {
 	TupleDesc	desc;
 	int			i;
@@ -914,20 +949,35 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 
 	desc = RelationGetDescr(rel);
 
-	/* send number of live attributes */
-	for (i = 0; i < desc->natts; i++)
-	{
-		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
-			continue;
-		nliveatts++;
-	}
-	pq_sendint16(out, nliveatts);
-
 	/* fetch bitmap of REPLICATION IDENTITY attributes */
 	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
 	if (!replidentfull)
 		idattrs = RelationGetIdentityKeyBitmap(rel);
 
+	/* send number of live attributes */
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+
+		if (att->attisdropped || att->attgenerated)
+			continue;
+		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
+		if (replidentfull ||
+			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						  idattrs))
+		{
+			nliveatts++;
+			continue;
+		}
+		/* Skip sending if not a part of column filter */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   att_map))
+			continue;
+		nliveatts++;
+	}
+	pq_sendint16(out, nliveatts);
+
 	/* send the attributes */
 	for (i = 0; i < desc->natts; i++)
 	{
@@ -937,6 +987,17 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
+		/*
+		 * Exclude filtered columns, but REPLICA IDENTITY columns can't be
+		 * excluded
+		 */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   att_map) &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   idattrs)
+			&& !replidentfull)
+			continue;
 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
 		if (replidentfull ||
 			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..15902faf56 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -111,6 +111,7 @@
 #include "replication/origin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -695,19 +696,25 @@ fetch_remote_table_info(char *nspname, char *relname,
 						LogicalRepRelation *lrel)
 {
 	WalRcvExecResult *res;
+	WalRcvExecResult *res_pub;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
+	TupleTableSlot *slot_pub;
+	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID};
 	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			pubRow[] = {TEXTARRAYOID};
 	bool		isnull;
 	int			natt;
+	List	   *pub_columns = NIL;
+	ListCell   *lc;
+	bool		am_partition = false;
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
 
 	/* First fetch Oid and replica identity. */
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
+	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition"
 					 "  FROM pg_catalog.pg_class c"
 					 "  INNER JOIN pg_catalog.pg_namespace n"
 					 "        ON (c.relnamespace = n.oid)"
@@ -737,6 +744,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	Assert(!isnull);
 	lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
 	Assert(!isnull);
+	am_partition = DatumGetChar(slot_getattr(slot, 4, &isnull));
 
 	ExecDropSingleTupleTableSlot(slot);
 	walrcv_clear_result(res);
@@ -774,11 +782,101 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 	natt = 0;
 	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+
+	/*
+	 * Now, fetch the values of publications' column filters.
+	 *
+	 * For a partition, use pg_inherit to find the parent, as the
+	 * pg_publication_rel contains only the topmost parent table entry in case
+	 * the table is partitioned.  Run a recursive query to iterate through all
+	 * the parents of the partition and retreive the record for the parent
+	 * that exists in pg_publication_rel.
+	 */
+	resetStringInfo(&cmd);
+	appendStringInfoString(&cmd,
+						   "SELECT CASE WHEN prattrs IS NOT NULL THEN\n"
+						   "           ARRAY(SELECT attname\n"
+						   "                   FROM pg_catalog.generate_series(0, pg_catalog.array_upper(prattrs::int[], 1)) s,\n"
+						   "                        pg_catalog.pg_attribute\n"
+						   "                  WHERE attrelid = prrelid AND attnum = prattrs[s])\n"
+						   "           ELSE NULL END AS columns\n"
+						   "FROM pg_catalog.pg_publication_rel\n");
+	if (!am_partition)
+		appendStringInfo(&cmd, "WHERE prrelid = %u", lrel->remoteid);
+	else
+		appendStringInfo(&cmd,
+						 "WHERE prrelid IN (SELECT relid \n"
+						 "FROM pg_catalog.pg_partition_tree(pg_catalog.pg_partition_root(%u)))",
+						 lrel->remoteid);
+
+	res_pub = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+						  lengthof(pubRow), pubRow);
+
+	if (res_pub->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not fetch published columns info for table \"%s.%s\" from publisher: %s",
+						nspname, relname, res_pub->err)));
+	slot_pub = MakeSingleTupleTableSlot(res_pub->tupledesc, &TTSOpsMinimalTuple);
+
+	while (tuplestore_gettupleslot(res_pub->tuplestore, true, false, slot_pub))
+	{
+		Datum		adatum;
+		Datum	   *elems;
+		bool	   *nulls;
+		int			nelems;
+
+		adatum = slot_getattr(slot_pub, 1, &isnull);
+		if (isnull)			/* shouldn't happen */
+			elog(ERROR, "unexpected null value in publication column filter");
+		deconstruct_array(DatumGetArrayTypeP(adatum),
+						  TEXTOID, -1, false, TYPALIGN_INT,
+						  &elems, &nulls, &nelems);
+		for (int i = 0; i < nelems; i++)
+		{
+			if (nulls[i])	/* shouldn't happen */
+				elog(ERROR, "unexpected null value in publication column filter");
+			pub_columns = lappend(pub_columns, TextDatumGetCString(elems[i]));
+		}
+		ExecClearTuple(slot_pub);
+	}
+	ExecDropSingleTupleTableSlot(slot_pub);
+	walrcv_clear_result(res_pub);
+
+	/*
+	 * Store the column names only if they are contained in column filter
+	 * LogicalRepRelation will only contain attributes corresponding to those
+	 * specficied in column filters.
+	 */
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		lrel->attnames[natt] =
-			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		char	   *rel_colname;
+		bool		found = false;
+
+		rel_colname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
 		Assert(!isnull);
+		if (pub_columns != NIL)
+		{
+			foreach(lc, pub_columns)
+			{
+				char	   *pub_colname = lfirst(lc);
+
+				if (!strcmp(pub_colname, rel_colname))
+				{
+					found = true;
+					lrel->attnames[natt] = rel_colname;
+					break;
+				}
+			}
+		}
+		else
+		{
+			found = true;
+			lrel->attnames[natt] = rel_colname;
+		}
+		if (!found)
+			continue;
+
 		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
 		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
@@ -829,8 +927,17 @@ copy_table(Relation rel)
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
 	if (lrel.relkind == RELKIND_RELATION)
-		appendStringInfo(&cmd, "COPY %s TO STDOUT",
+	{
+		appendStringInfo(&cmd, "COPY %s (",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		for (int i = 0; i < lrel.natts; i++)
+		{
+			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			if (i < lrel.natts - 1)
+				appendStringInfoString(&cmd, ", ");
+		}
+		appendStringInfo(&cmd, ") TO STDOUT");
+	}
 	else
 	{
 		/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6f6a203dea..f9f9ecd0c0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,16 +15,19 @@
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel_d.h"
 #include "commands/defrem.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
@@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
-									LogicalDecodingContext *ctx);
+									LogicalDecodingContext *ctx,
+									Bitmapset *att_map);
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
@@ -130,6 +134,7 @@ typedef struct RelationSyncEntry
 	 * having identical TupleDesc.
 	 */
 	TupleConversionMap *map;
+	Bitmapset  *att_map;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -570,11 +575,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 		}
 
 		MemoryContextSwitchTo(oldctx);
-		send_relation_and_attrs(ancestor, xid, ctx);
+		send_relation_and_attrs(ancestor, xid, ctx, relentry->att_map);
 		RelationClose(ancestor);
 	}
 
-	send_relation_and_attrs(relation, xid, ctx);
+	send_relation_and_attrs(relation, xid, ctx, relentry->att_map);
 
 	if (in_streaming)
 		set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -587,7 +592,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
  */
 static void
 send_relation_and_attrs(Relation relation, TransactionId xid,
-						LogicalDecodingContext *ctx)
+						LogicalDecodingContext *ctx,
+						Bitmapset *att_map)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	int			i;
@@ -610,13 +616,25 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 		if (att->atttypid < FirstGenbkiObjectId)
 			continue;
 
+		/*
+		 * Do not send type information if attribute is not present in column
+		 * filter. XXX Allow sending type information for REPLICA IDENTITY
+		 * COLUMNS with user created type. even when they are not mentioned in
+		 * column filters.
+		 *
+		 * FIXME -- this code seems not verified by tests.
+		 */
+		if (att_map != NULL &&
+			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+						   att_map))
+			continue;
 		OutputPluginPrepareWrite(ctx, false);
 		logicalrep_write_typ(ctx->out, xid, att->atttypid);
 		OutputPluginWrite(ctx, false);
 	}
 
 	OutputPluginPrepareWrite(ctx, false);
-	logicalrep_write_rel(ctx->out, xid, relation);
+	logicalrep_write_rel(ctx->out, xid, relation, att_map);
 	OutputPluginWrite(ctx, false);
 }
 
@@ -693,7 +711,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_insert(ctx->out, xid, relation, tuple,
-										data->binary);
+										data->binary, relentry->att_map);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -722,7 +740,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_update(ctx->out, xid, relation, oldtuple,
-										newtuple, data->binary);
+										newtuple, data->binary, relentry->att_map);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -1122,6 +1140,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	bool		am_partition = get_rel_relispartition(relid);
 	char		relkind = get_rel_relkind(relid);
 	bool		found;
+	Oid			ancestor_id;
 	MemoryContext oldctx;
 
 	Assert(RelationSyncCache != NULL);
@@ -1142,6 +1161,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->publish_as_relid = InvalidOid;
+		entry->att_map = NULL;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
 	}
@@ -1182,6 +1202,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		{
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
+			bool		ancestor_published = false;
 
 			if (pub->alltables)
 			{
@@ -1192,8 +1213,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 
 			if (!publish)
 			{
-				bool		ancestor_published = false;
-
 				/*
 				 * For a partition, check if any of the ancestors are
 				 * published.  If so, note down the topmost ancestor that is
@@ -1219,6 +1238,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 											pub->oid))
 						{
 							ancestor_published = true;
+							ancestor_id = ancestor;
 							if (pub->pubviaroot)
 								publish_as_relid = ancestor;
 						}
@@ -1239,15 +1259,47 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			if (publish &&
 				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
 			{
+				Oid			relid;
+				HeapTuple	pub_rel_tuple;
+
+				relid = ancestor_published ? ancestor_id : publish_as_relid;
+				pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP,
+												ObjectIdGetDatum(relid),
+												ObjectIdGetDatum(pub->oid));
+
+				if (HeapTupleIsValid(pub_rel_tuple))
+				{
+					Datum		pub_rel_cols;
+					bool		isnull;
+
+					pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP,
+												   pub_rel_tuple,
+												   Anum_pg_publication_rel_prattrs,
+												   &isnull);
+					if (!isnull)
+					{
+						ArrayType  *arr;
+						int			nelems;
+						int16	   *elems;
+
+						arr = DatumGetArrayTypeP(pub_rel_cols);
+						nelems = ARR_DIMS(arr)[0];
+						elems = (int16 *) ARR_DATA_PTR(arr);
+
+						/* XXX is there a danger of memory leak here? beware */
+						oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+						for (int i = 0; i < nelems; i++)
+							entry->att_map = bms_add_member(entry->att_map,
+															elems[i] - FirstLowInvalidHeapAttributeNumber);
+						MemoryContextSwitchTo(oldctx);
+					}
+					ReleaseSysCache(pub_rel_tuple);
+				}
 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
-
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
 		}
 
 		list_free(pubids);
@@ -1343,6 +1395,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		entry->schema_sent = false;
 		list_free(entry->streamed_txns);
 		entry->streamed_txns = NIL;
+		bms_free(entry->att_map);
+		entry->att_map = NULL;
 		if (entry->map)
 		{
 			/*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 10a86f9810..0c438481dc 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4265,6 +4265,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	int			i_oid;
 	int			i_prpubid;
 	int			i_prrelid;
+	int			i_prattrs;
 	int			i,
 				j,
 				ntups;
@@ -4276,8 +4277,13 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 
 	/* Collect all publication membership info. */
 	appendPQExpBufferStr(query,
-						 "SELECT tableoid, oid, prpubid, prrelid "
-						 "FROM pg_catalog.pg_publication_rel");
+						 "SELECT tableoid, oid, prpubid, prrelid");
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBufferStr(query, ", prattrs");
+	else
+		appendPQExpBufferStr(query, ", NULL as prattrs");
+	appendPQExpBufferStr(query,
+						 " FROM pg_catalog.pg_publication_rel");
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
 	ntups = PQntuples(res);
@@ -4286,6 +4292,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	i_oid = PQfnumber(res, "oid");
 	i_prpubid = PQfnumber(res, "prpubid");
 	i_prrelid = PQfnumber(res, "prrelid");
+	i_prattrs = PQfnumber(res, "prattrs");
 
 	/* this allocation may be more than we need */
 	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@@ -4327,6 +4334,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 		pubrinfo[j].publication = pubinfo;
 		pubrinfo[j].pubtable = tbinfo;
 
+		if (!PQgetisnull(res, i, i_prattrs))
+		{
+			char	  **attnames;
+			int			nattnames;
+			PQExpBuffer attribs;
+
+			if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
+							  &attnames, &nattnames))
+				fatal("could not parse %s array", "prattrs");
+			attribs = createPQExpBuffer();
+			for (int k = 0; k < nattnames; k++)
+			{
+				if (k > 0)
+					appendPQExpBufferStr(attribs, ", ");
+
+				appendPQExpBufferStr(attribs, fmtId(attnames[k]));
+			}
+			pubrinfo[i].pubrattrs = attribs->data;
+		}
+		else
+			pubrinfo[j].pubrattrs = NULL;
+
 		/* Decide whether we want to dump it */
 		selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
 
@@ -4391,10 +4420,12 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
 
 	query = createPQExpBuffer();
 
-	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
+	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY ",
 					  fmtId(pubinfo->dobj.name));
-	appendPQExpBuffer(query, " %s;\n",
-					  fmtQualifiedDumpable(tbinfo));
+	appendPQExpBufferStr(query, fmtQualifiedDumpable(tbinfo));
+	if (pubrinfo->pubrattrs)
+		appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
+	appendPQExpBufferStr(query, ";\n");
 
 	/*
 	 * There is no point in creating a drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 6dccb4be4e..50a5b885f6 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -633,6 +633,7 @@ typedef struct _PublicationRelInfo
 	DumpableObject dobj;
 	PublicationInfo *publication;
 	TableInfo  *pubtable;
+	char	   *pubrattrs;
 } PublicationRelInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 72d8547628..46fa616406 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6302,7 +6302,7 @@ listPublications(const char *pattern)
  */
 static bool
 addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
-						   bool singlecol, printTableContent *cont)
+						   bool as_schema, printTableContent *cont)
 {
 	PGresult   *res;
 	int			count = 0;
@@ -6319,10 +6319,14 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
 
 	for (i = 0; i < count; i++)
 	{
-		if (!singlecol)
+		if (!as_schema)			/* as table */
+		{
 			printfPQExpBuffer(buf, "    \"%s.%s\"", PQgetvalue(res, i, 0),
 							  PQgetvalue(res, i, 1));
-		else
+			if (!PQgetisnull(res, i, 2))
+				appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 2));
+		}
+		else					/* as schema */
 			printfPQExpBuffer(buf, "    \"%s\"", PQgetvalue(res, i, 0));
 
 		printTableAddFooter(cont, buf->data);
@@ -6450,8 +6454,20 @@ describePublications(const char *pattern)
 		{
 			/* Get the tables for the specified publication */
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
-							  "FROM pg_catalog.pg_class c,\n"
+							  "SELECT n.nspname, c.relname, \n");
+			if (pset.sversion >= 150000)
+				appendPQExpBufferStr(&buf,
+									 "       CASE WHEN pr.prattrs IS NOT NULL THEN\n"
+									 "       pg_catalog.array_to_string"
+									 "(ARRAY(SELECT attname\n"
+									 "         FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::int[], 1)) s,\n"
+									 "              pg_catalog.pg_attribute\n"
+									 "        WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n"
+									 "       ELSE NULL END AS columns");
+			else
+				appendPQExpBufferStr(&buf, "NULL as columns");
+			appendPQExpBuffer(&buf,
+							  "\nFROM pg_catalog.pg_class c,\n"
 							  "     pg_catalog.pg_namespace n,\n"
 							  "     pg_catalog.pg_publication_rel pr\n"
 							  "WHERE c.relnamespace = n.oid\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 2f412ca3db..84ee807e0b 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1648,6 +1648,8 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER PUBLICATION <name> ADD */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "TABLE"))
+		COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL);
 	/* ALTER PUBLICATION <name> DROP */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h
index 3eca295ff4..76d421e09e 100644
--- a/src/include/catalog/dependency.h
+++ b/src/include/catalog/dependency.h
@@ -214,6 +214,9 @@ extern long changeDependenciesOf(Oid classId, Oid oldObjectId,
 extern long changeDependenciesOn(Oid refClassId, Oid oldRefObjectId,
 								 Oid newRefObjectId);
 
+extern void findAndAddAddresses(ObjectAddresses *addrs, Oid classId,
+					Oid refclassId, Oid refobjectId, int32 refobjsubId);
+
 extern Oid	getExtensionOfObject(Oid classId, Oid objectId);
 extern List *getAutoExtensionsOfObject(Oid classId, Oid objectId);
 
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 902f2f2f0d..f5ae2065e9 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -86,6 +86,7 @@ typedef struct Publication
 typedef struct PublicationRelInfo
 {
 	Relation	relation;
+	List	   *columns;
 } PublicationRelInfo;
 
 extern Publication *GetPublication(Oid pubid);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..7ad285faae 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
 	Oid			prrelid BKI_LOOKUP(pg_class);	/* Oid of the relation */
+#ifdef CATALOG_VARLEN
+	int2vector	prattrs;		/* Variable length field starts here */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index 4ba68c70ee..23f037df7f 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -25,7 +25,7 @@
 extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt);
 extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt);
 extern void RemovePublicationById(Oid pubid);
-extern void RemovePublicationRelById(Oid proid);
+extern void RemovePublicationRelById(Oid proid, int32 attnum);
 extern void RemovePublicationSchemaById(Oid psoid);
 
 extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 4c5a8a39bf..02b547d044 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3642,6 +3642,7 @@ typedef struct PublicationTable
 {
 	NodeTag		type;
 	RangeVar   *relation;		/* relation to be published */
+	List	   *columns;		/* List of columns in a publication table */
 } PublicationTable;
 
 /*
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 83741dcf42..709b4be916 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple newtuple,
-									bool binary);
+									bool binary, Bitmapset *att_map);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple oldtuple,
-									HeapTuple newtuple, bool binary);
+									HeapTuple newtuple, bool binary, Bitmapset *att_map);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
@@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in,
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
-								 Relation rel);
+								 Relation rel, Bitmapset *att_map);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 5ac2d666a2..84afe0ebef 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -165,7 +165,35 @@ Publications:
  regress_publication_user | t          | t       | t       | f       | f         | f
 (1 row)
 
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (x, y, z);  -- error
+ERROR:  column "x" of relation "testpub_tbl5" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
+ERROR:  column "x" of relation "testpub_tbl5" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
+ERROR:  invalid column list for publishing relation "testpub_tbl5"
+DETAIL:  All columns in REPLICA IDENTITY must be present in the column list.
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+\dRp+ testpub_fortable
+                                Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_tbl5" (a)
+Tables from schemas:
+    "pub_test"
+
+ALTER TABLE testpub_tbl5 DROP COLUMN a;
+ERROR:  cannot drop the last column in publication "testpub_fortable"
+HINT:  Remove table "testpub_tbl5" from the publication first.
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);  -- error
+ERROR:  invalid column list for publishing relation "testpub_tbl6"
+DETAIL:  Cannot have column filter on relations with REPLICA IDENTITY FULL.
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
 DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
 CREATE TABLE testpub_tbl3 (a int);
 CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3);
@@ -669,6 +697,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes
 Tables from schemas:
     "pub_test1"
 
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ERROR:  syntax error at or near "("
+LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+                                                                ^
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+ERROR:  column specification not allowed for schemas
+LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)...
+                                                             ^
 -- cleanup pub_test1 schema for invalidation tests
 ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 56dd358554..200158ba69 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -89,7 +89,20 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall
 \d+ testpub_tbl2
 \dRp+ testpub_foralltables
 
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (x, y, z);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+\dRp+ testpub_fortable
+ALTER TABLE testpub_tbl5 DROP COLUMN a;
+
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);  -- error
+
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
 DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
 
 CREATE TABLE testpub_tbl3 (a int);
@@ -362,6 +375,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem
 ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
 \dRp+ testpub1_forschema
 
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+
 -- cleanup pub_test1 schema for invalidation tests
 ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/subscription/t/021_column_filter.pl b/src/test/subscription/t/021_column_filter.pl
new file mode 100644
index 0000000000..354e6ac363
--- /dev/null
+++ b/src/test/subscription/t/021_column_filter.pl
@@ -0,0 +1,162 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test TRUNCATE
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 10;
+
+# setup
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int)");
+# Test with weird column names
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+#Test replication with multi-level partition
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+#Test create publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(a,b)");
+
+my $result = $node_publisher->safe_psql('postgres',
+	"select relname, prattrs from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;");
+is($result, qq(tab1|1 2
+tab3|1 3
+test_part|1 2), 'publication relation updated');
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+#Initial sync
+$node_publisher->wait_for_catchup('sub1');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab1 VALUES (1,2,3)");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab3 VALUES (1,2,3)");
+#Test for replication of partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')");
+#Test for replication of multi-level partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (4,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (5,'bcd', '2021-07-03 11:12:13')");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'insert on column c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab3");
+is($result, qq(1|3), 'insert on column b is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_part");
+is($result, qq(1|abc\n2|bcd\n4|abc\n5|bcd), 'insert on all columns is replicated');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab1 SET c = 5 where a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'update on column c is not replicated');
+
+#Test alter publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"
+);
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 VALUES (1,'abc',3)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2");
+is($result, qq(1|abc), 'insert on column c is not replicated');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab2 SET c = 5 where a = 1");
+is($result, qq(1|abc), 'update on column c is not replicated');
+
+# Test behavior when a column is dropped
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_part DROP COLUMN b");
+$result = $node_publisher->safe_psql('postgres',
+	"select prrelid::regclass, prattrs from pg_publication_rel pb;");
+is($result,
+	q(tab1|1 2
+tab3|1 3
+tab2|1 2
+test_part|1), 'column test_part.b removed');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (3, '2021-12-13 12:13:14')");
+$node_publisher->wait_for_catchup('sub1');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_part WHERE a = 3");
+is($result, "3|", 'only column a is replicated');
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY, b int, c int, d int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY, b int, d int)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab4 (a, b)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub3 FOR TABLE tab4 (a, d)");
+$node_subscriber->safe_psql('postgres',    "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2, pub3");
+$node_publisher->wait_for_catchup('sub2');
+$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (1, 11, 111, 1111)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (2, 22, 222, 2222)");
+$node_publisher->wait_for_catchup('sub2');
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab4;"),
+   qq(1|11|1111
+2|22|2222),
+   'overlapping publications with overlapping column lists');

Reply via email to