On Fri, Jan 22, 2021 at 10:01 AM vignesh C <vignes...@gmail.com> wrote:
>
> Thanks Rahila for your comments. Please find my thoughts below:
>
> On Wed, Jan 20, 2021 at 6:27 PM Rahila Syed <rahilasye...@gmail.com> wrote:
> >
> > Hi Vignesh,
> >
> >>
> >> I have handled the above scenario(drop schema should automatically
> >> remove the schema entry from publication schema relation) & addition
> >> of tests in the new v2 patch attached.
> >> Thoughts?
> >
> >
> > Please see some initial comments:
> >
> > 1. I think there should be more tests to show that the schema data is 
> > actually replicated
> > to the subscriber.  Currently, I am not seeing the data being replicated 
> > when I use FOR SCHEMA.
> >
> I will fix this issue and include more tests in my next version of the patch.

Modified to handle this and also added a few more tests.

> > 2. How does replication behave when a table is added or removed from a 
> > subscribed schema
> > using ALTER TABLE SET SCHEMA?
> >
> I would like to keep the behavior similar to the table behavior. I
> will post more details for this along with my next version of the
> patch.
>

If a table is moved to a different schema, its data will no longer be
sent to the subscriber after the schema change.
When a new table is added to a published schema, the publisher will
send the table's data, but the subscriber will not apply the changes.
For the changes to be applied, the subscription must be refreshed
using "ALTER SUBSCRIPTION mysub1 REFRESH PUBLICATION". The new
relation will show up among the subscriber's relations once the
subscription has been refreshed.
If a table is dropped, there is no impact on the subscriber. This
relation will be present in pg_subscription_rel after refreshing the
subscription.
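
To make the publisher-side half of this concrete: the patch decides what
to send by checking each relation's namespace against the publication's
schema list (the list_member_oid() check in
GetAllTablesPublicationRelations). Below is a rough standalone sketch of
that filter in plain C. It is not backend code; RelInfo,
schema_is_published and select_published_rels are hypothetical names
used only for illustration, and it models only the case where the
publication has at least one schema.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;		/* stand-in for the backend's Oid type */

/* Hypothetical, simplified view of a pg_class row. */
typedef struct RelInfo
{
	Oid			relid;			/* OID of the relation */
	Oid			relnamespace;	/* OID of the schema containing it */
} RelInfo;

/* Return true if schemaoid appears in the published-schema array. */
static bool
schema_is_published(const Oid *pubschemas, size_t nschemas, Oid schemaoid)
{
	for (size_t i = 0; i < nschemas; i++)
	{
		if (pubschemas[i] == schemaoid)
			return true;
	}
	return false;
}

/*
 * Copy the OIDs of relations whose schema is published into result,
 * which must have room for nrels entries.  Returns the number copied.
 */
static size_t
select_published_rels(const RelInfo *rels, size_t nrels,
					  const Oid *pubschemas, size_t nschemas,
					  Oid *result)
{
	size_t		nresult = 0;

	assert(result != NULL);

	for (size_t i = 0; i < nrels; i++)
	{
		/* Skip relations that live outside the published schemas. */
		if (!schema_is_published(pubschemas, nschemas, rels[i].relnamespace))
			continue;

		result[nresult++] = rels[i].relid;
	}
	return nresult;
}
```

In this model, moving a table out of a published schema just changes its
relnamespace, so the next call no longer returns it. That corresponds to
the first case above. The subscriber-side behavior (needing a refresh
before changes are applied) is not modeled here.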

> > 3. Can we have a default schema like a public or current schema that gets 
> > replicated in case the user didn't
> > specify one, this can be handy to replicate current schema tables.
> >
> It looks like a good use case, I will check on the feasibility of this
> and try to implement this.

This can be done; I will handle it later.

> > 4. +   The fourth, fifth and sixth variants change which schemas are part 
> > of the
> > +   publication.  The <literal>SET TABLE</literal> clause will replace the 
> > list
> > +   of schemas in the publication with the specified one.  The <literal>ADD
> >
> > There is a typo above s/SET TABLE/SET SCHEMA
> I will fix this in the next version of the patch.

Modified it.
I have separated the tests and documentation into a separate patch to
make review easier. Attached v3 patch with the fixes.
Thoughts?

Regards,
Vignesh
From 72255fcb52cfba2d284b8402b64c118160b35b9f Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@enterprisedb.com>
Date: Sun, 31 Jan 2021 22:47:38 +0530
Subject: [PATCH v3 1/2] Added schema level support for publication.

This patch adds schema-level support for publications.  The user can specify
multiple schemas with the FOR SCHEMA option.  When schemas are specified, the
publisher selects the tables present in those schemas for sending data to the
subscriber.
---
 src/backend/catalog/Makefile                |   4 +-
 src/backend/catalog/aclchk.c                |   2 +
 src/backend/catalog/dependency.c            |   9 +
 src/backend/catalog/objectaddress.c         | 138 +++++++++++++++
 src/backend/catalog/pg_publication.c        | 134 +++++++++++++-
 src/backend/commands/alter.c                |   1 +
 src/backend/commands/event_trigger.c        |   4 +
 src/backend/commands/publicationcmds.c      | 266 +++++++++++++++++++++++++++-
 src/backend/commands/seclabel.c             |   1 +
 src/backend/commands/tablecmds.c            |   1 +
 src/backend/parser/gram.y                   |  83 ++++++---
 src/backend/replication/pgoutput/pgoutput.c |  12 ++
 src/backend/utils/cache/syscache.c          |  23 +++
 src/bin/pg_dump/common.c                    |   3 +
 src/bin/pg_dump/pg_backup_archiver.c        |   3 +-
 src/bin/pg_dump/pg_dump.c                   | 155 +++++++++++++++-
 src/bin/pg_dump/pg_dump.h                   |  17 ++
 src/bin/pg_dump/pg_dump_sort.c              |   7 +
 src/bin/psql/describe.c                     | 110 +++++++++++-
 src/include/catalog/dependency.h            |   1 +
 src/include/catalog/pg_publication.h        |  16 +-
 src/include/catalog/pg_publication_schema.h |  49 +++++
 src/include/commands/publicationcmds.h      |   1 +
 src/include/nodes/parsenodes.h              |   3 +
 src/include/utils/syscache.h                |   2 +
 src/test/regress/expected/publication.out   | 100 +++++------
 src/test/regress/expected/sanity_check.out  |   1 +
 27 files changed, 1055 insertions(+), 91 deletions(-)
 create mode 100644 src/include/catalog/pg_publication_schema.h

diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index c85f0ca..dc8a9eb 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -67,8 +67,8 @@ CATALOG_HEADERS := \
 	pg_foreign_table.h pg_policy.h pg_replication_origin.h \
 	pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \
 	pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \
-	pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \
-	pg_subscription_rel.h
+	pg_sequence.h pg_publication.h pg_publication_rel.h pg_publication_schema.h \
+	pg_subscription.h pg_subscription_rel.h
 
 GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h
 
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index f3c1ca1..8322d50 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -3417,6 +3417,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype,
 					case OBJECT_DEFACL:
 					case OBJECT_DOMCONSTRAINT:
 					case OBJECT_PUBLICATION_REL:
+					case OBJECT_PUBLICATION_SCHEMA:
 					case OBJECT_ROLE:
 					case OBJECT_RULE:
 					case OBJECT_TABCONSTRAINT:
@@ -3556,6 +3557,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype,
 					case OBJECT_DEFACL:
 					case OBJECT_DOMCONSTRAINT:
 					case OBJECT_PUBLICATION_REL:
+					case OBJECT_PUBLICATION_SCHEMA:
 					case OBJECT_ROLE:
 					case OBJECT_TRANSFORM:
 					case OBJECT_TSPARSER:
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 2140151..94ed1f4 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -49,6 +49,7 @@
 #include "catalog/pg_proc.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
+#include "catalog/pg_publication_schema.h"
 #include "catalog/pg_rewrite.h"
 #include "catalog/pg_statistic_ext.h"
 #include "catalog/pg_subscription.h"
@@ -180,6 +181,7 @@ static const Oid object_classes[] = {
 	PolicyRelationId,			/* OCLASS_POLICY */
 	PublicationRelationId,		/* OCLASS_PUBLICATION */
 	PublicationRelRelationId,	/* OCLASS_PUBLICATION_REL */
+	PublicationSchemaRelationId, /* OCLASS_PUBLICATION_SCHEMA */
 	SubscriptionRelationId,		/* OCLASS_SUBSCRIPTION */
 	TransformRelationId			/* OCLASS_TRANSFORM */
 };
@@ -1549,6 +1551,10 @@ doDeletion(const ObjectAddress *object, int flags)
 			RemovePublicationRelById(object->objectId);
 			break;
 
+		case OCLASS_PUBLICATION_SCHEMA:
+			RemovePublicationSchemaById(object->objectId);
+			break;
+
 		case OCLASS_CAST:
 		case OCLASS_COLLATION:
 		case OCLASS_CONVERSION:
@@ -2982,6 +2988,9 @@ getObjectClass(const ObjectAddress *object)
 		case PublicationRelRelationId:
 			return OCLASS_PUBLICATION_REL;
 
+		case PublicationSchemaRelationId:
+			return OCLASS_PUBLICATION_SCHEMA;
+
 		case SubscriptionRelationId:
 			return OCLASS_SUBSCRIPTION;
 
diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c
index 6d88b69..02661c4 100644
--- a/src/backend/catalog/objectaddress.c
+++ b/src/backend/catalog/objectaddress.c
@@ -49,6 +49,7 @@
 #include "catalog/pg_proc.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
+#include "catalog/pg_publication_schema.h"
 #include "catalog/pg_rewrite.h"
 #include "catalog/pg_statistic_ext.h"
 #include "catalog/pg_subscription.h"
@@ -828,6 +829,10 @@ static const struct object_type_map
 	{
 		"publication relation", OBJECT_PUBLICATION_REL
 	},
+	/* OCLASS_PUBLICATION_SCHEMA */
+	{
+		"publication schema", OBJECT_PUBLICATION_SCHEMA
+	},
 	/* OCLASS_SUBSCRIPTION */
 	{
 		"subscription", OBJECT_SUBSCRIPTION
@@ -874,6 +879,9 @@ static ObjectAddress get_object_address_usermapping(List *object,
 static ObjectAddress get_object_address_publication_rel(List *object,
 														Relation *relp,
 														bool missing_ok);
+static ObjectAddress get_object_address_publication_schema(List *object,
+														   bool missing_ok);
+
 static ObjectAddress get_object_address_defacl(List *object,
 											   bool missing_ok);
 static const ObjectPropertyType *get_object_property_data(Oid class_id);
@@ -1117,6 +1125,10 @@ get_object_address(ObjectType objtype, Node *object,
 															 &relation,
 															 missing_ok);
 				break;
+			case OBJECT_PUBLICATION_SCHEMA:
+				address = get_object_address_publication_schema(castNode(List, object),
+																missing_ok);
+				break;
 			case OBJECT_DEFACL:
 				address = get_object_address_defacl(castNode(List, object),
 													missing_ok);
@@ -1935,6 +1947,51 @@ get_object_address_publication_rel(List *object,
 }
 
 /*
+ * Find the ObjectAddress for a publication schema.  The first element of
+ * the object parameter is the schema name, the second is the
+ * publication name.
+ */
+static ObjectAddress
+get_object_address_publication_schema(List *object, bool missing_ok)
+{
+	ObjectAddress address;
+	char	   *pubname;
+	Publication *pub;
+	char 	   *schemaname;
+	Oid schemaoid;
+
+	ObjectAddressSet(address, PublicationSchemaRelationId, InvalidOid);
+
+	/* fetch publication name and schema oid from input list */
+	schemaname = strVal(linitial(object));
+	pubname = strVal(lsecond(object));
+
+	schemaoid = get_namespace_oid(schemaname, missing_ok);
+
+	/* Now look up the pg_publication tuple */
+	pub = GetPublicationByName(pubname, missing_ok);
+	if (!pub)
+		return address;
+
+	/* Find the publication schema mapping in syscache. */
+	address.objectId =
+		GetSysCacheOid2(PUBLICATIONSCHEMAMAP, Anum_pg_publication_schema_oid,
+						ObjectIdGetDatum(schemaoid),
+						ObjectIdGetDatum(pub->oid));
+	if (!OidIsValid(address.objectId))
+	{
+		if (!missing_ok)
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("publication schema \"%s\" in publication \"%s\" does not exist",
+							schemaname, pubname)));
+		return address;
+	}
+
+	return address;
+}
+
+/*
  * Find the ObjectAddress for a default ACL.
  */
 static ObjectAddress
@@ -2206,6 +2263,7 @@ pg_get_object_address(PG_FUNCTION_ARGS)
 		case OBJECT_CAST:
 		case OBJECT_USER_MAPPING:
 		case OBJECT_PUBLICATION_REL:
+		case OBJECT_PUBLICATION_SCHEMA:
 		case OBJECT_DEFACL:
 		case OBJECT_TRANSFORM:
 			if (list_length(args) != 1)
@@ -2298,6 +2356,9 @@ pg_get_object_address(PG_FUNCTION_ARGS)
 		case OBJECT_PUBLICATION_REL:
 			objnode = (Node *) list_make2(name, linitial(args));
 			break;
+		case OBJECT_PUBLICATION_SCHEMA:
+			objnode = (Node *) list_make2(linitial(name), linitial(args));
+			break;
 		case OBJECT_USER_MAPPING:
 			objnode = (Node *) list_make2(linitial(name), linitial(args));
 			break;
@@ -3897,6 +3958,40 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok)
 				break;
 			}
 
+		case OCLASS_PUBLICATION_SCHEMA:
+			{
+				HeapTuple	tup;
+				char	   *pubname;
+				Form_pg_publication_schema prform;
+				char	   *nspname;
+
+				tup = SearchSysCache1(PUBLICATIONSCHEMA,
+									  ObjectIdGetDatum(object->objectId));
+				if (!HeapTupleIsValid(tup))
+				{
+					if (!missing_ok)
+						elog(ERROR, "cache lookup failed for publication schema %u",
+							 object->objectId);
+					break;
+				}
+
+				prform = (Form_pg_publication_schema) GETSTRUCT(tup);
+				pubname = get_publication_name(prform->prpubid, false);
+				nspname = get_namespace_name(prform->prnspcid);
+				if (!nspname)
+				{
+					if (!missing_ok)
+						elog(ERROR, "cache lookup failed for schema %u",
+							 prform->prnspcid);
+					break;
+				}
+
+				appendStringInfo(&buffer, _("publication of schema %s in publication %s"),
+								 nspname, pubname);
+				ReleaseSysCache(tup);
+				break;
+			}
+
 		case OCLASS_SUBSCRIPTION:
 			{
 				char	   *subname = get_subscription_name(object->objectId,
@@ -4470,6 +4565,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok)
 			appendStringInfoString(&buffer, "publication relation");
 			break;
 
+		case OCLASS_PUBLICATION_SCHEMA:
+			appendStringInfoString(&buffer, "publication schema");
+			break;
+
 		case OCLASS_SUBSCRIPTION:
 			appendStringInfoString(&buffer, "subscription");
 			break;
@@ -5705,6 +5804,45 @@ getObjectIdentityParts(const ObjectAddress *object,
 				break;
 			}
 
+		case OCLASS_PUBLICATION_SCHEMA:
+			{
+				HeapTuple	tup;
+				char	   *pubname;
+				char	   *nspname;
+				Form_pg_publication_schema prform;
+
+				tup = SearchSysCache1(PUBLICATIONSCHEMA,
+									  ObjectIdGetDatum(object->objectId));
+				if (!HeapTupleIsValid(tup))
+				{
+					if (!missing_ok)
+						elog(ERROR, "cache lookup failed for publication schema %u",
+							 object->objectId);
+					break;
+				}
+
+				prform = (Form_pg_publication_schema) GETSTRUCT(tup);
+				pubname = get_publication_name(prform->prpubid, false);
+				nspname = get_namespace_name(prform->prnspcid);
+				if (!nspname)
+				{
+					if (!missing_ok)
+						elog(ERROR, "cache lookup failed for schema %u",
+							 prform->prnspcid);
+					break;
+				}
+
+				appendStringInfo(&buffer, "%s in publication %s", nspname, pubname);
+
+				if (objargs)
+					*objargs = list_make1(pubname);
+				if (objname)
+					*objname = list_make1(nspname);
+
+				ReleaseSysCache(tup);
+				break;
+			}
+
 		case OCLASS_SUBSCRIPTION:
 			{
 				char	   *subname;
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 5f8e1c6..4d46935 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -28,8 +28,10 @@
 #include "catalog/objectaccess.h"
 #include "catalog/objectaddress.h"
 #include "catalog/pg_inherits.h"
+#include "catalog/pg_namespace.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
+#include "catalog/pg_publication_schema.h"
 #include "catalog/pg_type.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -214,6 +216,76 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	return myself;
 }
 
+/*
+ * Insert new publication / schema mapping.
+ */
+ObjectAddress
+publication_add_schema(Oid pubid, Oid schemaoid, bool if_not_exists)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Datum		values[Natts_pg_publication_schema];
+	bool		nulls[Natts_pg_publication_schema];
+	Oid			prrelid;
+	Publication *pub = GetPublication(pubid);
+	ObjectAddress myself,
+				referenced;
+
+	rel = table_open(PublicationSchemaRelationId, RowExclusiveLock);
+
+	/*
+	 * Check for duplicates. Note that this does not really prevent
+	 * duplicates, it's here just to provide nicer error message in common
+	 * case. The real protection is the unique key on the catalog.
+	 */
+	if (SearchSysCacheExists2(PUBLICATIONSCHEMAMAP, ObjectIdGetDatum(schemaoid),
+							  ObjectIdGetDatum(pubid)))
+	{
+		table_close(rel, RowExclusiveLock);
+
+		if (if_not_exists)
+			return InvalidObjectAddress;
+
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("schema \"%s\" is already member of publication \"%s\"",
+						get_namespace_name(schemaoid), pub->name)));
+	}
+
+	/* Form a tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+
+	prrelid = GetNewOidWithIndex(rel, PublicationSchemaObjectIndexId,
+								 Anum_pg_publication_schema_oid);
+	values[Anum_pg_publication_schema_oid - 1] = ObjectIdGetDatum(prrelid);
+	values[Anum_pg_publication_schema_prpubid - 1] =
+		ObjectIdGetDatum(pubid);
+	values[Anum_pg_publication_schema_prnspcid - 1] =
+		ObjectIdGetDatum(schemaoid);
+
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+
+	/* Insert tuple into catalog. */
+	CatalogTupleInsert(rel, tup);
+	heap_freetuple(tup);
+
+	ObjectAddressSet(myself, PublicationSchemaRelationId, prrelid);
+
+	/* Add dependency on the publication */
+	ObjectAddressSet(referenced, PublicationRelationId, pubid);
+	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+
+	/* Add dependency on the schema */
+	ObjectAddressSet(referenced, NamespaceRelationId, schemaoid);
+	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+
+	/* Close the table. */
+	table_close(rel, RowExclusiveLock);
+
+	return myself;
+}
+
 /* Gets list of publication oids for a relation */
 List *
 GetRelationPublications(Oid relid)
@@ -305,6 +377,47 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 }
 
 /*
+ * Gets list of schema oids for a publication.
+ *
+ * This should only be used for normal publications.
+ */
+List *
+GetPublicationSchemas(Oid pubid)
+{
+	List	   *result;
+	Relation	pubrelsrel;
+	ScanKeyData scankey;
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	/* Find all schemas associated with the publication. */
+	pubrelsrel = table_open(PublicationSchemaRelationId, AccessShareLock);
+
+	ScanKeyInit(&scankey,
+				Anum_pg_publication_schema_prpubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(pubid));
+
+	scan = systable_beginscan(pubrelsrel, PublicationSchemaPrnspcidPrpubidIndexId,
+							  true, NULL, 1, &scankey);
+
+	result = NIL;
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_publication_schema pubrel;
+
+		pubrel = (Form_pg_publication_schema) GETSTRUCT(tup);
+
+		result = lappend_oid(result, pubrel->prnspcid);
+	}
+
+	systable_endscan(scan);
+	table_close(pubrelsrel, AccessShareLock);
+
+	return result;
+}
+
+/*
  * Gets list of publication oids for publications marked as FOR ALL TABLES.
  */
 List *
@@ -349,13 +462,16 @@ GetAllTablesPublications(void)
  * root partitioned tables.
  */
 List *
-GetAllTablesPublicationRelations(bool pubviaroot)
+GetAllTablesPublicationRelations(Publication *publication)
 {
 	Relation	classRel;
 	ScanKeyData key[1];
 	TableScanDesc scan;
 	HeapTuple	tuple;
 	List	   *result = NIL;
+	bool pubviaroot = publication->pubviaroot;
+	List	   *pubschemas = GetPublicationSchemas(publication->oid);
+
 
 	classRel = table_open(RelationRelationId, AccessShareLock);
 
@@ -371,6 +487,16 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 		Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
 		Oid			relid = relForm->oid;
 
+		/*
+		 * If schema is specified by the user, check if the relation is present
+		 * in one of the schema specified.
+		 */
+		if (pubschemas)
+		{
+			if (!list_member_oid(pubschemas, relForm->relnamespace))
+				continue;
+		}
+
 		if (is_publishable_class(relid, relForm) &&
 			!(relForm->relispartition && pubviaroot))
 			result = lappend_oid(result, relid);
@@ -431,6 +557,8 @@ GetPublication(Oid pubid)
 	pub->pubactions.pubdelete = pubform->pubdelete;
 	pub->pubactions.pubtruncate = pubform->pubtruncate;
 	pub->pubviaroot = pubform->pubviaroot;
+	pub->pubtables = pubform->pubtables;
+	pub->pubschemas = pubform->pubschemas;
 
 	ReleaseSysCache(tup);
 
@@ -530,8 +658,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		 * replicated using leaf partition identity and schema, so we only
 		 * need those.
 		 */
-		if (publication->alltables)
-			tables = GetAllTablesPublicationRelations(publication->pubviaroot);
+		if (publication->alltables || publication->pubschemas)
+			tables = GetAllTablesPublicationRelations(publication);
 		else
 			tables = GetPublicationRelations(publication->oid,
 											 publication->pubviaroot ?
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 2924949..e7c2745 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -661,6 +661,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid,
 		case OCLASS_POLICY:
 		case OCLASS_PUBLICATION:
 		case OCLASS_PUBLICATION_REL:
+		case OCLASS_PUBLICATION_SCHEMA:
 		case OCLASS_SUBSCRIPTION:
 		case OCLASS_TRANSFORM:
 			/* ignore object types that don't have schema-qualified names */
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 5bde507..4673e8e 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -974,6 +974,7 @@ EventTriggerSupportsObjectType(ObjectType obtype)
 		case OBJECT_PROCEDURE:
 		case OBJECT_PUBLICATION:
 		case OBJECT_PUBLICATION_REL:
+		case OBJECT_PUBLICATION_SCHEMA:
 		case OBJECT_ROUTINE:
 		case OBJECT_RULE:
 		case OBJECT_SCHEMA:
@@ -1051,6 +1052,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass)
 		case OCLASS_POLICY:
 		case OCLASS_PUBLICATION:
 		case OCLASS_PUBLICATION_REL:
+		case OCLASS_PUBLICATION_SCHEMA:
 		case OCLASS_SUBSCRIPTION:
 		case OCLASS_TRANSFORM:
 			return true;
@@ -2120,6 +2122,7 @@ stringify_grant_objtype(ObjectType objtype)
 		case OBJECT_POLICY:
 		case OBJECT_PUBLICATION:
 		case OBJECT_PUBLICATION_REL:
+		case OBJECT_PUBLICATION_SCHEMA:
 		case OBJECT_ROLE:
 		case OBJECT_RULE:
 		case OBJECT_STATISTIC_EXT:
@@ -2202,6 +2205,7 @@ stringify_adefprivs_objtype(ObjectType objtype)
 		case OBJECT_POLICY:
 		case OBJECT_PUBLICATION:
 		case OBJECT_PUBLICATION_REL:
+		case OBJECT_PUBLICATION_SCHEMA:
 		case OBJECT_ROLE:
 		case OBJECT_RULE:
 		case OBJECT_STATISTIC_EXT:
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 95c253c..6bb6a74 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -25,7 +25,9 @@
 #include "catalog/objectaddress.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
+#include "catalog/pg_namespace.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_schema.h"
 #include "catalog/pg_publication_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
@@ -53,6 +55,9 @@ static void CloseTableList(List *rels);
 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 								 AlterPublicationStmt *stmt);
 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
+static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
+								  AlterPublicationStmt *stmt);
+static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
 
 static void
 parse_publication_options(List *options,
@@ -212,6 +217,10 @@ CreatePublication(CreatePublicationStmt *stmt)
 		BoolGetDatum(pubactions.pubtruncate);
 	values[Anum_pg_publication_pubviaroot - 1] =
 		BoolGetDatum(publish_via_partition_root);
+	values[Anum_pg_publication_pubtables - 1] =
+		stmt->tables ? BoolGetDatum(true) : BoolGetDatum(false);
+	values[Anum_pg_publication_pubschemas - 1] =
+		stmt->schemas ? BoolGetDatum(true) : BoolGetDatum(false);
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -226,6 +235,23 @@ CreatePublication(CreatePublicationStmt *stmt)
 	/* Make the changes visible. */
 	CommandCounterIncrement();
 
+	if (stmt->schemas)
+	{
+		ListCell   *cell;
+		List	   *schemaoidlist = NIL;
+		Relation	nspcrel;
+
+		nspcrel = table_open(NamespaceRelationId, ShareUpdateExclusiveLock);
+		foreach(cell, stmt->schemas)
+		{
+			char   *schema = strVal(lfirst(cell));
+			schemaoidlist = lappend_oid(schemaoidlist, get_namespace_oid(schema, false));
+		}
+
+		PublicationAddSchemas(puboid, schemaoidlist, true, NULL);
+		table_close(nspcrel, ShareUpdateExclusiveLock);
+	}
+
 	if (stmt->tables)
 	{
 		List	   *rels;
@@ -252,6 +278,30 @@ CreatePublication(CreatePublicationStmt *stmt)
 	return myself;
 }
 
+static void
+UpdatePublicationTupleValue(Relation rel, HeapTuple tup, int col, bool value)
+{
+	bool		nulls[Natts_pg_publication];
+	bool		replaces[Natts_pg_publication];
+	Datum		values[Natts_pg_publication];
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	values[col - 1] = BoolGetDatum(value);
+	replaces[col - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	CommandCounterIncrement();
+}
+
 /*
  * Change options of a publication.
  */
@@ -370,14 +420,34 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 						NameStr(pubform->pubname)),
 				 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
 
+	if (pubform->pubschemas)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("publication \"%s\" is defined as FOR SCHEMA",
+						NameStr(pubform->pubname)),
+				 errdetail("Tables cannot be added to or dropped from FOR SCHEMA publications.")));
+
 	Assert(list_length(stmt->tables) > 0);
 
 	rels = OpenTableList(stmt->tables);
 
 	if (stmt->tableAction == DEFELEM_ADD)
+	{
 		PublicationAddTables(pubid, rels, false, stmt);
+		if (!pubform->pubtables)
+			UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubtables,
+										true);
+	}
 	else if (stmt->tableAction == DEFELEM_DROP)
+	{
+		List	   *tables;
 		PublicationDropTables(pubid, rels, false);
+		tables = GetPublicationRelations(pubid, PUBLICATION_PART_ROOT);
+		if (!list_length(tables))
+			UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubtables,
+										false);
+
+	}
 	else						/* DEFELEM_SET */
 	{
 		List	   *oldrelids = GetPublicationRelations(pubid,
@@ -422,16 +492,116 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 		PublicationAddTables(pubid, rels, true, stmt);
 
 		CloseTableList(delrels);
+
+		/* Update pubtables col to true */
+		if (!pubform->pubtables)
+			UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubtables,
+										true);
 	}
 
 	CloseTableList(rels);
 }
 
 /*
+ * Alter the publication schemas.
+ *
+ * Add/Remove/Set the schemas to/from publication.
+ */
+static void
+AlterPublicationSchemas(AlterPublicationStmt *stmt, Relation rel,
+						HeapTuple tup, Form_pg_publication pubform)
+{
+	List	   *schemaoidlist = NIL;
+	ListCell   *cell;
+
+	/* Check that user is allowed to manipulate the publication schemas. */
+	if (pubform->puballtables)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
+						NameStr(pubform->pubname)),
+				 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
+
+	if (pubform->pubtables)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("publication \"%s\" is defined as FOR TABLE",
+						NameStr(pubform->pubname)),
+				 errdetail("Schemas cannot be added to or dropped from FOR TABLE publications.")));
+
+	/* Convert the text list into oid list. */
+	foreach(cell, stmt->schemas)
+	{
+		char   *schema = strVal(lfirst(cell));
+		schemaoidlist = lappend_oid(schemaoidlist, get_namespace_oid(schema, false));
+	}
+
+	if (stmt->tableAction == DEFELEM_ADD)
+	{
+		PublicationAddSchemas(pubform->oid, schemaoidlist, false, stmt);
+		if (!pubform->pubschemas)
+			UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubschemas,
+										true);
+	}
+	else if (stmt->tableAction == DEFELEM_DROP)
+	{
+		List *schemas;
+		PublicationDropSchemas(pubform->oid, schemaoidlist, false);
+		schemas = GetPublicationSchemas(pubform->oid);
+		if (!list_length(schemas))
+			UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubschemas,
+										false);
+	}
+	else
+	{
+		List	   *oldschemaids = GetPublicationSchemas(pubform->oid);
+		List	   *delschemas = NIL;
+		ListCell   *oldlc;
+
+		/* Identify which schemas should be dropped. */
+		foreach(oldlc, oldschemaids)
+		{
+			Oid			oldrelid = lfirst_oid(oldlc);
+			ListCell   *newlc;
+			bool		found = false;
+
+			foreach(newlc, schemaoidlist)
+			{
+				Oid	newschemaid = lfirst_oid(newlc);
+
+				if (newschemaid == oldrelid)
+				{
+					found = true;
+					break;
+				}
+			}
+
+			if (!found)
+				delschemas = lappend_oid(delschemas, oldrelid);
+		}
+
+		/* And drop them. */
+		PublicationDropSchemas(pubform->oid, delschemas, true);
+
+		/*
+		 * Don't bother calculating the difference for adding, we'll catch and
+		 * skip existing ones when doing catalog update.
+		 */
+		PublicationAddSchemas(pubform->oid, schemaoidlist, true, stmt);
+
+		if (!pubform->pubschemas)
+			UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubschemas,
+										true);
+	}
+
+	return;
+}
+
+/*
  * Alter the existing publication.
  *
- * This is dispatcher function for AlterPublicationOptions and
- * AlterPublicationTables.
+ * This is dispatcher function for AlterPublicationOptions,
+ * AlterPublicationSchemas and AlterPublicationTables.
  */
 void
 AlterPublication(AlterPublicationStmt *stmt)
@@ -460,6 +630,8 @@ AlterPublication(AlterPublicationStmt *stmt)
 
 	if (stmt->options)
 		AlterPublicationOptions(stmt, rel, tup);
+	else if (stmt->schemas)
+		AlterPublicationSchemas(stmt, rel, tup, pubform);
 	else
 		AlterPublicationTables(stmt, rel, tup);
 
@@ -499,6 +671,30 @@ RemovePublicationRelById(Oid proid)
 }
 
 /*
+ * Remove schema from publication by mapping OID.
+ */
+void
+RemovePublicationSchemaById(Oid proid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+
+	rel = table_open(PublicationSchemaRelationId, RowExclusiveLock);
+
+	tup = SearchSysCache1(PUBLICATIONSCHEMA, ObjectIdGetDatum(proid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for publication schema %u",
+			 proid);
+
+	CatalogTupleDelete(rel, &tup->t_self);
+
+	ReleaseSysCache(tup);
+
+	table_close(rel, RowExclusiveLock);
+}
+
+/*
  * Open relations specified by a RangeVar list.
  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
  * add them to a publication.
@@ -633,6 +829,39 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 }
 
 /*
+ * Add listed schemas to the publication.
+ */
+static void
+PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
+					 AlterPublicationStmt *stmt)
+{
+	ListCell   *lc;
+
+	Assert(!stmt || !stmt->for_all_tables);
+
+	foreach(lc, schemas)
+	{
+		Oid			schemaoid = lfirst_oid(lc);
+		ObjectAddress obj;
+
+		/* Must be owner of the schema or superuser. */
+		if (!pg_namespace_ownercheck(schemaoid, GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
+						   get_namespace_name(schemaoid));
+
+		obj = publication_add_schema(pubid, schemaoid, if_not_exists);
+		if (stmt)
+		{
+			EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+											 (Node *) stmt);
+
+			InvokeObjectPostCreateHook(PublicationSchemaRelationId,
+									   obj.objectId, 0);
+		}
+	}
+}
+
+/*
  * Remove listed tables from the publication.
  */
 static void
@@ -667,6 +896,39 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 }
 
 /*
+ * Remove listed schemas from the publication.
+ */
+static void
+PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
+{
+	ObjectAddress obj;
+	ListCell   *lc;
+	Oid			prid;
+
+	foreach(lc, schemas)
+	{
+		Oid			schemaoid = lfirst_oid(lc);
+
+		prid = GetSysCacheOid2(PUBLICATIONSCHEMAMAP, Anum_pg_publication_schema_oid,
+							   ObjectIdGetDatum(schemaoid),
+							   ObjectIdGetDatum(pubid));
+		if (!OidIsValid(prid))
+		{
+			if (missing_ok)
+				continue;
+
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("schema \"%s\" is not part of the publication",
+							get_namespace_name(schemaoid))));
+		}
+
+		ObjectAddressSet(obj, PublicationSchemaRelationId, prid);
+		performDeletion(&obj, DROP_CASCADE, 0);
+	}
+}
+
+/*
  * Internal workhorse for changing a publication owner
  */
 static void
diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c
index 6906714..b108b64 100644
--- a/src/backend/commands/seclabel.c
+++ b/src/backend/commands/seclabel.c
@@ -80,6 +80,7 @@ SecLabelSupportsObjectType(ObjectType objtype)
 		case OBJECT_OPFAMILY:
 		case OBJECT_POLICY:
 		case OBJECT_PUBLICATION_REL:
+		case OBJECT_PUBLICATION_SCHEMA:
 		case OBJECT_RULE:
 		case OBJECT_STATISTIC_EXT:
 		case OBJECT_TABCONSTRAINT:
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 36747e7..f4593af 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -11709,6 +11709,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
 			case OCLASS_EVENT_TRIGGER:
 			case OCLASS_PUBLICATION:
 			case OCLASS_PUBLICATION_REL:
+			case OCLASS_PUBLICATION_SCHEMA:
 			case OCLASS_SUBSCRIPTION:
 			case OCLASS_TRANSFORM:
 
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7574d54..88081fd 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -421,7 +421,6 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <list>	group_by_list
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
-%type <node>	opt_publication_for_tables publication_for_tables
 
 %type <list>	opt_fdw_options fdw_options
 %type <defelt>	fdw_option
@@ -9466,46 +9465,49 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
 
 /*****************************************************************************
  *
- * CREATE PUBLICATION name [ FOR TABLE ] [ WITH options ]
+ * CREATE PUBLICATION name [WITH options]
  *
+ * CREATE PUBLICATION name FOR ALL TABLES [WITH options]
+ *
+ * CREATE PUBLICATION name FOR TABLE table [, table2] [WITH options]
+ *
+ * CREATE PUBLICATION name FOR SCHEMA schema [, schema2] [WITH options]
  *****************************************************************************/
 
 CreatePublicationStmt:
-			CREATE PUBLICATION name opt_publication_for_tables opt_definition
+			CREATE PUBLICATION name opt_definition
 				{
 					CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
 					n->pubname = $3;
-					n->options = $5;
-					if ($4 != NULL)
-					{
-						/* FOR TABLE */
-						if (IsA($4, List))
-							n->tables = (List *)$4;
-						/* FOR ALL TABLES */
-						else
-							n->for_all_tables = true;
-					}
+					n->options = $4;
 					$$ = (Node *)n;
 				}
-		;
-
-opt_publication_for_tables:
-			publication_for_tables					{ $$ = $1; }
-			| /* EMPTY */							{ $$ = NULL; }
-		;
-
-publication_for_tables:
-			FOR TABLE relation_expr_list
+			| CREATE PUBLICATION name FOR ALL TABLES opt_definition
 				{
-					$$ = (Node *) $3;
+					CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
+					n->pubname = $3;
+					n->options = $7;
+					n->for_all_tables = true;
+					$$ = (Node *)n;
 				}
-			| FOR ALL TABLES
+			| CREATE PUBLICATION name FOR TABLE relation_expr_list opt_definition
 				{
-					$$ = (Node *) makeInteger(true);
+					CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
+					n->pubname = $3;
+					n->options = $7;
+					n->tables = (List *)$6;
+					$$ = (Node *)n;
+				}
+			| CREATE PUBLICATION name FOR SCHEMA name_list opt_definition
+				{
+					CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
+					n->pubname = $3;
+					n->options = $7;
+					n->schemas = (List *)$6;
+					$$ = (Node *)n;
 				}
 		;
 
-
 /*****************************************************************************
  *
  * ALTER PUBLICATION name SET ( options )
@@ -9516,6 +9518,11 @@ publication_for_tables:
  *
  * ALTER PUBLICATION name SET TABLE table [, table2]
  *
+ * ALTER PUBLICATION name ADD SCHEMA schema [, schema2]
+ *
+ * ALTER PUBLICATION name DROP SCHEMA schema [, schema2]
+ *
+ * ALTER PUBLICATION name SET SCHEMA schema [, schema2]
  *****************************************************************************/
 
 AlterPublicationStmt:
@@ -9550,6 +9557,30 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_DROP;
 					$$ = (Node *)n;
 				}
+			| ALTER PUBLICATION name ADD_P SCHEMA name_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->schemas = $6;
+					n->tableAction = DEFELEM_ADD;
+					$$ = (Node *)n;
+				}
+			| ALTER PUBLICATION name SET SCHEMA name_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->schemas = $6;
+					n->tableAction = DEFELEM_SET;
+					$$ = (Node *)n;
+				}
+			| ALTER PUBLICATION name DROP SCHEMA name_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->schemas = $6;
+					n->tableAction = DEFELEM_DROP;
+					$$ = (Node *)n;
+				}
 		;
 
 /*****************************************************************************
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 79765f9..458a04b 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1005,6 +1005,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 					publish_as_relid = llast_oid(get_partition_ancestors(relid));
 			}
 
+			if (pub->pubschemas)
+			{
+				Oid schemaId = get_rel_namespace(relid);
+				List *pubschemas = GetPublicationSchemas(pub->oid);
+				if (list_member_oid(pubschemas, schemaId))
+				{
+					publish = true;
+					if (pub->pubviaroot && am_partition)
+						publish_as_relid = llast_oid(get_partition_ancestors(relid));
+				}
+			}
+
 			if (!publish)
 			{
 				bool		ancestor_published = false;
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index e4dc4ee..953ab6f 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -50,6 +50,7 @@
 #include "catalog/pg_partitioned_table.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_schema.h"
 #include "catalog/pg_publication_rel.h"
 #include "catalog/pg_range.h"
 #include "catalog/pg_replication_origin.h"
@@ -650,6 +651,28 @@ static const struct cachedesc cacheinfo[] = {
 		},
 		64
 	},
+	{PublicationSchemaRelationId,	/* PUBLICATIONSCHEMA */
+		PublicationSchemaObjectIndexId,
+		1,
+		{
+			Anum_pg_publication_schema_oid,
+			0,
+			0,
+			0
+		},
+		64
+	},
+	{PublicationSchemaRelationId,	/* PUBLICATIONSCHEMAMAP */
+		PublicationSchemaPrnspcidPrpubidIndexId,
+		2,
+		{
+			Anum_pg_publication_schema_prnspcid,
+			Anum_pg_publication_schema_prpubid,
+			0,
+			0
+		},
+		64
+	},
 	{RangeRelationId,			/* RANGEMULTIRANGE */
 		RangeMultirangeTypidIndexId,
 		1,
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index b0f02bc..17b132f 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -257,6 +257,9 @@ getSchemaData(Archive *fout, int *numTablesPtr)
 	pg_log_info("reading publication membership");
 	getPublicationTables(fout, tblinfo, numTables);
 
+	pg_log_info("reading publication schemas");
+	getPublicationSchemas(fout, nspinfo, numNamespaces);
+
 	pg_log_info("reading subscriptions");
 	getSubscriptions(fout);
 
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 1f82c64..e21acb8 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -2847,7 +2847,8 @@ _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
 	 */
 	if (ropt->no_publications &&
 		(strcmp(te->desc, "PUBLICATION") == 0 ||
-		 strcmp(te->desc, "PUBLICATION TABLE") == 0))
+		 strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
+		 strcmp(te->desc, "PUBLICATION SCHEMA") == 0))
 		return 0;
 
 	/* If it's a security label, maybe ignore it */
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 798d145..3924e92 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3882,6 +3882,8 @@ getPublications(Archive *fout, int *numPublications)
 	int			i_pubdelete;
 	int			i_pubtruncate;
 	int			i_pubviaroot;
+	int			i_pubtables;
+	int			i_pubschemas;
 	int			i,
 				ntups;
 
@@ -3896,7 +3898,14 @@ getPublications(Archive *fout, int *numPublications)
 	resetPQExpBuffer(query);
 
 	/* Get the publications. */
-	if (fout->remoteVersion >= 130000)
+	if (fout->remoteVersion >= 140000)
+		appendPQExpBuffer(query,
+						  "SELECT p.tableoid, p.oid, p.pubname, "
+						  "(%s p.pubowner) AS rolname, "
+						  "p.puballtables, p.pubschemas, p.pubtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
+						  "FROM pg_publication p",
+						  username_subquery);
+	else if (fout->remoteVersion >= 130000)
 		appendPQExpBuffer(query,
 						  "SELECT p.tableoid, p.oid, p.pubname, "
 						  "(%s p.pubowner) AS rolname, "
@@ -3932,6 +3941,8 @@ getPublications(Archive *fout, int *numPublications)
 	i_pubdelete = PQfnumber(res, "pubdelete");
 	i_pubtruncate = PQfnumber(res, "pubtruncate");
 	i_pubviaroot = PQfnumber(res, "pubviaroot");
+	i_pubtables = PQfnumber(res, "pubtables");
+	i_pubschemas = PQfnumber(res, "pubschemas");
 
 	pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
 
@@ -3956,6 +3967,10 @@ getPublications(Archive *fout, int *numPublications)
 			(strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
 		pubinfo[i].pubviaroot =
 			(strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
+		pubinfo[i].pubtables =
+			(strcmp(PQgetvalue(res, i, i_pubtables), "t") == 0);
+		pubinfo[i].pubschemas =
+			(strcmp(PQgetvalue(res, i, i_pubschemas), "t") == 0);
 
 		if (strlen(pubinfo[i].rolname) == 0)
 			pg_log_warning("owner of publication \"%s\" appears to be invalid",
@@ -4066,6 +4081,102 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo)
 }
 
 /*
+ * getPublicationSchemas
+ *	  get information about publication membership for dumpable schemas.
+ */
+void
+getPublicationSchemas(Archive *fout, NamespaceInfo nspinfo[], int numSchemas)
+{
+	PQExpBuffer query;
+	PGresult   *res;
+	PublicationSchemaInfo *pubrinfo;
+	DumpOptions *dopt = fout->dopt;
+	int			i_schemaoid;
+	int			i_oid;
+	int			i_pubname;
+	int			i_pubid;
+	int			i,
+				j,
+				ntups;
+
+	if (dopt->no_publications || fout->remoteVersion < 140000)
+		return;
+
+	query = createPQExpBuffer();
+
+	for (i = 0; i < numSchemas; i++)
+	{
+		NamespaceInfo  *nsinfo = &nspinfo[i];
+		PublicationInfo *pubinfo;
+
+		/*
+		 * Ignore publication membership of schemas whose definitions are not
+		 * to be dumped.
+		 */
+		if (!(nsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
+			continue;
+
+		pg_log_info("reading publication membership for schema \"%s\"",
+					nsinfo->dobj.name);
+
+		resetPQExpBuffer(query);
+
+		/* Get the publication membership for the schema. */
+		appendPQExpBuffer(query,
+						  "SELECT pr.prnspcid, pr.oid, p.pubname, p.oid AS pubid "
+						  "FROM pg_publication_schema pr, pg_publication p "
+						  "WHERE pr.prnspcid = '%u'"
+						  "  AND p.oid = pr.prpubid",
+						  nsinfo->dobj.catId.oid);
+		res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+		ntups = PQntuples(res);
+
+		if (ntups == 0)
+		{
+			/*
+			 * Schema is not in any publication. Clean up and continue.
+			 */
+			PQclear(res);
+			continue;
+		}
+
+		i_schemaoid = PQfnumber(res, "prnspcid");
+		i_oid = PQfnumber(res, "oid");
+		i_pubname = PQfnumber(res, "pubname");
+		i_pubid = PQfnumber(res, "pubid");
+
+
+		pubrinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo));
+
+		for (j = 0; j < ntups; j++)
+		{
+			Oid			prpubid = atooid(PQgetvalue(res, j, i_pubid));
+
+			pubinfo = findPublicationByOid(prpubid);
+			if (pubinfo == NULL)
+				continue;
+
+			pubrinfo[j].dobj.objType = DO_PUBLICATION_SCHEMA;
+			pubrinfo[j].dobj.catId.tableoid =
+				atooid(PQgetvalue(res, j, i_schemaoid));
+			pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
+			AssignDumpId(&pubrinfo[j].dobj);
+			pubrinfo[j].dobj.namespace = nsinfo->dobj.namespace;
+			pubrinfo[j].dobj.name = nsinfo->dobj.name;
+			pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname));
+			pubrinfo[j].pubschema = nsinfo;
+			pubrinfo[j].publication = pubinfo;
+
+			/* Decide whether we want to dump it */
+			selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
+		}
+		PQclear(res);
+	}
+	destroyPQExpBuffer(query);
+}
+
+/*
  * getPublicationTables
  *	  get information about publication membership for dumpable tables.
  */
@@ -4153,6 +4264,44 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 }
 
 /*
+ * dumpPublicationSchema
+ *	  dump the definition of the given publication schema mapping
+ */
+static void
+dumpPublicationSchema(Archive *fout, PublicationSchemaInfo *pubrinfo)
+{
+	NamespaceInfo  *schemainfo = pubrinfo->pubschema;
+	PublicationInfo *pubinfo = pubrinfo->publication;
+	PQExpBuffer query;
+	char	   *tag;
+
+	if (!(pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
+		return;
+
+	tag = psprintf("%s %s", pubrinfo->pubname, schemainfo->dobj.name);
+
+	query = createPQExpBuffer();
+
+	appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubrinfo->pubname));
+	appendPQExpBuffer(query, "ADD SCHEMA %s;\n", fmtId(schemainfo->dobj.name));
+
+	/*
+	 * There is no point in creating a drop query, as the mapping is dropped
+	 * along with the schema.
+	 */
+	ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
+				 ARCHIVE_OPTS(.tag = tag,
+							  .namespace = schemainfo->dobj.name,
+							  .owner = pubinfo->rolname,
+							  .description = "PUBLICATION SCHEMA",
+							  .section = SECTION_POST_DATA,
+							  .createStmt = query->data));
+
+	free(tag);
+	destroyPQExpBuffer(query);
+}
+
+/*
  * dumpPublicationTable
  *	  dump the definition of the given publication table mapping
  */
@@ -10293,6 +10442,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
 		case DO_PUBLICATION_REL:
 			dumpPublicationTable(fout, (PublicationRelInfo *) dobj);
 			break;
+		case DO_PUBLICATION_SCHEMA:
+			dumpPublicationSchema(fout, (PublicationSchemaInfo *) dobj);
+			break;
 		case DO_SUBSCRIPTION:
 			dumpSubscription(fout, (SubscriptionInfo *) dobj);
 			break;
@@ -18429,6 +18581,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
 			case DO_POLICY:
 			case DO_PUBLICATION:
 			case DO_PUBLICATION_REL:
+			case DO_PUBLICATION_SCHEMA:
 			case DO_SUBSCRIPTION:
 				/* Post-data objects: must come after the post-data boundary */
 				addObjectDependency(dobj, postDataBound->dumpId);
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 1290f96..f3c05ad 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -81,6 +81,7 @@ typedef enum
 	DO_POLICY,
 	DO_PUBLICATION,
 	DO_PUBLICATION_REL,
+	DO_PUBLICATION_SCHEMA,
 	DO_SUBSCRIPTION
 } DumpableObjectType;
 
@@ -614,6 +615,8 @@ typedef struct _PublicationInfo
 	bool		pubdelete;
 	bool		pubtruncate;
 	bool		pubviaroot;
+	bool		pubtables;
+	bool		pubschemas;
 } PublicationInfo;
 
 /*
@@ -628,6 +631,18 @@ typedef struct _PublicationRelInfo
 } PublicationRelInfo;
 
 /*
+ * The PublicationSchemaInfo struct is used to represent publication schema
+ * mapping.
+ */
+typedef struct PublicationSchemaInfo
+{
+	DumpableObject 	dobj;
+	NamespaceInfo  *pubschema;
+	char		   *pubname;
+	PublicationInfo *publication;
+} PublicationSchemaInfo;
+
+/*
  * The SubscriptionInfo struct is used to represent subscription.
  */
 typedef struct _SubscriptionInfo
@@ -732,6 +747,8 @@ extern PublicationInfo *getPublications(Archive *fout,
 										int *numPublications);
 extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
 								 int numTables);
+extern void getPublicationSchemas(Archive *fout, NamespaceInfo nspinfo[],
+								 int numSchemas);
 extern void getSubscriptions(Archive *fout);
 
 #endif							/* PG_DUMP_H */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 46461fb..13a6fcd 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -82,6 +82,7 @@ enum dbObjectTypePriorities
 	PRIO_POLICY,
 	PRIO_PUBLICATION,
 	PRIO_PUBLICATION_REL,
+	PRIO_PUBLICATION_SCHEMA,
 	PRIO_SUBSCRIPTION,
 	PRIO_DEFAULT_ACL,			/* done in ACL pass */
 	PRIO_EVENT_TRIGGER,			/* must be next to last! */
@@ -135,6 +136,7 @@ static const int dbObjectTypePriority[] =
 	PRIO_POLICY,				/* DO_POLICY */
 	PRIO_PUBLICATION,			/* DO_PUBLICATION */
 	PRIO_PUBLICATION_REL,		/* DO_PUBLICATION_REL */
+	PRIO_PUBLICATION_SCHEMA,	/* DO_PUBLICATION_SCHEMA */
 	PRIO_SUBSCRIPTION			/* DO_SUBSCRIPTION */
 };
 
@@ -1477,6 +1479,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 					 "PUBLICATION TABLE (ID %d OID %u)",
 					 obj->dumpId, obj->catId.oid);
 			return;
+		case DO_PUBLICATION_SCHEMA:
+			snprintf(buf, bufsize,
+					 "PUBLICATION SCHEMA (ID %d OID %u)",
+					 obj->dumpId, obj->catId.oid);
+			return;
 		case DO_SUBSCRIPTION:
 			snprintf(buf, bufsize,
 					 "SUBSCRIPTION (ID %d OID %u)",
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 20af5a9..f9a1283 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -2884,6 +2884,39 @@ describeOneTableDetails(const char *schemaname,
 				printTableAddFooter(&cont, buf.data);
 			}
 			PQclear(result);
+
+			if (pset.sversion >= 140000)
+			{
+				int pub_schema_tuples;
+				printfPQExpBuffer(&buf,
+								  "SELECT pubname\n"
+								  "FROM pg_catalog.pg_publication p\n"
+								  "WHERE p.oid in \n"
+								  "(SELECT s.prpubid FROM \n"
+								  "pg_catalog.pg_class c, \n"
+								  "pg_catalog.pg_publication_schema s \n"
+								  "where c.oid = '%s' AND \n"
+								  "c.relnamespace = s.prnspcid)",
+								  oid);
+				result = PSQLexec(buf.data);
+				if (!result)
+					goto error_return;
+				else
+					pub_schema_tuples = PQntuples(result);
+
+				if (!tuples && pub_schema_tuples > 0)
+					printTableAddFooter(&cont, _("Publications:"));
+
+				/* Might be an empty set - that's ok */
+				for (i = 0; i < pub_schema_tuples; i++)
+				{
+					printfPQExpBuffer(&buf, "    \"%s\"",
+									PQgetvalue(result, i, 0));
+
+					printTableAddFooter(&cont, buf.data);
+				}
+				PQclear(result);
+			}
 		}
 	}
 
@@ -5829,7 +5862,7 @@ listPublications(const char *pattern)
 	PQExpBufferData buf;
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
-	static const bool translate_columns[] = {false, false, false, false, false, false, false, false};
+	static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -5864,7 +5897,12 @@ listPublications(const char *pattern)
 		appendPQExpBuffer(&buf,
 						  ",\n  pubviaroot AS \"%s\"",
 						  gettext_noop("Via root"));
-
+	if (pset.sversion >= 140000)
+		appendPQExpBuffer(&buf,
+						  ",\n  pubtables AS \"%s\",\n"
+						  "     pubschemas AS  \"%s\"",
+						  gettext_noop("Tables"),
+						  gettext_noop("Schemas"));
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
 
@@ -5906,6 +5944,8 @@ describePublications(const char *pattern)
 	PGresult   *res;
 	bool		has_pubtruncate;
 	bool		has_pubviaroot;
+	bool		has_pubtables;
+	bool		has_pubschemas;
 
 	if (pset.sversion < 100000)
 	{
@@ -5919,6 +5959,8 @@ describePublications(const char *pattern)
 
 	has_pubtruncate = (pset.sversion >= 110000);
 	has_pubviaroot = (pset.sversion >= 130000);
+	has_pubtables = (pset.sversion >= 140000);
+	has_pubschemas = (pset.sversion >= 140000);
 
 	initPQExpBuffer(&buf);
 
@@ -5932,6 +5974,13 @@ describePublications(const char *pattern)
 	if (has_pubviaroot)
 		appendPQExpBufferStr(&buf,
 							 ", pubviaroot");
+	if (has_pubtables)
+		appendPQExpBufferStr(&buf,
+							 ", pubtables");
+	if (has_pubschemas)
+		appendPQExpBufferStr(&buf,
+							 ", pubschemas");
+
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
 
@@ -5974,6 +6023,8 @@ describePublications(const char *pattern)
 		char	   *pubid = PQgetvalue(res, i, 0);
 		char	   *pubname = PQgetvalue(res, i, 1);
 		bool		puballtables = strcmp(PQgetvalue(res, i, 3), "t") == 0;
+		bool		pubtables;
+		bool		pubschemas;
 		int			j;
 		PQExpBufferData title;
 		printTableOpt myopt = pset.popt.topt;
@@ -5983,6 +6034,16 @@ describePublications(const char *pattern)
 			ncols++;
 		if (has_pubviaroot)
 			ncols++;
+		if (has_pubtables)
+		{
+			pubtables = strcmp(PQgetvalue(res, i, 9), "t") == 0;
+			ncols++;
+		}
+		if (has_pubschemas)
+		{
+			pubschemas = strcmp(PQgetvalue(res, i, 10), "t") == 0;
+			ncols++;
+		}
 
 		initPQExpBuffer(&title);
 		printfPQExpBuffer(&title, _("Publication %s"), pubname);
@@ -5997,6 +6058,10 @@ describePublications(const char *pattern)
 			printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
 		if (has_pubviaroot)
 			printTableAddHeader(&cont, gettext_noop("Via root"), true, align);
+		if (has_pubtables)
+			printTableAddHeader(&cont, gettext_noop("Pubtables"), true, align);
+		if (has_pubschemas)
+			printTableAddHeader(&cont, gettext_noop("Pubschemas"), true, align);
 
 		printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
 		printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
@@ -6007,8 +6072,13 @@ describePublications(const char *pattern)
 			printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
 		if (has_pubviaroot)
 			printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false);
+		if (has_pubtables)
+			printTableAddCell(&cont, PQgetvalue(res, i, 9), false, false);
+		if (has_pubschemas)
+			printTableAddCell(&cont, PQgetvalue(res, i, 10), false, false);
 
-		if (!puballtables)
+		/* Prior to version 14, the check was based on puballtables */
+		if ((has_pubtables && pubtables) || (!has_pubtables && !puballtables))
 		{
 			printfPQExpBuffer(&buf,
 							  "SELECT n.nspname, c.relname\n"
@@ -6045,6 +6115,40 @@ describePublications(const char *pattern)
 			}
 			PQclear(tabres);
 		}
+		else if (has_pubschemas && pubschemas)
+		{
+			printfPQExpBuffer(&buf,
+							  "SELECT n.nspname\n"
+							  "FROM pg_catalog.pg_namespace n,\n"
+							  "     pg_catalog.pg_publication_schema ps\n"
+							  "WHERE n.oid = ps.prnspcid\n"
+							  "  AND ps.prpubid = '%s'\n"
+							  "ORDER BY 1", pubid);
+
+			tabres = PSQLexec(buf.data);
+			if (!tabres)
+			{
+				printTableCleanup(&cont);
+				PQclear(res);
+				termPQExpBuffer(&buf);
+				termPQExpBuffer(&title);
+				return false;
+			}
+			else
+				tables = PQntuples(tabres);
+
+			if (tables > 0)
+				printTableAddFooter(&cont, _("Schemas:"));
+
+			for (j = 0; j < tables; j++)
+			{
+				printfPQExpBuffer(&buf, "    \"%s\"",
+								  PQgetvalue(tabres, j, 0));
+
+				printTableAddFooter(&cont, buf.data);
+			}
+			PQclear(tabres);
+		}
 
 		printTable(&cont, pset.queryFout, false, pset.logfile);
 		printTableCleanup(&cont);
diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h
index f272e2c..11af0c8 100644
--- a/src/include/catalog/dependency.h
+++ b/src/include/catalog/dependency.h
@@ -131,6 +131,7 @@ typedef enum ObjectClass
 	OCLASS_POLICY,				/* pg_policy */
 	OCLASS_PUBLICATION,			/* pg_publication */
 	OCLASS_PUBLICATION_REL,		/* pg_publication_rel */
+	OCLASS_PUBLICATION_SCHEMA,	/* pg_publication_schema */
 	OCLASS_SUBSCRIPTION,		/* pg_subscription */
 	OCLASS_TRANSFORM			/* pg_transform */
 } ObjectClass;
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 0dd50fe..da3bd01 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -20,6 +20,7 @@
 #include "catalog/genbki.h"
 #include "catalog/objectaddress.h"
 #include "catalog/pg_publication_d.h"
+#include "utils/array.h"
 
 /* ----------------
  *		pg_publication definition.  cpp turns this into
@@ -54,6 +55,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 
 	/* true if partition changes are published using root schema */
 	bool		pubviaroot;
+
+	/* indicates publication is for specific tables */
+	bool		pubtables;
+
+	/* indicates publication is for specific schema tables */
+	bool		pubschemas;
 } FormData_pg_publication;
 
 /* ----------------
@@ -83,6 +90,8 @@ typedef struct Publication
 	bool		alltables;
 	bool		pubviaroot;
 	PublicationActions pubactions;
+	bool		pubtables;
+	bool		pubschemas;
 } Publication;
 
 extern Publication *GetPublication(Oid pubid);
@@ -106,15 +115,16 @@ typedef enum PublicationPartOpt
 } PublicationPartOpt;
 
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
+extern List *GetPublicationSchemas(Oid pubid);
 extern List *GetAllTablesPublications(void);
-extern List *GetAllTablesPublicationRelations(bool pubviaroot);
+extern List *GetAllTablesPublicationRelations(Publication *publication);
 
 extern bool is_publishable_relation(Relation rel);
 extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
 											  bool if_not_exists);
-
+extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaoid,
+											bool if_not_exists);
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
 extern char *get_publication_name(Oid pubid, bool missing_ok);
 
-
 #endif							/* PG_PUBLICATION_H */
diff --git a/src/include/catalog/pg_publication_schema.h b/src/include/catalog/pg_publication_schema.h
new file mode 100644
index 0000000..73d5815
--- /dev/null
+++ b/src/include/catalog/pg_publication_schema.h
@@ -0,0 +1,49 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_publication_schema.h
+ *	  definition of the system catalog for mappings between schemas and
+ *	  publications (pg_publication_schema)
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_publication_schema.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_PUBLICATION_SCHEMA_H
+#define PG_PUBLICATION_SCHEMA_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_publication_schema_d.h"
+
+
+/* ----------------
+ *		pg_publication_schema definition.  cpp turns this into
+ *		typedef struct FormData_pg_publication_schema
+ * ----------------
+ */
+CATALOG(pg_publication_schema,8901,PublicationSchemaRelationId)
+{
+	Oid			oid;			/* oid */
+	Oid			prpubid;		/* Oid of the publication */
+	Oid			prnspcid;		/* Oid of the schema */
+} FormData_pg_publication_schema;
+
+/* ----------------
+ *		Form_pg_publication_schema corresponds to a pointer to a tuple with
+ *		the format of pg_publication_schema relation.
+ * ----------------
+ */
+typedef FormData_pg_publication_schema *Form_pg_publication_schema;
+
+DECLARE_UNIQUE_INDEX(pg_publication_schema_oid_index, 8902, on pg_publication_schema using btree(oid oid_ops));
+#define PublicationSchemaObjectIndexId 8902
+DECLARE_UNIQUE_INDEX(pg_publication_schema_prnspcid_prpubid_index, 8903, on pg_publication_schema using btree(prnspcid oid_ops, prpubid oid_ops));
+#define PublicationSchemaPrnspcidPrpubidIndexId 8903
+
+#endif							/* PG_PUBLICATION_SCHEMA_H */
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index 00e2e62..581fedb 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -21,6 +21,7 @@
 extern ObjectAddress CreatePublication(CreatePublicationStmt *stmt);
 extern void AlterPublication(AlterPublicationStmt *stmt);
 extern void RemovePublicationRelById(Oid proid);
+extern void RemovePublicationSchemaById(Oid proid);
 
 extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
 extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index dc2bb40..8f9639e 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1747,6 +1747,7 @@ typedef enum ObjectType
 	OBJECT_PROCEDURE,
 	OBJECT_PUBLICATION,
 	OBJECT_PUBLICATION_REL,
+	OBJECT_PUBLICATION_SCHEMA,
 	OBJECT_ROLE,
 	OBJECT_ROUTINE,
 	OBJECT_RULE,
@@ -3523,6 +3524,7 @@ typedef struct CreatePublicationStmt
 	List	   *options;		/* List of DefElem nodes */
 	List	   *tables;			/* Optional list of tables to add */
 	bool		for_all_tables; /* Special publication for all tables in db */
+	List	   *schemas;		/* Optional list of schemas */
 } CreatePublicationStmt;
 
 typedef struct AlterPublicationStmt
@@ -3537,6 +3539,7 @@ typedef struct AlterPublicationStmt
 	List	   *tables;			/* List of tables to add/drop */
 	bool		for_all_tables; /* Special publication for all tables in db */
 	DefElemAction tableAction;	/* What action to perform with the tables */
+	List	   *schemas;		/* Optional list of schemas */
 } AlterPublicationStmt;
 
 typedef struct CreateSubscriptionStmt
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index d74a348..1ba2952 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -79,6 +79,8 @@ enum SysCacheIdentifier
 	PUBLICATIONOID,
 	PUBLICATIONREL,
 	PUBLICATIONRELMAP,
+	PUBLICATIONSCHEMA,
+	PUBLICATIONSCHEMAMAP,
 	RANGEMULTIRANGE,
 	RANGETYPE,
 	RELNAMENSP,
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 63d6ab7..4052873 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -28,20 +28,20 @@ ERROR:  unrecognized "publish" value: "cluster"
 CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0');
 ERROR:  conflicting or redundant options
 \dRp
-                                              List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
- testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f
+                                                        List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Tables | Schemas 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------+---------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f        | f      | f
+ testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f        | f      | f
 (2 rows)
 
 ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
 \dRp
-                                              List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
- testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f
+                                                        List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Tables | Schemas 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------+---------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f        | f      | f
+ testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f        | f      | f
 (2 rows)
 
 --- adding tables
@@ -85,10 +85,10 @@ Publications:
     "testpub_foralltables"
 
 \dRp+ testpub_foralltables
-                              Publication testpub_foralltables
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | t          | t       | t       | f       | f         | f
+                                          Publication testpub_foralltables
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | t          | t       | t       | f       | f         | f        | f         | f
 (1 row)
 
 DROP TABLE testpub_tbl2;
@@ -100,19 +100,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
 CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
 RESET client_min_messages;
 \dRp+ testpub3
-                                    Publication testpub3
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                Publication testpub3
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | t         | f
 Tables:
     "public.testpub_tbl3"
     "public.testpub_tbl3a"
 
 \dRp+ testpub4
-                                    Publication testpub4
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                Publication testpub4
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | t         | f
 Tables:
     "public.testpub_tbl3"
 
@@ -131,10 +131,10 @@ ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
 -- only parent is listed as being in publication, not the partition
 ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                            Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | t         | f
 Tables:
     "public.testpub_parted"
 
@@ -147,10 +147,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 UPDATE testpub_parted1 SET a = 1;
 ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | t
+                                            Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | t        | t         | f
 Tables:
     "public.testpub_parted"
 
@@ -170,10 +170,10 @@ ERROR:  relation "testpub_tbl1" is already member of publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
-                                 Publication testpub_fortbl
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                             Publication testpub_fortbl
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | t         | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -211,10 +211,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                             Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | t         | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -258,10 +258,10 @@ DROP TABLE testpub_parted;
 DROP VIEW testpub_view;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                             Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | f         | f
 (1 row)
 
 -- fail - must be owner of publication
@@ -271,20 +271,20 @@ ERROR:  must be owner of publication testpub_default
 RESET ROLE;
 ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
 \dRp testpub_foo
-                                           List of publications
-    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
--------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f
+                                                    List of publications
+    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Tables | Schemas 
+-------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------+---------
+ testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f        | f      | f
 (1 row)
 
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
 ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
 \dRp testpub_default
-                                             List of publications
-      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
------------------+---------------------------+------------+---------+---------+---------+-----------+----------
- testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f
+                                                       List of publications
+      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Tables | Schemas 
+-----------------+---------------------------+------------+---------+---------+---------+-----------+----------+--------+---------
+ testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f        | f      | f
 (1 row)
 
 DROP PUBLICATION testpub_default;
diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out
index d9ce961..fe5a038 100644
--- a/src/test/regress/expected/sanity_check.out
+++ b/src/test/regress/expected/sanity_check.out
@@ -140,6 +140,7 @@ pg_policy|t
 pg_proc|t
 pg_publication|t
 pg_publication_rel|t
+pg_publication_schema|t
 pg_range|t
 pg_replication_origin|t
 pg_rewrite|t
-- 
1.8.3.1

From e0bb91a040625100c2d2b3af4eba5c991eed6487 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@enterprisedb.com>
Date: Sun, 31 Jan 2021 22:53:23 +0530
Subject: [PATCH v3 2/2] Tests and documentation for schema level support for
 publication.

Tests and documentation for schema level support for publication.
---
 doc/src/sgml/ref/alter_publication.sgml      |  45 +++++-
 doc/src/sgml/ref/create_publication.sgml     |  31 ++++-
 src/test/regress/expected/object_address.out |   6 +-
 src/test/regress/expected/publication.out    | 198 ++++++++++++++++++++++++++-
 src/test/regress/sql/object_address.sql      |   3 +
 src/test/regress/sql/publication.sql         |  87 +++++++++++-
 src/test/subscription/t/001_rep_changes.pl   | 131 +++++++++++++++++-
 7 files changed, 494 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index faa114b..884cab4 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -24,6 +24,9 @@ PostgreSQL documentation
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@@ -50,7 +53,18 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
   </para>
 
   <para>
-   The fourth variant of this command listed in the synopsis can change
+   The fourth, fifth and sixth variants change which schemas are part of the
+   publication.  The <literal>SET SCHEMA</literal> clause will replace the list
+   of schemas in the publication with the specified one.  The <literal>ADD
+   SCHEMA</literal> and <literal>DROP SCHEMA</literal> clauses will add and
+   remove one or more schemas from the publication.  Note that adding schemas
+   to a publication that is already subscribed to will require an <literal>ALTER
+   SUBSCRIPTION ... REFRESH PUBLICATION</literal> action on the subscribing side
+   in order to become effective.
+  </para>
+
+  <para>
+   The seventh variant of this command listed in the synopsis can change
    all of the publication properties specified in
    <xref linkend="sql-createpublication"/>.  Properties not mentioned in the
    command retain their previous settings.
@@ -98,6 +112,15 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
    </varlistentry>
 
    <varlistentry>
+    <term><replaceable class="parameter">schema_name</replaceable></term>
+    <listitem>
+     <para>
+      Name of an existing schema.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><literal>SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
     <listitem>
      <para>
@@ -142,6 +165,26 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete');
 <programlisting>
 ALTER PUBLICATION mypublication ADD TABLE users, departments;
 </programlisting></para>
+
+  <para>
+   Add some schemas to the publication:
+<programlisting>
+ALTER PUBLICATION sales_publication ADD SCHEMA marketing_june, sales_june;
+</programlisting>
+  </para>
+
+  <para>
+   Drop a schema from the publication:
+<programlisting>
+ALTER PUBLICATION production_quarterly_publication DROP SCHEMA production_july;
+</programlisting>
+  </para>
+
+  <para>
+   Replace the list of schemas in the publication with the specified schema:
+<programlisting>
+ALTER PUBLICATION production_publication SET SCHEMA production_july;
+</programlisting></para>
  </refsect1>
 
  <refsect1>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index ff82fbc..09c079d 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -22,8 +22,9 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
-    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
-      | FOR ALL TABLES ]
+    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+      | FOR SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ... ]
+      | FOR ALL TABLES ]
     [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 </synopsis>
  </refsynopsisdiv>
@@ -100,6 +101,16 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
    </varlistentry>
 
    <varlistentry>
+    <term><literal>FOR SCHEMA</literal></term>
+    <listitem>
+     <para>
+      Marks the publication as one that replicates changes for all tables in
+      the specified list of schemas, including tables created in the future.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><literal>WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
     <listitem>
      <para>
@@ -222,6 +233,22 @@ CREATE PUBLICATION alltables FOR ALL TABLES;
 <programlisting>
 CREATE PUBLICATION insert_only FOR TABLE mydata
     WITH (publish = 'insert');
+</programlisting>
+  </para>
+
+  <para>
+   Create a publication that publishes all changes for all the tables present in
+   the production schema:
+<programlisting>
+CREATE PUBLICATION production_publication FOR SCHEMA production;
+</programlisting>
+  </para>
+
+  <para>
+   Create a publication that publishes all changes for all the tables present in
+   the marketing and sales schemas:
+<programlisting>
+CREATE PUBLICATION sales_publication FOR SCHEMA marketing, sales;
 </programlisting></para>
  </refsect1>
 
diff --git a/src/test/regress/expected/object_address.out b/src/test/regress/expected/object_address.out
index 388097a..49ea22f 100644
--- a/src/test/regress/expected/object_address.out
+++ b/src/test/regress/expected/object_address.out
@@ -45,6 +45,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL (
 -- suppress warning that depends on wal_level
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable;
+CREATE PUBLICATION addr_pub_schema FOR SCHEMA addr_nsp;
 RESET client_min_messages;
 CREATE SUBSCRIPTION regress_addr_sub CONNECTION '' PUBLICATION bar WITH (connect = false, slot_name = NONE);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
@@ -428,6 +429,7 @@ WITH objects (type, name, args) AS (VALUES
 				('access method', '{btree}', '{}'),
 				('publication', '{addr_pub}', '{}'),
 				('publication relation', '{addr_nsp, gentable}', '{addr_pub}'),
+				('publication schema', '{addr_nsp}', '{addr_pub_schema}'),
 				('subscription', '{regress_addr_sub}', '{}'),
 				('statistics object', '{addr_nsp, gentable_stat}', '{}')
         )
@@ -490,7 +492,8 @@ SELECT (pg_identify_object(addr1.classid, addr1.objid, addr1.objsubid)).*,
  subscription              |            | regress_addr_sub  | regress_addr_sub                                                     | t
  publication               |            | addr_pub          | addr_pub                                                             | t
  publication relation      |            |                   | addr_nsp.gentable in publication addr_pub                            | t
-(49 rows)
+ publication schema        |            |                   | addr_nsp in publication addr_pub_schema                              | t
+(50 rows)
 
 ---
 --- Cleanup resources
@@ -502,6 +505,7 @@ drop cascades to foreign table genftable
 drop cascades to server integer
 drop cascades to user mapping for regress_addr_user on server integer
 DROP PUBLICATION addr_pub;
+DROP PUBLICATION addr_pub_schema;
 DROP SUBSCRIPTION regress_addr_sub;
 DROP SCHEMA addr_nsp CASCADE;
 NOTICE:  drop cascades to 14 other objects
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 4052873..3349de7 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -255,7 +255,6 @@ DROP PUBLICATION testpub2;
 SET ROLE regress_publication_user;
 REVOKE CREATE ON DATABASE regression FROM regress_publication_user2;
 DROP TABLE testpub_parted;
-DROP VIEW testpub_view;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
                                              Publication testpub_default
@@ -287,11 +286,208 @@ ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
  testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f        | f      | f
 (1 row)
 
+-- CREATE publication with schema
+CREATE SCHEMA pub_test1;
+CREATE SCHEMA pub_test2;
+CREATE SCHEMA pub_test3;
+CREATE TABLE pub_test1.tbl1 (id serial primary key, data text);
+CREATE TABLE pub_test2.tbl1 (id serial primary key, data text);
+-- suppress warning that depends on wal_level
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub1_forschema FOR SCHEMA pub_test1;
+RESET client_min_messages;
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+
+-- suppress warning that depends on wal_level
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub2_forschema FOR SCHEMA pub_test1, pub_test2, pub_test3;
+RESET client_min_messages;
+\dRp+ testpub2_forschema
+                                           Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+    "pub_test2"
+    "pub_test3"
+
+--- Check create publication on a schema that does not exist.
+CREATE PUBLICATION testpub_forschema FOR SCHEMA non_existent_schema;
+ERROR:  schema "non_existent_schema" does not exist
+--- Check create publication on a object which is not schema.
+CREATE PUBLICATION testpub1_forschema1 FOR SCHEMA testpub_view;
+ERROR:  schema "testpub_view" does not exist
+-- Dropping the schema should reflect the change in publication.
+DROP SCHEMA pub_test3;
+\dRp+ testpub2_forschema
+                                           Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+    "pub_test2"
+
+-- Renaming the schema should reflect the change in publication.
+ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed;
+\dRp+ testpub2_forschema
+                                           Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1_renamed"
+    "pub_test2"
+
+ALTER SCHEMA pub_test1_renamed RENAME to pub_test1;
+\dRp+ testpub2_forschema
+                                           Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+    "pub_test2"
+
+-- Alter publication add schema
+ALTER PUBLICATION testpub1_forschema ADD SCHEMA pub_test2;
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+    "pub_test2"
+
+-- Add non existent schema
+ALTER PUBLICATION testpub1_forschema ADD SCHEMA non_existent_schema;
+ERROR:  schema "non_existent_schema" does not exist
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+    "pub_test2"
+
+-- Add a schema which is already added to the publication
+ALTER PUBLICATION testpub1_forschema ADD SCHEMA pub_test1;
+ERROR:  schema "pub_test1" is already member of publication "testpub1_forschema"
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+    "pub_test2"
+
+-- Alter publication drop schema
+ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test2;
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+
+-- Drop schema that is not preset in the publication
+ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test2;
+ERROR:  schema "pub_test2" is not part of the publication
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+
+-- Drop a schema that does not exist in the system
+ALTER PUBLICATION testpub1_forschema DROP SCHEMA non_existent_schema;
+ERROR:  schema "non_existent_schema" does not exist
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+
+-- Drop all schemas
+ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test1;
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | f
+(1 row)
+
+-- Alter publication set schema
+ALTER PUBLICATION testpub1_forschema SET SCHEMA pub_test1;
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+
+-- Alter publication set multiple schema
+ALTER PUBLICATION testpub1_forschema SET SCHEMA pub_test1, pub_test2;
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+    "pub_test2"
+
+-- Alter publication set non-existent schema
+ALTER PUBLICATION testpub1_forschema SET SCHEMA non_existent_schema;
+ERROR:  schema "non_existent_schema" does not exist
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+    "pub_test2"
+
+-- Alter publication set it with the same schema
+ALTER PUBLICATION testpub1_forschema SET SCHEMA pub_test1, pub_test2;
+\dRp+ testpub1_forschema
+                                           Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f         | t
+Schemas:
+    "pub_test1"
+    "pub_test2"
+
+DROP VIEW testpub_view;
 DROP PUBLICATION testpub_default;
 DROP PUBLICATION testpib_ins_trunct;
 DROP PUBLICATION testpub_fortbl;
+DROP PUBLICATION testpub1_forschema;
+DROP PUBLICATION testpub2_forschema;
 DROP SCHEMA pub_test CASCADE;
 NOTICE:  drop cascades to table pub_test.testpub_nopk
+DROP SCHEMA pub_test1 CASCADE;
+NOTICE:  drop cascades to table pub_test1.tbl1
+DROP SCHEMA pub_test2 CASCADE;
+NOTICE:  drop cascades to table pub_test2.tbl1
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_publication_user, regress_publication_user2;
 DROP ROLE regress_publication_user_dummy;
diff --git a/src/test/regress/sql/object_address.sql b/src/test/regress/sql/object_address.sql
index 2f4f66e..56d9b85 100644
--- a/src/test/regress/sql/object_address.sql
+++ b/src/test/regress/sql/object_address.sql
@@ -48,6 +48,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL (
 -- suppress warning that depends on wal_level
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable;
+CREATE PUBLICATION addr_pub_schema FOR SCHEMA addr_nsp;
 RESET client_min_messages;
 CREATE SUBSCRIPTION regress_addr_sub CONNECTION '' PUBLICATION bar WITH (connect = false, slot_name = NONE);
 CREATE STATISTICS addr_nsp.gentable_stat ON a, b FROM addr_nsp.gentable;
@@ -198,6 +199,7 @@ WITH objects (type, name, args) AS (VALUES
 				('access method', '{btree}', '{}'),
 				('publication', '{addr_pub}', '{}'),
 				('publication relation', '{addr_nsp, gentable}', '{addr_pub}'),
+				('publication schema', '{addr_nsp}', '{addr_pub_schema}'),
 				('subscription', '{regress_addr_sub}', '{}'),
 				('statistics object', '{addr_nsp, gentable_stat}', '{}')
         )
@@ -215,6 +217,7 @@ SELECT (pg_identify_object(addr1.classid, addr1.objid, addr1.objsubid)).*,
 ---
 DROP FOREIGN DATA WRAPPER addr_fdw CASCADE;
 DROP PUBLICATION addr_pub;
+DROP PUBLICATION addr_pub_schema;
 DROP SUBSCRIPTION regress_addr_sub;
 
 DROP SCHEMA addr_nsp CASCADE;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index d844075..ae23f9f 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -148,7 +148,6 @@ SET ROLE regress_publication_user;
 REVOKE CREATE ON DATABASE regression FROM regress_publication_user2;
 
 DROP TABLE testpub_parted;
-DROP VIEW testpub_view;
 DROP TABLE testpub_tbl1;
 
 \dRp+ testpub_default
@@ -169,11 +168,97 @@ ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
 
 \dRp testpub_default
 
+-- CREATE publication with schema
+CREATE SCHEMA pub_test1;
+CREATE SCHEMA pub_test2;
+CREATE SCHEMA pub_test3;
+CREATE TABLE pub_test1.tbl1 (id serial primary key, data text);
+CREATE TABLE pub_test2.tbl1 (id serial primary key, data text);
+
+-- suppress warning that depends on wal_level
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub1_forschema FOR SCHEMA pub_test1;
+RESET client_min_messages;
+\dRp+ testpub1_forschema
+
+-- suppress warning that depends on wal_level
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub2_forschema FOR SCHEMA pub_test1, pub_test2, pub_test3;
+RESET client_min_messages;
+\dRp+ testpub2_forschema
+
+--- Check create publication on a schema that does not exist.
+CREATE PUBLICATION testpub_forschema FOR SCHEMA non_existent_schema;
+
+--- Check create publication on a object which is not schema.
+CREATE PUBLICATION testpub1_forschema1 FOR SCHEMA testpub_view;
+
+-- Dropping the schema should reflect the change in publication.
+DROP SCHEMA pub_test3;
+\dRp+ testpub2_forschema
+
+-- Renaming the schema should reflect the change in publication.
+ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed;
+\dRp+ testpub2_forschema
+
+ALTER SCHEMA pub_test1_renamed RENAME to pub_test1;
+\dRp+ testpub2_forschema
+
+-- Alter publication add schema
+ALTER PUBLICATION testpub1_forschema ADD SCHEMA pub_test2;
+\dRp+ testpub1_forschema
+
+-- Add non existent schema
+ALTER PUBLICATION testpub1_forschema ADD SCHEMA non_existent_schema;
+\dRp+ testpub1_forschema
+
+-- Add a schema which is already added to the publication
+ALTER PUBLICATION testpub1_forschema ADD SCHEMA pub_test1;
+\dRp+ testpub1_forschema
+
+-- Alter publication drop schema
+ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test2;
+\dRp+ testpub1_forschema
+
+-- Drop schema that is not preset in the publication
+ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test2;
+\dRp+ testpub1_forschema
+
+-- Drop a schema that does not exist in the system
+ALTER PUBLICATION testpub1_forschema DROP SCHEMA non_existent_schema;
+\dRp+ testpub1_forschema
+
+-- Drop all schemas
+ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test1;
+\dRp+ testpub1_forschema
+
+-- Alter publication set schema
+ALTER PUBLICATION testpub1_forschema SET SCHEMA pub_test1;
+\dRp+ testpub1_forschema
+
+-- Alter publication set multiple schema
+ALTER PUBLICATION testpub1_forschema SET SCHEMA pub_test1, pub_test2;
+\dRp+ testpub1_forschema
+
+-- Alter publication set non-existent schema
+ALTER PUBLICATION testpub1_forschema SET SCHEMA non_existent_schema;
+\dRp+ testpub1_forschema
+
+-- Alter publication set it with the same schema
+ALTER PUBLICATION testpub1_forschema SET SCHEMA pub_test1, pub_test2;
+\dRp+ testpub1_forschema
+
+DROP VIEW testpub_view;
+
 DROP PUBLICATION testpub_default;
 DROP PUBLICATION testpib_ins_trunct;
 DROP PUBLICATION testpub_fortbl;
+DROP PUBLICATION testpub1_forschema;
+DROP PUBLICATION testpub2_forschema;
 
 DROP SCHEMA pub_test CASCADE;
+DROP SCHEMA pub_test1 CASCADE;
+DROP SCHEMA pub_test2 CASCADE;
 
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_publication_user, regress_publication_user2;
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index c20fadc..c715340 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 27;
+use Test::More tests => 40;
 
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
@@ -246,6 +246,135 @@ $node_publisher->safe_psql('postgres', "DROP TABLE temp2");
 $node_subscriber->safe_psql('postgres', "DROP TABLE temp1");
 $node_subscriber->safe_psql('postgres', "DROP TABLE temp2");
 
+# Test replication with publications created using FOR SCHEMA option.
+# Create schemas and tables on publisher
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch3");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.temp1 AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.temp2 AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch2.temp1 AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch2.temp2 AS SELECT generate_series(1,10) AS a");
+
+# Create schemas and tables on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch3");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.temp1 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.temp2 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.temp1 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.temp2 (a int)");
+
+# Set up logical replication for schemas sch1 and sch2 that will only be used
+# for this test
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_schema FOR SCHEMA sch1,sch2");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema"
+	);
+
+$node_publisher->wait_for_catchup('tap_sub_schema');
+
+# Check the schema table data is synced up.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.temp1");
+is($result, qq(10|1|10), 'check rows on subscriber catchup');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.temp2");
+is($result, qq(10|1|10), 'check rows on subscriber catchup');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch2.temp1");
+is($result, qq(10|1|10), 'check rows on subscriber catchup');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch2.temp2");
+is($result, qq(10|1|10), 'check rows on subscriber catchup');
+
+# Insert some data into a few tables and verify that inserted data is replicated.
+$node_publisher->safe_psql('postgres', "INSERT INTO sch1.temp1 VALUES(generate_series(11,20))");
+$node_publisher->safe_psql('postgres', "INSERT INTO sch2.temp1 VALUES(generate_series(11,20))");
+
+$node_publisher->wait_for_catchup('tap_sub_schema');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.temp1");
+is($result, qq(20|1|20), 'check rows on subscriber catchup');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch2.temp1");
+is($result, qq(20|1|20), 'check rows on subscriber catchup');
+
+# Create a new table in the published schema and verify that the subscriber
+# does not receive the new table's data before the refresh.
+$node_publisher->safe_psql('postgres', "CREATE TABLE SCH1.temp3 AS SELECT generate_series(1,10) AS a");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE SCH1.temp3(a INT)");
+$node_publisher->wait_for_catchup('tap_sub_schema');
+$result = $node_subscriber->safe_psql('postgres',
+        "SELECT count(*) FROM sch1.temp3");
+is($result, qq(0), 'check new table is not synced before refresh');
+
+# Table data should be reflected after refreshing the publication on the
+# subscriber.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
+$node_publisher->safe_psql('postgres', "INSERT INTO SCH1.temp3 VALUES(11)");
+$node_publisher->wait_for_catchup('tap_sub_schema');
+$result = $node_subscriber->safe_psql('postgres',
+        "SELECT count(*), min(a), max(a) FROM sch1.temp3");
+is($result, qq(11|1|11), 'check rows on subscriber catchup');
+
+# Move a table from a published schema to a non-published schema and verify
+# that data inserted afterwards is not replicated to the subscriber.
+$node_publisher->safe_psql('postgres', "ALTER TABLE SCH1.temp3 SET SCHEMA SCH3");
+$node_publisher->safe_psql('postgres', "INSERT INTO SCH3.temp3 VALUES(11)");
+$node_publisher->wait_for_catchup('tap_sub_schema');
+$result = $node_subscriber->safe_psql('postgres',
+        "SELECT count(*), min(a), max(a) FROM sch1.temp3");
+is($result, qq(11|1|11), 'check rows on subscriber catchup');
+
+# Verify that the subscription relation list is updated after refresh.
+$result = $node_subscriber->safe_psql('postgres',
+        "SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')");
+is($result, qq(5),
+        'check subscription relation status is not yet dropped on subscriber');
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
+$result = $node_subscriber->safe_psql('postgres',
+        "SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')");
+is($result, qq(4),
+        'check subscription relation status was dropped on subscriber');
+
+
+# Drop table from the publication schema, verify that subscriber removes the
+# table entry after refresh.
+$node_publisher->safe_psql('postgres', "DROP TABLE SCH1.temp2");
+$node_publisher->wait_for_catchup('tap_sub_schema');
+$result = $node_subscriber->safe_psql('postgres',
+        "SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')");
+is($result, qq(4),
+        'check subscription relation status is not yet dropped on subscriber');
+
+# Table should be removed from pg_subscription_rel after refreshing the
+# publication in subscriber.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
+$result = $node_subscriber->safe_psql('postgres',
+        "SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')");
+is($result, qq(3),
+        'check subscription relation status was dropped on subscriber');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_schema");
+
+# Drop publication as we don't need it anymore
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_schema");
+
+# Clean up the schemas on both publisher and subscriber as we don't need them
+$node_publisher->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_publisher->safe_psql('postgres', "DROP SCHEMA sch3 cascade");
+$node_subscriber->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber->safe_psql('postgres', "DROP SCHEMA sch3 cascade");
+
 # add REPLICA IDENTITY FULL so we can update
 $node_publisher->safe_psql('postgres',
 	"ALTER TABLE tab_full REPLICA IDENTITY FULL");
-- 
1.8.3.1
