Hi Euler,

On 2019-02-03 13:14, Andres Freund wrote:
> On 2018-11-23 13:15:08 -0300, Euler Taveira wrote:
>> Besides the problem presented by Hironobu-san, I'm doing some cleanup
>> and improving docs. I also forget to declare pg_publication_rel TOAST
>> table.
>>
>> Thanks for your review.
>
> As far as I can tell, the patch has not been refreshed since. So I'm
> marking this as returned with feedback for now. Please resubmit once
> ready.


Do you have any plans to continue working on this patch and to submit it again for the upcoming September commitfest? There are only a few days left. Anyway, I will be glad to review the patch if you do submit it, though I haven't dug deeply into the code yet.

I've recently rebased the entire patch set (attached) and it works fine; your TAP test passes. I've also added a new test case (see 0009 attached) with a real-life example of bidirectional replication (BDR) utilising this new WHERE clause. This naive BDR is implemented using an is_cloud flag, which is set to TRUE/FALSE on the cloud/remote nodes respectively.
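
To make the scheme concrete, it boils down to roughly the following sketch (the table, publication and subscription names and the connection strings are illustrative here, not necessarily what 0009 uses):

    -- on the cloud node: locally created rows carry is_cloud = TRUE
    CREATE TABLE t (id int PRIMARY KEY, payload text, is_cloud boolean);
    CREATE PUBLICATION cloud_pub FOR TABLE t WHERE (is_cloud IS TRUE);

    -- on the remote node: locally created rows carry is_cloud = FALSE
    CREATE TABLE t (id int PRIMARY KEY, payload text, is_cloud boolean);
    CREATE PUBLICATION remote_pub FOR TABLE t WHERE (is_cloud IS FALSE);

    -- each node subscribes to the other node's publication
    -- (connection strings and copy_data handling omitted)
    CREATE SUBSCRIPTION cloud_sub CONNECTION '...' PUBLICATION remote_pub;   -- on cloud
    CREATE SUBSCRIPTION remote_sub CONNECTION '...' PUBLICATION cloud_pub;   -- on remote

Since a row that arrives via replication keeps the other node's flag value, it does not match the local publication's WHERE clause and is not published back, which is what keeps the changes from ping-ponging between the two nodes.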

Although almost all of the new tests pass, there is a problem with DELETE replication, so 1 out of 10 tests fails. A DELETE is not replicated if the record was created with is_cloud=TRUE on the cloud node and replicated to the remote node; then updated with is_cloud=FALSE on the remote node and replicated back to the cloud node; and then deleted on the remote node.
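
In SQL terms the failing sequence looks roughly like this (same illustrative names as in the sketch above):

    -- on cloud
    INSERT INTO t VALUES (1, 'x', TRUE);           -- replicated to remote
    -- on remote
    UPDATE t SET is_cloud = FALSE WHERE id = 1;    -- replicated back to cloud
    DELETE FROM t WHERE id = 1;                    -- NOT replicated to cloud

My first guess (not verified yet) is that for a DELETE the row filter only sees the old tuple, which with the default replica identity contains just the key columns, so is_cloud comes out as NULL and the change gets filtered out.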


Regards
--
Alexey Kondratov
Postgres Professional https://www.postgrespro.com
Russian Postgres Company
From ae80e1616fb6374968a09e3c44f0abe59ebf3a87 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Fri, 9 Mar 2018 18:39:22 +0000
Subject: [PATCH v2 1/9] Remove unused atttypmod column from initial table
 synchronization

 Commit 7c4f52409a8c7d85ed169bbbc1f6092274d03920 added atttypmod, but it
 was never used. The removal is safe because COPY from the publisher does
 not need that information.
---
 src/backend/replication/logical/tablesync.c | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 7881079e96..0a565dd837 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -647,7 +647,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[2] = {OIDOID, CHAROID};
-	Oid			attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
+	Oid			attrRow[3] = {TEXTOID, OIDOID, BOOLOID};
 	bool		isnull;
 	int			natt;
 
@@ -691,7 +691,6 @@ fetch_remote_table_info(char *nspname, char *relname,
 	appendStringInfo(&cmd,
 					 "SELECT a.attname,"
 					 "       a.atttypid,"
-					 "       a.atttypmod,"
 					 "       a.attnum = ANY(i.indkey)"
 					 "  FROM pg_catalog.pg_attribute a"
 					 "  LEFT JOIN pg_catalog.pg_index i"
@@ -703,7 +702,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 lrel->remoteid,
 					 (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
 					 lrel->remoteid);
-	res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
+	res = walrcv_exec(wrconn, cmd.data, 3, attrRow);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
@@ -724,7 +723,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 		Assert(!isnull);
 		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
-		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
+		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
 			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
 
 		/* Should never happen. */

base-commit: 9acda731184c1ebdf99172cbb19d0404b7eebc37
-- 
2.19.1

From 362f2cc97745690ff4739b530f5ba95aea59be09 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Fri, 9 Mar 2018 17:37:36 +0000
Subject: [PATCH v2 2/9] Store number of tuples in WalRcvExecResult

It seems to be useful information when allocating memory for queries
that return more than one row. It reduces memory allocation during
initial table synchronization.

While at it, since we have the number of columns, allocate only nfields
entries for cstrs instead of MaxTupleAttributeNumber.
---
 .../replication/libpqwalreceiver/libpqwalreceiver.c      | 9 ++++++---
 src/backend/replication/logical/tablesync.c              | 5 ++---
 src/include/replication/walreceiver.h                    | 1 +
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 6eba08a920..846b6f89f1 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -878,6 +878,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 				 errdetail("Expected %d fields, got %d fields.",
 						   nRetTypes, nfields)));
 
+	walres->ntuples = PQntuples(pgres);
 	walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
 
 	/* Create tuple descriptor corresponding to expected result. */
@@ -888,7 +889,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
 
 	/* No point in doing more here if there were no tuples returned. */
-	if (PQntuples(pgres) == 0)
+	if (walres->ntuples == 0)
 		return;
 
 	/* Create temporary context for local allocations. */
@@ -897,15 +898,17 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 									   ALLOCSET_DEFAULT_SIZES);
 
 	/* Process returned rows. */
-	for (tupn = 0; tupn < PQntuples(pgres); tupn++)
+	for (tupn = 0; tupn < walres->ntuples; tupn++)
 	{
-		char	   *cstrs[MaxTupleAttributeNumber];
+		char	**cstrs;
 
 		ProcessWalRcvInterrupts();
 
 		/* Do the allocations in temporary context. */
 		oldcontext = MemoryContextSwitchTo(rowcontext);
 
+		cstrs = palloc(nfields * sizeof(char *));
+
 		/*
 		 * Fill cstrs with null-terminated strings of column values.
 		 */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0a565dd837..42db4ada9e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -709,9 +709,8 @@ fetch_remote_table_info(char *nspname, char *relname,
 				(errmsg("could not fetch table info for table \"%s.%s\": %s",
 						nspname, relname, res->err)));
 
-	/* We don't know the number of rows coming, so allocate enough space. */
-	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
-	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
+	lrel->attnames = palloc0(res->ntuples * sizeof(char *));
+	lrel->atttyps = palloc0(res->ntuples * sizeof(Oid));
 	lrel->attkeys = NULL;
 
 	natt = 0;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e12a934966..0d32d598d8 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -196,6 +196,7 @@ typedef struct WalRcvExecResult
 	char	   *err;
 	Tuplestorestate *tuplestore;
 	TupleDesc	tupledesc;
+	int			ntuples;
 } WalRcvExecResult;
 
 /* libpqwalreceiver hooks */
-- 
2.19.1

From 2b23421d208c4760e7b3d38adc57ec62685b2d35 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Tue, 27 Feb 2018 02:21:03 +0000
Subject: [PATCH v2 3/9] Refactor function create_estate_for_relation

Relation localrel is the only member of the LogicalRepRelMapEntry
structure that create_estate_for_relation actually uses, so pass the
Relation directly.
---
 src/backend/replication/logical/worker.c | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 11e6331f49..d9952c8b7e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -173,7 +173,7 @@ ensure_transaction(void)
  * This is based on similar code in copy.c
  */
 static EState *
-create_estate_for_relation(LogicalRepRelMapEntry *rel)
+create_estate_for_relation(Relation rel)
 {
 	EState	   *estate;
 	ResultRelInfo *resultRelInfo;
@@ -183,13 +183,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
 
 	rte = makeNode(RangeTblEntry);
 	rte->rtekind = RTE_RELATION;
-	rte->relid = RelationGetRelid(rel->localrel);
-	rte->relkind = rel->localrel->rd_rel->relkind;
+	rte->relid = RelationGetRelid(rel);
+	rte->relkind = rel->rd_rel->relkind;
 	rte->rellockmode = AccessShareLock;
 	ExecInitRangeTable(estate, list_make1(rte));
 
 	resultRelInfo = makeNode(ResultRelInfo);
-	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
+	InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
 
 	estate->es_result_relations = resultRelInfo;
 	estate->es_num_result_relations = 1;
@@ -589,7 +589,7 @@ apply_handle_insert(StringInfo s)
 	}
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
@@ -696,7 +696,7 @@ apply_handle_update(StringInfo s)
 	check_relation_updatable(rel);
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
@@ -815,7 +815,7 @@ apply_handle_delete(StringInfo s)
 	check_relation_updatable(rel);
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
-- 
2.19.1

From b5e55cace866ad8b4de554fe6cd326ad2d11fe81 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Wed, 24 Jan 2018 17:01:31 -0200
Subject: [PATCH v2 4/9] Rename a WHERE node

A WHERE clause will be used for row filtering in logical replication.
The grammar already has a similar production, ExclusionWhereClause
('WHERE ( condition )'); rename it to the generic OptWhereClause so it
can be reused for row filtering too.
---
 src/backend/parser/gram.y | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c97bb367f8..1de8f56794 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -476,7 +476,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	def_arg columnElem where_clause where_or_current_clause
 				a_expr b_expr c_expr AexprConst indirection_el opt_slice_bound
 				columnref in_expr having_clause func_table xmltable array_expr
-				ExclusionWhereClause operator_def_arg
+				OptWhereClause operator_def_arg
 %type <list>	rowsfrom_item rowsfrom_list opt_col_def_list
 %type <boolean> opt_ordinality
 %type <list>	ExclusionConstraintList ExclusionConstraintElem
@@ -3710,7 +3710,7 @@ ConstraintElem:
 					$$ = (Node *)n;
 				}
 			| EXCLUDE access_method_clause '(' ExclusionConstraintList ')'
-				opt_c_include opt_definition OptConsTableSpace  ExclusionWhereClause
+				opt_c_include opt_definition OptConsTableSpace  OptWhereClause
 				ConstraintAttributeSpec
 				{
 					Constraint *n = makeNode(Constraint);
@@ -3812,7 +3812,7 @@ ExclusionConstraintElem: index_elem WITH any_operator
 			}
 		;
 
-ExclusionWhereClause:
+OptWhereClause:
 			WHERE '(' a_expr ')'					{ $$ = $3; }
 			| /*EMPTY*/								{ $$ = NULL; }
 		;
-- 
2.19.1

From fcb9a06babe9ce94bff256e93ae1a2f44c4532b6 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Tue, 27 Feb 2018 04:03:13 +0000
Subject: [PATCH v2 5/9] Row filtering for logical replication

When you define or modify a publication, you can optionally filter the
rows to be published using a WHERE condition. The condition can be any
expression that evaluates to a boolean. Only rows that satisfy the
WHERE condition will be sent to subscribers.
---
 doc/src/sgml/ref/alter_publication.sgml     |   9 +-
 doc/src/sgml/ref/create_publication.sgml    |  14 ++-
 src/backend/catalog/pg_publication.c        |  46 +++++++-
 src/backend/commands/publicationcmds.c      |  74 ++++++++----
 src/backend/parser/gram.y                   |  26 ++++-
 src/backend/parser/parse_agg.c              |  10 ++
 src/backend/parser/parse_expr.c             |   5 +
 src/backend/parser/parse_func.c             |   2 +
 src/backend/replication/logical/proto.c     |   2 +-
 src/backend/replication/logical/relation.c  |  13 +++
 src/backend/replication/logical/tablesync.c | 119 ++++++++++++++++++--
 src/backend/replication/logical/worker.c    |   2 +-
 src/backend/replication/pgoutput/pgoutput.c |  98 +++++++++++++++-
 src/include/catalog/pg_publication.h        |  10 +-
 src/include/catalog/pg_publication_rel.h    |  10 +-
 src/include/nodes/nodes.h                   |   1 +
 src/include/nodes/parsenodes.h              |  11 +-
 src/include/parser/parse_node.h             |   1 +
 src/include/replication/logicalproto.h      |   2 +
 src/include/replication/logicalrelation.h   |   2 +
 src/test/regress/expected/misc_sanity.out   |   3 +-
 src/test/subscription/t/010_row_filter.pl   |  97 ++++++++++++++++
 22 files changed, 499 insertions(+), 58 deletions(-)
 create mode 100644 src/test/subscription/t/010_row_filter.pl

diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index 534e598d93..5984915767 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -21,8 +21,8 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_USER | SESSION_USER }
@@ -91,7 +91,10 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       table name, only that table is affected.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
-      name to explicitly indicate that descendant tables are included.
+      name to explicitly indicate that descendant tables are included. If the
+      optional <literal>WHERE</literal> clause is specified, rows that do not
+      satisfy the <replaceable class="parameter">expression</replaceable> will
+      not be published. Note that parentheses are required around the expression.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index 99f87ca393..d5fed304a1 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
-    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
       | FOR ALL TABLES ]
     [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 
@@ -68,7 +68,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       that table is added to the publication.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are added.
       Optionally, <literal>*</literal> can be specified after the table name to
-      explicitly indicate that descendant tables are included.
+      explicitly indicate that descendant tables are included. If the optional
+      <literal>WHERE</literal> clause is specified, rows that do not satisfy
+      the <replaceable class="parameter">expression</replaceable> will not be
+      published. Note that parentheses are required around the expression.
      </para>
 
      <para>
@@ -183,6 +186,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
 </programlisting>
   </para>
 
+  <para>
+   Create a publication that publishes all changes from active departments:
+<programlisting>
+CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
+</programlisting>
+  </para>
+
   <para>
    Create a publication that publishes all changes in all tables:
 <programlisting>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index fd5da7d5f7..47a793408d 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -34,6 +34,10 @@
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
 
+#include "parser/parse_clause.h"
+#include "parser/parse_collate.h"
+#include "parser/parse_relation.h"
+
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -149,18 +153,21 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
  * Insert new publication / relation mapping.
  */
 ObjectAddress
-publication_add_relation(Oid pubid, Relation targetrel,
+publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
 						 bool if_not_exists)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	Datum		values[Natts_pg_publication_rel];
 	bool		nulls[Natts_pg_publication_rel];
-	Oid			relid = RelationGetRelid(targetrel);
+	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	ParseState		*pstate;
+	RangeTblEntry	*rte;
+	Node			*whereclause;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -180,10 +187,27 @@ publication_add_relation(Oid pubid, Relation targetrel,
 		ereport(ERROR,
 				(errcode(ERRCODE_DUPLICATE_OBJECT),
 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
-						RelationGetRelationName(targetrel), pub->name)));
+						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel);
+	check_publication_add_relation(targetrel->relation);
+
+	/* Set up a pstate to parse with */
+	pstate = make_parsestate(NULL);
+	pstate->p_sourcetext = nodeToString(targetrel->whereClause);
+
+	rte = addRangeTableEntryForRelation(pstate, targetrel->relation,
+										AccessShareLock,
+										NULL, false, false);
+	addRTEtoQuery(pstate, rte, false, true, true);
+
+	whereclause = transformWhereClause(pstate,
+								copyObject(targetrel->whereClause),
+								EXPR_KIND_PUBLICATION_WHERE,
+								"PUBLICATION");
+
+	/* Fix up collation information */
+	assign_expr_collations(pstate, whereclause);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -197,6 +221,12 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
 
+	/* Add row filter, if available */
+	if (whereclause)
+		values[Anum_pg_publication_rel_prrowfilter - 1] = CStringGetTextDatum(nodeToString(whereclause));
+	else
+		nulls[Anum_pg_publication_rel_prrowfilter - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -213,11 +243,17 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	ObjectAddressSet(referenced, RelationRelationId, relid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+	/* Add dependency on the objects mentioned in the row filter expression */
+	if (whereclause)
+		recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL);
+
+	free_parsestate(pstate);
+
 	/* Close the table. */
 	table_close(rel, RowExclusiveLock);
 
 	/* Invalidate relcache so that publication info is rebuilt. */
-	CacheInvalidateRelcache(targetrel);
+	CacheInvalidateRelcache(targetrel->relation);
 
 	return myself;
 }
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f115d4bf80..bc7f9210e9 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -352,6 +352,27 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	Assert(list_length(stmt->tables) > 0);
 
+	/*
+	 * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause.  Use
+	 * publication_table_list node (that accepts a WHERE clause) but forbid the
+	 * WHERE clause in it.  The use of relation_expr_list node just for the
+	 * DROP TABLE part is not worth the trouble.
+	 */
+	if (stmt->tableAction == DEFELEM_DROP)
+	{
+		ListCell	*lc;
+
+		foreach(lc, stmt->tables)
+		{
+			PublicationTable *t = lfirst(lc);
+			if (t->whereClause)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("cannot use a WHERE clause for removing table from publication \"%s\"",
+								NameStr(pubform->pubname))));
+		}
+	}
+
 	rels = OpenTableList(stmt->tables);
 
 	if (stmt->tableAction == DEFELEM_ADD)
@@ -373,9 +394,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 			foreach(newlc, rels)
 			{
-				Relation	newrel = (Relation) lfirst(newlc);
+				PublicationRelationQual	*newrel = (PublicationRelationQual *) lfirst(newlc);
 
-				if (RelationGetRelid(newrel) == oldrelid)
+				if (RelationGetRelid(newrel->relation) == oldrelid)
 				{
 					found = true;
 					break;
@@ -384,7 +405,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 			if (!found)
 			{
-				Relation	oldrel = table_open(oldrelid,
+				PublicationRelationQual *oldrel = palloc(sizeof(PublicationRelationQual));
+				oldrel->relation = table_open(oldrelid,
 												ShareUpdateExclusiveLock);
 
 				delrels = lappend(delrels, oldrel);
@@ -510,16 +532,22 @@ OpenTableList(List *tables)
 	List	   *relids = NIL;
 	List	   *rels = NIL;
 	ListCell   *lc;
+	PublicationRelationQual	*relqual;
 
 	/*
 	 * Open, share-lock, and check all the explicitly-specified relations
 	 */
 	foreach(lc, tables)
 	{
-		RangeVar   *rv = castNode(RangeVar, lfirst(lc));
-		bool		recurse = rv->inh;
-		Relation	rel;
-		Oid			myrelid;
+		// RangeVar   *rv = castNode(RangeVar, lfirst(lc));
+		// bool		recurse = rv->inh;
+		// Relation	rel;
+		// Oid			myrelid;
+		PublicationTable	*t = lfirst(lc);
+		RangeVar  			*rv = t->relation;
+		Relation			rel;
+		bool				recurse = rv->inh;
+		Oid					myrelid;
 
 		/* Allow query cancel in case this takes a long time */
 		CHECK_FOR_INTERRUPTS();
@@ -539,8 +567,10 @@ OpenTableList(List *tables)
 			table_close(rel, ShareUpdateExclusiveLock);
 			continue;
 		}
-
-		rels = lappend(rels, rel);
+		relqual = palloc(sizeof(PublicationRelationQual));
+		relqual->relation = rel;
+		relqual->whereClause = t->whereClause;
+		rels = lappend(rels, relqual);
 		relids = lappend_oid(relids, myrelid);
 
 		/* Add children of this rel, if requested */
@@ -568,7 +598,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = table_open(childrelid, NoLock);
-				rels = lappend(rels, rel);
+				relqual = palloc(sizeof(PublicationRelationQual));
+				relqual->relation = rel;
+				/* child inherits WHERE clause from parent */
+				relqual->whereClause = t->whereClause;
+				rels = lappend(rels, relqual);
 				relids = lappend_oid(relids, childrelid);
 			}
 		}
@@ -589,10 +623,12 @@ CloseTableList(List *rels)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationQual	*rel = (PublicationRelationQual *) lfirst(lc);
 
-		table_close(rel, NoLock);
+		table_close(rel->relation, NoLock);
 	}
+
+	list_free_deep(rels);
 }
 
 /*
@@ -608,13 +644,13 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationQual	*rel = (PublicationRelationQual *) lfirst(lc);
 		ObjectAddress obj;
 
 		/* Must be owner of the table or superuser. */
-		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
-			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
-						   RelationGetRelationName(rel));
+		if (!pg_class_ownercheck(RelationGetRelid(rel->relation), GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->relation->rd_rel->relkind),
+						   RelationGetRelationName(rel->relation));
 
 		obj = publication_add_relation(pubid, rel, if_not_exists);
 		if (stmt)
@@ -640,8 +676,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
-		Oid			relid = RelationGetRelid(rel);
+		PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);
+		Oid			relid = RelationGetRelid(rel->relation);
 
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
 							   ObjectIdGetDatum(relid),
@@ -654,7 +690,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 			ereport(ERROR,
 					(errcode(ERRCODE_UNDEFINED_OBJECT),
 					 errmsg("relation \"%s\" is not part of the publication",
-							RelationGetRelationName(rel))));
+							RelationGetRelationName(rel->relation))));
 		}
 
 		ObjectAddressSet(obj, PublicationRelRelationId, prid);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 1de8f56794..bd87e80e1b 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -404,13 +404,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				relation_expr_list dostmt_opt_list
 				transform_element_list transform_type_list
 				TriggerTransitions TriggerReferencing
-				publication_name_list
+				publication_name_list publication_table_list
 				vacuum_relation_list opt_vacuum_relation_list
 
 %type <list>	group_by_list
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
-%type <node>	opt_publication_for_tables publication_for_tables
+%type <node>	opt_publication_for_tables publication_for_tables publication_table_elem
 %type <value>	publication_name_item
 
 %type <list>	opt_fdw_options fdw_options
@@ -9518,7 +9518,7 @@ opt_publication_for_tables:
 		;
 
 publication_for_tables:
-			FOR TABLE relation_expr_list
+			FOR TABLE publication_table_list
 				{
 					$$ = (Node *) $3;
 				}
@@ -9549,7 +9549,7 @@ AlterPublicationStmt:
 					n->options = $5;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name ADD_P TABLE relation_expr_list
+			| ALTER PUBLICATION name ADD_P TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9557,7 +9557,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_ADD;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name SET TABLE relation_expr_list
+			| ALTER PUBLICATION name SET TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9565,7 +9565,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_SET;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name DROP TABLE relation_expr_list
+			| ALTER PUBLICATION name DROP TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9575,6 +9575,20 @@ AlterPublicationStmt:
 				}
 		;
 
+publication_table_list:
+			publication_table_elem									{ $$ = list_make1($1); }
+			| publication_table_list ',' publication_table_elem		{ $$ = lappend($1, $3); }
+		;
+
+publication_table_elem: relation_expr OptWhereClause
+				{
+					PublicationTable *n = makeNode(PublicationTable);
+					n->relation = $1;
+					n->whereClause = $2;
+					$$ = (Node *) n;
+				}
+		;
+
 /*****************************************************************************
  *
  * CREATE SUBSCRIPTION name ...
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index f418c61545..e317e04695 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -536,6 +536,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr)
 				err = _("grouping operations are not allowed in CALL arguments");
 
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			if (isAgg)
+				err = _("aggregate functions are not allowed in publication WHERE expressions");
+			else
+				err = _("grouping operations are not allowed in publication WHERE expressions");
+
+			break;
 
 		case EXPR_KIND_COPY_WHERE:
 			if (isAgg)
@@ -933,6 +940,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
 		case EXPR_KIND_GENERATED_COLUMN:
 			err = _("window functions are not allowed in column generation expressions");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("window functions are not allowed in publication WHERE expressions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index 76f3dd7076..4942f28f50 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -571,6 +571,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref)
 		case EXPR_KIND_CALL_ARGUMENT:
 		case EXPR_KIND_COPY_WHERE:
 		case EXPR_KIND_GENERATED_COLUMN:
+		case EXPR_KIND_PUBLICATION_WHERE:
 			/* okay */
 			break;
 
@@ -1924,6 +1925,8 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
 			break;
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("cannot use subquery in CALL argument");
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("cannot use subquery in publication WHERE expression");
 			break;
 		case EXPR_KIND_COPY_WHERE:
 			err = _("cannot use subquery in COPY FROM WHERE condition");
@@ -3561,6 +3564,8 @@ ParseExprKindName(ParseExprKind exprKind)
 			return "WHERE";
 		case EXPR_KIND_GENERATED_COLUMN:
 			return "GENERATED AS";
+		case EXPR_KIND_PUBLICATION_WHERE:
+			return "publication expression";
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 8e926539e6..c7ec300d0e 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -2509,6 +2509,8 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location)
 			break;
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("set-returning functions are not allowed in CALL arguments");
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("set-returning functions are not allowed in publication WHERE expressions");
 			break;
 		case EXPR_KIND_COPY_WHERE:
 			err = _("set-returning functions are not allowed in COPY FROM WHERE conditions");
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index e7df47de3e..eb4eaa2a33 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -378,7 +378,7 @@ logicalrep_write_rel(StringInfo out, Relation rel)
 LogicalRepRelation *
 logicalrep_read_rel(StringInfo in)
 {
-	LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
+	LogicalRepRelation *rel = palloc0(sizeof(LogicalRepRelation));
 
 	rel->remoteid = pq_getmsgint(in, 4);
 
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 85269c037d..a14986e3ab 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -140,6 +140,16 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
 	}
 	bms_free(remoterel->attkeys);
 
+	if (remoterel->nrowfilters > 0)
+	{
+		int		i;
+
+		for (i = 0; i < remoterel->nrowfilters; i++)
+			pfree(remoterel->rowfiltercond[i]);
+
+		pfree(remoterel->rowfiltercond);
+	}
+
 	if (entry->attrmap)
 		pfree(entry->attrmap);
 }
@@ -187,6 +197,9 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
 	}
 	entry->remoterel.replident = remoterel->replident;
 	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+	entry->remoterel.rowfiltercond = palloc(remoterel->nrowfilters * sizeof(char *));
+	for (i = 0; i < remoterel->nrowfilters; i++)
+		entry->remoterel.rowfiltercond[i] = pstrdup(remoterel->rowfiltercond[i]);
 	MemoryContextSwitchTo(oldctx);
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 42db4ada9e..fc37f74e89 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -648,8 +648,14 @@ fetch_remote_table_info(char *nspname, char *relname,
 	TupleTableSlot *slot;
 	Oid			tableRow[2] = {OIDOID, CHAROID};
 	Oid			attrRow[3] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			rowfilterRow[1] = {TEXTOID};
 	bool		isnull;
-	int			natt;
+	int			n;
+	ListCell   *lc;
+	bool		first;
+
+	/* Avoid trashing relation map cache */
+	memset(lrel, 0, sizeof(LogicalRepRelation));
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
@@ -713,20 +719,20 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->atttyps = palloc0(res->ntuples * sizeof(Oid));
 	lrel->attkeys = NULL;
 
-	natt = 0;
+	n = 0;
 	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		lrel->attnames[natt] =
+		lrel->attnames[n] =
 			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
 		Assert(!isnull);
-		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
+		lrel->atttyps[n] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
 		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
-			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
+			lrel->attkeys = bms_add_member(lrel->attkeys, n);
 
 		/* Should never happen. */
-		if (++natt >= MaxTupleAttributeNumber)
+		if (++n >= MaxTupleAttributeNumber)
 			elog(ERROR, "too many columns in remote table \"%s.%s\"",
 				 nspname, relname);
 
@@ -734,7 +740,54 @@ fetch_remote_table_info(char *nspname, char *relname,
 	}
 	ExecDropSingleTupleTableSlot(slot);
 
-	lrel->natts = natt;
+	lrel->natts = n;
+
+	walrcv_clear_result(res);
+
+	/* Fetch row filtering info */
+	resetStringInfo(&cmd);
+	appendStringInfo(&cmd, "SELECT pg_get_expr(prrowfilter, prrelid) FROM pg_publication p INNER JOIN pg_publication_rel pr ON (p.oid = pr.prpubid) WHERE pr.prrelid = %u AND p.pubname IN (", MyLogicalRepWorker->relid);
+
+	first = true;
+	foreach(lc, MySubscription->publications)
+	{
+		char	*pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(&cmd, ", ");
+
+		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+	}
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 1, rowfilterRow);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not fetch row filter info for table \"%s.%s\" from publisher: %s",
+						nspname, relname, res->err)));
+
+	lrel->rowfiltercond = palloc0(res->ntuples * sizeof(char *));
+
+	n = 0;
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		Datum rf = slot_getattr(slot, 1, &isnull);
+
+		if (!isnull)
+		{
+			char *p = TextDatumGetCString(rf);
+			lrel->rowfiltercond[n++] = p;
+		}
+
+		ExecClearTuple(slot);
+	}
+	ExecDropSingleTupleTableSlot(slot);
+
+	lrel->nrowfilters = n;
 
 	walrcv_clear_result(res);
 	pfree(cmd.data);
@@ -767,10 +820,57 @@ copy_table(Relation rel)
 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
 	Assert(rel == relmapentry->localrel);
 
+	/* list of columns for COPY */
+	attnamelist = make_copy_attnamelist(relmapentry);
+
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "COPY %s TO STDOUT",
-					 quote_qualified_identifier(lrel.nspname, lrel.relname));
+	/*
+	 * If publication has any row filter, build a SELECT query with OR'ed row
+	 * filters for COPY.
+	 * If no row filters are available, use COPY for all
+	 * table contents.
+	 */
+	if (lrel.nrowfilters > 0)
+	{
+		ListCell   *lc;
+		bool		first;
+		int			i;
+
+		appendStringInfoString(&cmd, "COPY (SELECT ");
+		/* list of attribute names */
+		first = true;
+		foreach(lc, attnamelist)
+		{
+			char	*col = strVal(lfirst(lc));
+
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&cmd, ", ");
+			appendStringInfo(&cmd, "%s", quote_identifier(col));
+		}
+		appendStringInfo(&cmd, " FROM %s",
+						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		appendStringInfoString(&cmd, " WHERE ");
+		/* list of OR'ed filters */
+		first = true;
+		for (i = 0; i < lrel.nrowfilters; i++)
+		{
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&cmd, " OR ");
+			appendStringInfo(&cmd, "%s", lrel.rowfiltercond[i]);
+		}
+
+		appendStringInfoString(&cmd, ") TO STDOUT");
+	}
+	else
+	{
+		appendStringInfo(&cmd, "COPY %s TO STDOUT",
+						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+	}
 	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
@@ -785,7 +885,6 @@ copy_table(Relation rel)
 	addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
 								  NULL, false, false);
 
-	attnamelist = make_copy_attnamelist(relmapentry);
 	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
 
 	/* Do the copy */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d9952c8b7e..cef0c52955 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -172,7 +172,7 @@ ensure_transaction(void)
  *
  * This is based on similar code in copy.c
  */
-static EState *
+EState *
 create_estate_for_relation(Relation rel)
 {
 	EState	   *estate;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c08757fca..e9646ac483 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,15 +12,26 @@
  */
 #include "postgres.h"
 
+#include "catalog/pg_type.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
+
+#include "executor/executor.h"
+#include "nodes/execnodes.h"
+#include "nodes/nodeFuncs.h"
+#include "optimizer/planner.h"
+#include "optimizer/optimizer.h"
+#include "parser/parse_coerce.h"
 
 #include "fmgr.h"
 
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 
+#include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/int8.h"
 #include "utils/memutils.h"
@@ -60,6 +71,7 @@ typedef struct RelationSyncEntry
 	bool		schema_sent;	/* did we send the schema? */
 	bool		replicate_valid;
 	PublicationActions pubactions;
+	List		*row_filter;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -335,6 +347,63 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/* ... then check row filter */
+	if (list_length(relentry->row_filter) > 0)
+	{
+		HeapTuple		old_tuple;
+		HeapTuple		new_tuple;
+		TupleDesc		tupdesc;
+		EState			*estate;
+		ExprContext		*ecxt;
+		MemoryContext	oldcxt;
+		ListCell		*lc;
+
+		old_tuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL;
+		new_tuple = change->data.tp.newtuple ? &change->data.tp.newtuple->tuple : NULL;
+		tupdesc = RelationGetDescr(relation);
+		estate = create_estate_for_relation(relation);
+
+		/* prepare context per tuple */
+		ecxt = GetPerTupleExprContext(estate);
+		oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
+		ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc, &TTSOpsHeapTuple);
+
+		ExecStoreHeapTuple(new_tuple ? new_tuple : old_tuple, ecxt->ecxt_scantuple, false);
+
+		foreach (lc, relentry->row_filter)
+		{
+			Node		*row_filter;
+			ExprState	*expr_state;
+			Expr		*expr;
+			Oid			expr_type;
+			Datum		res;
+			bool		isnull;
+
+			row_filter = (Node *) lfirst(lc);
+
+			/* evaluates row filter */
+			expr_type = exprType(row_filter);
+			expr = (Expr *) coerce_to_target_type(NULL, row_filter, expr_type, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
+			expr = expression_planner(expr);
+			expr_state = ExecInitExpr(expr, NULL);
+			res = ExecEvalExpr(expr_state, ecxt, &isnull);
+
+			/* if tuple does not match row filter, bail out */
+			if (!DatumGetBool(res) || isnull)
+			{
+				MemoryContextSwitchTo(oldcxt);
+				ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple);
+				FreeExecutorState(estate);
+				return;
+			}
+		}
+
+		MemoryContextSwitchTo(oldcxt);
+
+		ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple);
+		FreeExecutorState(estate);
+	}
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -570,10 +639,14 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		 */
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+		entry->row_filter = NIL;
 
 		foreach(lc, data->publications)
 		{
 			Publication *pub = lfirst(lc);
+			HeapTuple	rf_tuple;
+			Datum		rf_datum;
+			bool		rf_isnull;
 
 			if (pub->alltables || list_member_oid(pubids, pub->oid))
 			{
@@ -583,9 +656,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
+			/* Cache row filters, if available */
+			rf_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pub->oid));
+			if (HeapTupleIsValid(rf_tuple))
+			{
+				rf_datum = SysCacheGetAttr(PUBLICATIONRELMAP, rf_tuple, Anum_pg_publication_rel_prrowfilter, &rf_isnull);
+
+				if (!rf_isnull)
+				{
+					MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+					char	*s = TextDatumGetCString(rf_datum);
+					Node	*rf_node = stringToNode(s);
+					entry->row_filter = lappend(entry->row_filter, rf_node);
+					MemoryContextSwitchTo(oldctx);
+				}
+
+				ReleaseSysCache(rf_tuple);
+			}
 		}
 
 		list_free(pubids);
@@ -660,5 +747,10 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	 */
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
 		entry->replicate_valid = false;
+		if (list_length(entry->row_filter) > 0)
+			list_free(entry->row_filter);
+		entry->row_filter = NIL;
+	}
 }
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 20a2f0ac1b..fe878c6957 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -78,6 +78,12 @@ typedef struct Publication
 	PublicationActions pubactions;
 } Publication;
 
+typedef struct PublicationRelationQual
+{
+	Relation	relation;
+	Node		*whereClause;
+} PublicationRelationQual;
+
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
@@ -86,8 +92,8 @@ extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(void);
 
 extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
-											  bool if_not_exists);
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
+						 					  bool if_not_exists);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
 extern char *get_publication_name(Oid pubid, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index 5f5bc92ab3..70ffef5dfd 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -28,9 +28,13 @@
  */
 CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 {
-	Oid			oid;			/* oid */
-	Oid			prpubid;		/* Oid of the publication */
-	Oid			prrelid;		/* Oid of the relation */
+	Oid				oid;			/* oid */
+	Oid				prpubid;		/* Oid of the publication */
+	Oid				prrelid;		/* Oid of the relation */
+
+#ifdef	CATALOG_VARLEN				/* variable-length fields start here */
+	pg_node_tree	prrowfilter;	/* nodeToString representation of row filter */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 3cbb08df92..7f83da1ee8 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -476,6 +476,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_PublicationTable,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 94ded3c135..359f773092 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3461,12 +3461,19 @@ typedef struct AlterTSConfigurationStmt
 } AlterTSConfigurationStmt;
 
 
+typedef struct PublicationTable
+{
+	NodeTag		type;
+	RangeVar	*relation;		/* relation to be published */
+	Node		*whereClause;	/* qualifications */
+} PublicationTable;
+
 typedef struct CreatePublicationStmt
 {
 	NodeTag		type;
 	char	   *pubname;		/* Name of the publication */
 	List	   *options;		/* List of DefElem nodes */
-	List	   *tables;			/* Optional list of tables to add */
+	List	   *tables;			/* Optional list of PublicationTable to add */
 	bool		for_all_tables; /* Special publication for all tables in db */
 } CreatePublicationStmt;
 
@@ -3479,7 +3486,7 @@ typedef struct AlterPublicationStmt
 	List	   *options;		/* List of DefElem nodes */
 
 	/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
-	List	   *tables;			/* List of tables to add/drop */
+	List	   *tables;			/* List of PublicationTable to add/drop */
 	bool		for_all_tables; /* Special publication for all tables in db */
 	DefElemAction tableAction;	/* What action to perform with the tables */
 } AlterPublicationStmt;
diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h
index 7c099e7084..c2e8b9fcb9 100644
--- a/src/include/parser/parse_node.h
+++ b/src/include/parser/parse_node.h
@@ -73,6 +73,7 @@ typedef enum ParseExprKind
 	EXPR_KIND_CALL_ARGUMENT,	/* procedure argument in CALL */
 	EXPR_KIND_COPY_WHERE,		/* WHERE condition in COPY FROM */
 	EXPR_KIND_GENERATED_COLUMN, /* generation expression for a column */
+	EXPR_KIND_PUBLICATION_WHERE	/* WHERE condition for a table in PUBLICATION */
 } ParseExprKind;
 
 
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 3fc430af01..8dabc35791 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -50,6 +50,8 @@ typedef struct LogicalRepRelation
 	Oid		   *atttyps;		/* column types */
 	char		replident;		/* replica identity */
 	Bitmapset  *attkeys;		/* Bitmap of key columns */
+	char	  **rowfiltercond;	/* condition for row filtering */
+	int			nrowfilters;	/* number of row filters */
 } LogicalRepRelation;
 
 /* Type mapping info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 2642a3f94e..5cc307ee0e 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -39,4 +39,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
 extern char *logicalrep_typmap_gettypname(Oid remoteid);
 
+extern EState *create_estate_for_relation(Relation rel);
+
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/regress/expected/misc_sanity.out b/src/test/regress/expected/misc_sanity.out
index 8538173ff8..68bbdf7016 100644
--- a/src/test/regress/expected/misc_sanity.out
+++ b/src/test/regress/expected/misc_sanity.out
@@ -107,5 +107,6 @@ ORDER BY 1, 2;
  pg_index                | indpred       | pg_node_tree
  pg_largeobject          | data          | bytea
  pg_largeobject_metadata | lomacl        | aclitem[]
-(11 rows)
+ pg_publication_rel      | prrowfilter   | pg_node_tree
+(12 rows)
 
diff --git a/src/test/subscription/t/010_row_filter.pl b/src/test/subscription/t/010_row_filter.pl
new file mode 100644
index 0000000000..6c174fa895
--- /dev/null
+++ b/src/test/subscription/t/010_row_filter.pl
@@ -0,0 +1,97 @@
+# Test logical replication behavior with row filtering
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# setup structure on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+
+# setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+
+# setup logical replication
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_1 FOR TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')");
+
+my $result = $node_publisher->psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 DROP TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')");
+is($result, 3, "syntax error for ALTER PUBLICATION DROP TABLE");
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 ADD TABLE tab_rowfilter_2 WHERE (c % 2 = 0)");
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 SET TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered'), tab_rowfilter_2 WHERE (c % 2 = 0), tab_rowfilter_3");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_2 FOR TABLE tab_rowfilter_2 WHERE (c % 3 = 0)");
+
+# test row filtering
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1, 'not replicated')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1500, 'filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1980, 'not filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) SELECT x, 'test ' || x FROM generate_series(990,1003) x");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_2 (c) SELECT generate_series(1, 10)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x");
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+#$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab_rowfilter_1");
+is($result, qq(1980|not filtered
+1001|test 1001
+1002|test 1002
+1003|test 1003), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(c), min(c), max(c) FROM tab_rowfilter_2");
+is($result, qq(7|2|10), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(a) FROM tab_rowfilter_3");
+is($result, qq(10), 'check filtered data was copied to subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.19.1

From 19cb78c319e3755f412b3d622e68837ccaf05496 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Thu, 17 May 2018 20:52:28 +0000
Subject: [PATCH v2 6/9] Print publication WHERE condition in psql

---
 src/bin/psql/describe.c | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 774cc764ff..f103747901 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5875,7 +5875,8 @@ describePublications(const char *pattern)
 		if (!puballtables)
 		{
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
+							  "SELECT n.nspname, c.relname,\n"
+							  "  pg_get_expr(pr.prrowfilter, c.oid)\n"
 							  "FROM pg_catalog.pg_class c,\n"
 							  "     pg_catalog.pg_namespace n,\n"
 							  "     pg_catalog.pg_publication_rel pr\n"
@@ -5905,6 +5906,10 @@ describePublications(const char *pattern)
 								  PQgetvalue(tabres, j, 0),
 								  PQgetvalue(tabres, j, 1));
 
+				if (!PQgetisnull(tabres, j, 2))
+					appendPQExpBuffer(&buf, "  WHERE %s",
+									PQgetvalue(tabres, j, 2));
+
 				printTableAddFooter(&cont, buf.data);
 			}
 			PQclear(tabres);
-- 
2.19.1

From c773fc66047feed639b50648f86d9a5cb783fc07 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Sat, 15 Sep 2018 02:52:00 +0000
Subject: [PATCH v2 7/9] Publication WHERE condition support for pg_dump

---
 src/bin/pg_dump/pg_dump.c | 15 +++++++++++++--
 src/bin/pg_dump/pg_dump.h |  1 +
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 34981401bf..91475bc5f8 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3959,6 +3959,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	int			i_tableoid;
 	int			i_oid;
 	int			i_pubname;
+	int			i_pubrelqual;
 	int			i,
 				j,
 				ntups;
@@ -3991,7 +3992,8 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 
 		/* Get the publication membership for the table. */
 		appendPQExpBuffer(query,
-						  "SELECT pr.tableoid, pr.oid, p.pubname "
+						  "SELECT pr.tableoid, pr.oid, p.pubname, "
+						  "pg_catalog.pg_get_expr(pr.prrowfilter, pr.prrelid) AS pubrelqual "
 						  "FROM pg_publication_rel pr, pg_publication p "
 						  "WHERE pr.prrelid = '%u'"
 						  "  AND p.oid = pr.prpubid",
@@ -4012,6 +4014,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 		i_tableoid = PQfnumber(res, "tableoid");
 		i_oid = PQfnumber(res, "oid");
 		i_pubname = PQfnumber(res, "pubname");
+		i_pubrelqual = PQfnumber(res, "pubrelqual");
 
 		pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
 
@@ -4027,6 +4030,11 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 			pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname));
 			pubrinfo[j].pubtable = tbinfo;
 
+			if (PQgetisnull(res, j, i_pubrelqual))
+				pubrinfo[j].pubrelqual = NULL;
+			else
+				pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, j, i_pubrelqual));
+
 			/* Decide whether we want to dump it */
 			selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
 		}
@@ -4055,8 +4063,11 @@ dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo)
 
 	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
 					  fmtId(pubrinfo->pubname));
-	appendPQExpBuffer(query, " %s;\n",
+	appendPQExpBuffer(query, " %s",
 					  fmtQualifiedDumpable(tbinfo));
+	if (pubrinfo->pubrelqual)
+		appendPQExpBuffer(query, " WHERE %s", pubrinfo->pubrelqual);
+	appendPQExpBufferStr(query, ";\n");
 
 	/*
 	 * There is no point in creating drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index dfba58ac58..3ed2e1be9c 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -608,6 +608,7 @@ typedef struct _PublicationRelInfo
 	DumpableObject dobj;
 	TableInfo  *pubtable;
 	char	   *pubname;
+	char	   *pubrelqual;
 } PublicationRelInfo;
 
 /*
-- 
2.19.1
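
To illustrate 0007: for a table that belongs to a publication with a row
filter, pg_dump now emits the qualification as part of the membership
command. Based on the format strings above, the dump of the 0009 test
publication would contain something like this (schema qualification and
quoting are whatever fmtQualifiedDumpable and fmtId produce):

    ALTER PUBLICATION cloud ADD TABLE ONLY public.users WHERE (is_cloud IS TRUE);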

From 61a3e08edd5c3ce3cbcc750a2c074922afd6676d Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Wed, 14 Mar 2018 00:53:17 +0000
Subject: [PATCH v2 8/9] Add debug logging for row filtering

---
 src/backend/commands/publicationcmds.c      | 11 ++++
 src/backend/replication/logical/tablesync.c |  1 +
 src/backend/replication/pgoutput/pgoutput.c | 66 +++++++++++++++++++++
 3 files changed, 78 insertions(+)

diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index bc7f9210e9..8e107bddfb 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -341,6 +341,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 	List	   *rels = NIL;
 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
 	Oid			pubid = pubform->oid;
+	ListCell   *lc;
 
 	/* Check that user is allowed to manipulate the publication tables. */
 	if (pubform->puballtables)
@@ -352,6 +353,16 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	Assert(list_length(stmt->tables) > 0);
 
+	foreach(lc, stmt->tables)
+	{
+		PublicationTable *t = lfirst(lc);
+
+		if (t->whereClause == NULL)
+			elog(DEBUG3, "publication \"%s\" has no WHERE clause", NameStr(pubform->pubname));
+		else
+			elog(DEBUG3, "publication \"%s\" has WHERE clause", NameStr(pubform->pubname));
+	}
+
 	/*
 	 * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause.  Use
 	 * publication_table_list node (that accepts a WHERE clause) but forbid the
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index fc37f74e89..c86affad03 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -871,6 +871,7 @@ copy_table(Relation rel)
 		appendStringInfo(&cmd, "COPY %s TO STDOUT",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
 	}
+	elog(DEBUG2, "COPY for initial synchronization: %s", cmd.data);
 	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e9646ac483..5012cfdde7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -34,6 +34,7 @@
 #include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/int8.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
@@ -323,6 +324,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 
+	Form_pg_class	class_form;
+	char			*schemaname;
+	char			*tablename;
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -347,6 +352,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	class_form = RelationGetForm(relation);
+	schemaname = get_namespace_name(class_form->relnamespace);
+	tablename = NameStr(class_form->relname);
+
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT)
+		elog(DEBUG1, "INSERT \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+	else if (change->action == REORDER_BUFFER_CHANGE_UPDATE)
+		elog(DEBUG1, "UPDATE \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+	else if (change->action == REORDER_BUFFER_CHANGE_DELETE)
+		elog(DEBUG1, "DELETE \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+
 	/* ... then check row filter */
 	if (list_length(relentry->row_filter) > 0)
 	{
@@ -363,6 +379,42 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		tupdesc = RelationGetDescr(relation);
 		estate = create_estate_for_relation(relation);
 
+#ifdef	_NOT_USED
+		if (old_tuple)
+		{
+			int i;
+
+			for (i = 0; i < tupdesc->natts; i++)
+			{
+				Form_pg_attribute	attr;
+				HeapTuple			type_tuple;
+				Oid					typoutput;
+				bool				typisvarlena;
+				bool				isnull;
+				Datum				val;
+				char				*outputstr = NULL;
+
+				attr = TupleDescAttr(tupdesc, i);
+
+				/* Figure out type name */
+				type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(attr->atttypid));
+				if (HeapTupleIsValid(type_tuple))
+				{
+					/* Get information needed for printing values of a type */
+					getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
+
+					val = heap_getattr(old_tuple, i + 1, tupdesc, &isnull);
+					if (!isnull)
+					{
+						outputstr = OidOutputFunctionCall(typoutput, val);
+						elog(DEBUG2, "row filter: REPLICA IDENTITY %s: %s", NameStr(attr->attname), outputstr);
+						pfree(outputstr);
+					}
+				}
+			}
+		}
+#endif
+
 		/* prepare context per tuple */
 		ecxt = GetPerTupleExprContext(estate);
 		oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
@@ -378,6 +430,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Oid			expr_type;
 			Datum		res;
 			bool		isnull;
+			char		*s = NULL;
 
 			row_filter = (Node *) lfirst(lc);
 
@@ -388,14 +441,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			expr_state = ExecInitExpr(expr, NULL);
 			res = ExecEvalExpr(expr_state, ecxt, &isnull);
 
+			elog(DEBUG3, "row filter: result: %s ; isnull: %s", (DatumGetBool(res)) ? "true" : "false", (isnull) ? "true" : "false");
+
 			/* if tuple does not match row filter, bail out */
 			if (!DatumGetBool(res) || isnull)
 			{
+				s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(row_filter)), ObjectIdGetDatum(relentry->relid)));
+				elog(DEBUG2, "row filter \"%s\" was not matched", s);
+				pfree(s);
+
 				MemoryContextSwitchTo(oldcxt);
 				ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple);
 				FreeExecutorState(estate);
 				return;
 			}
+
+			s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(row_filter)), ObjectIdGetDatum(relentry->relid)));
+			elog(DEBUG2, "row filter \"%s\" was matched", s);
+			pfree(s);
 		}
 
 		MemoryContextSwitchTo(oldcxt);
@@ -666,9 +729,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				{
 					MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 					char	*s = TextDatumGetCString(rf_datum);
+					char	*t = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, rf_datum, ObjectIdGetDatum(entry->relid)));
 					Node	*rf_node = stringToNode(s);
 					entry->row_filter = lappend(entry->row_filter, rf_node);
 					MemoryContextSwitchTo(oldctx);
+
+					elog(DEBUG2, "row filter \"%s\" found for publication \"%s\" and relation \"%s\"", t, pub->name, get_rel_name(relid));
 				}
 
 				ReleaseSysCache(rf_tuple);
-- 
2.19.1
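
The extra elog() calls in 0008 only show up at elevated log levels, and they
are emitted by the walsender running pgoutput, so the setting has to be
applied server-wide on the publisher rather than in an ordinary session. A
minimal sketch (the log line contents, such as the txid value, are just
illustrative values derived from the format strings above):

    ALTER SYSTEM SET log_min_messages = 'debug3';
    SELECT pg_reload_conf();

    -- publisher log then contains lines like:
    -- DEBUG:  INSERT "public"."users" txid: 512
    -- DEBUG:  row filter "(is_cloud IS TRUE)" was matched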

From 59d97603c8b0ecc20a7f04acee2e123fb1a26265 Mon Sep 17 00:00:00 2001
From: Alexey Kondratov <kondratov.alek...@gmail.com>
Date: Sun, 25 Aug 2019 16:21:32 +0300
Subject: [PATCH v2 9/9] Add simple BDR test for row filtering

---
 .../{010_row_filter.pl => 013_row_filter.pl}  |   0
 src/test/subscription/t/014_simple_bdr.pl     | 194 ++++++++++++++++++
 2 files changed, 194 insertions(+)
 rename src/test/subscription/t/{010_row_filter.pl => 013_row_filter.pl} (100%)
 create mode 100644 src/test/subscription/t/014_simple_bdr.pl

diff --git a/src/test/subscription/t/010_row_filter.pl b/src/test/subscription/t/013_row_filter.pl
similarity index 100%
rename from src/test/subscription/t/010_row_filter.pl
rename to src/test/subscription/t/013_row_filter.pl
diff --git a/src/test/subscription/t/014_simple_bdr.pl b/src/test/subscription/t/014_simple_bdr.pl
new file mode 100644
index 0000000000..f33b754d2a
--- /dev/null
+++ b/src/test/subscription/t/014_simple_bdr.pl
@@ -0,0 +1,194 @@
+# Test simple bidirectional logical replication behavior with row filtering.
+# The id column is meant to hold something like a UUID (e.g. from pgcrypto),
+# but the integer type is used here for simplicity.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 10;
+
+our $node_cloud;
+our $node_remote;
+our $cloud_appname = 'cloud_sub';
+our $remote_appname = 'remote_sub';
+
+sub check_data_consistency
+{
+	my $test_name = shift;
+	my $query = shift;
+	my $true_result = shift;
+	my $result;
+
+	$node_cloud->wait_for_catchup($remote_appname);
+	$node_remote->wait_for_catchup($cloud_appname);
+
+	$result =
+	  $node_remote->safe_psql('postgres', $query);
+	is($result, $true_result, $test_name . ' on remote');
+	$result =
+	  $node_cloud->safe_psql('postgres', $query);
+	is($result, $true_result, $test_name . ' on cloud');
+
+	return;
+}
+
+# Create cloud node
+$node_cloud = get_new_node('publisher');
+$node_cloud->init(allows_streaming => 'logical');
+$node_cloud->start;
+
+# Create remote node
+$node_remote = get_new_node('subscriber');
+$node_remote->init(allows_streaming => 'logical');
+$node_remote->start;
+
+# Test tables
+my $users_table = "CREATE TABLE users (
+						id integer primary key,
+						name text,
+						is_cloud boolean
+					);";
+my $docs_table = "CREATE TABLE docs (
+						id integer primary key,
+						user_id integer,
+						FOREIGN KEY (user_id) REFERENCES users (id),
+						content text,
+						is_cloud boolean
+					);";
+
+# Setup structure on cloud server
+$node_cloud->safe_psql('postgres', $users_table);
+
+# Setup structure on remote server
+$node_remote->safe_psql('postgres', $users_table);
+
+# Put in initial data
+$node_cloud->safe_psql('postgres',
+	"INSERT INTO users (id, name, is_cloud) VALUES (1, 'user1_on_cloud', TRUE);");
+$node_remote->safe_psql('postgres',
+	"INSERT INTO users (id, name, is_cloud) VALUES (2, 'user2_on_remote', FALSE);");
+$node_remote->safe_psql('postgres',
+	"INSERT INTO users (id, name, is_cloud) VALUES (100, 'user100_local_on_remote', TRUE);");
+
+# Setup logical replication
+$node_cloud->safe_psql('postgres', "CREATE PUBLICATION cloud;");
+$node_cloud->safe_psql('postgres', "ALTER PUBLICATION cloud ADD TABLE users WHERE (is_cloud IS TRUE);");
+
+$node_remote->safe_psql('postgres', "CREATE PUBLICATION remote;");
+$node_remote->safe_psql('postgres', "ALTER PUBLICATION remote ADD TABLE users WHERE (is_cloud IS FALSE);");
+
+my $cloud_connstr = $node_cloud->connstr . ' dbname=postgres';
+$node_remote->safe_psql('postgres',
+	"CREATE SUBSCRIPTION cloud_to_remote CONNECTION '$cloud_connstr application_name=$remote_appname' PUBLICATION cloud"
+);
+
+my $remote_connstr = $node_remote->connstr . ' dbname=postgres';
+$node_cloud->safe_psql('postgres',
+	"CREATE SUBSCRIPTION remote_to_cloud CONNECTION '$remote_connstr application_name=$cloud_appname' PUBLICATION remote"
+);
+
+# Wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_remote->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for remote to synchronize data";
+$node_cloud->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for cloud to synchronize data";
+
+$node_cloud->wait_for_catchup($remote_appname);
+$node_remote->wait_for_catchup($cloud_appname);
+
+# Test initial table sync
+my $result =
+  $node_remote->safe_psql('postgres', "SELECT count(*) from users");
+is($result, qq(3), 'check initial table sync on remote');
+$result =
+  $node_cloud->safe_psql('postgres', "SELECT count(*) from users");
+is($result, qq(2), 'check initial table sync on cloud');
+
+# Test BDR
+$node_cloud->safe_psql('postgres',
+	"INSERT INTO users (id, name, is_cloud) VALUES (3, 'user3_on_cloud', TRUE);");
+$node_cloud->safe_psql('postgres',
+	"INSERT INTO users (id, name, is_cloud) VALUES (4, 'user4_on_cloud', TRUE);");
+$node_remote->safe_psql('postgres',
+	"INSERT INTO users (id, name, is_cloud) VALUES (5, 'user5_on_remote', FALSE);");
+
+$node_cloud->wait_for_catchup($remote_appname);
+$node_remote->wait_for_catchup($cloud_appname);
+
+$result =
+  $node_remote->safe_psql('postgres', "SELECT id, name, is_cloud FROM users ORDER BY id;");
+is($result, qq(1|user1_on_cloud|t
+2|user2_on_remote|f
+3|user3_on_cloud|t
+4|user4_on_cloud|t
+5|user5_on_remote|f
+100|user100_local_on_remote|t), 'check users on remote');
+$result =
+  $node_cloud->safe_psql('postgres', "SELECT id, name, is_cloud FROM users ORDER BY id;");
+is($result, qq(1|user1_on_cloud|t
+2|user2_on_remote|f
+3|user3_on_cloud|t
+4|user4_on_cloud|t
+5|user5_on_remote|f), 'check users on cloud');
+
+# Add table to cloud server
+$node_cloud->safe_psql('postgres', $docs_table);
+
+# Add table to remote server
+$node_remote->safe_psql('postgres', $docs_table);
+
+# Put in initial data
+$node_cloud->safe_psql('postgres',
+	"INSERT INTO docs (id, user_id, content, is_cloud) VALUES (1, 3, 'user3__doc1_on_cloud', TRUE);");
+
+# Add table to publication
+$node_cloud->safe_psql('postgres', "ALTER PUBLICATION cloud ADD TABLE docs WHERE (is_cloud IS TRUE);");
+$node_remote->safe_psql('postgres', "ALTER PUBLICATION remote ADD TABLE docs WHERE (is_cloud IS FALSE);");
+
+# Refresh
+$node_cloud->safe_psql('postgres', "ALTER SUBSCRIPTION remote_to_cloud REFRESH PUBLICATION;");
+$node_remote->safe_psql('postgres', "ALTER SUBSCRIPTION cloud_to_remote REFRESH PUBLICATION;");
+
+# Test BDR on new table
+$node_cloud->safe_psql('postgres',
+	"INSERT INTO docs (id, user_id, content, is_cloud) VALUES (2, 3, 'user3__doc2_on_cloud', TRUE);");
+$node_remote->safe_psql('postgres',
+	"INSERT INTO docs (id, user_id, content, is_cloud) VALUES (3, 3, 'user3__doc3_on_remote', FALSE);");
+
+check_data_consistency(
+	'check docs after insert',
+	"SELECT id, user_id, content, is_cloud FROM docs WHERE user_id = 3 ORDER BY id;",
+	qq(1|3|user3__doc1_on_cloud|t
+2|3|user3__doc2_on_cloud|t
+3|3|user3__doc3_on_remote|f)
+);
+
+# Test update of remote doc on cloud and vice versa
+$node_cloud->safe_psql('postgres',
+	"UPDATE docs SET content = 'user3__doc3_on_remote__updated', is_cloud = TRUE WHERE id = 3;");
+$node_remote->safe_psql('postgres',
+	"UPDATE docs SET content = 'user3__doc2_on_cloud__to_be_deleted', is_cloud = FALSE WHERE id = 2;");
+
+check_data_consistency(
+	'check docs after update',
+	"SELECT id, user_id, content, is_cloud FROM docs WHERE user_id = 3 ORDER BY id;",
+	qq(1|3|user3__doc1_on_cloud|t
+2|3|user3__doc2_on_cloud__to_be_deleted|f
+3|3|user3__doc3_on_remote__updated|t)
+);
+
+# Test delete
+$node_remote->safe_psql('postgres',
+	"DELETE FROM docs WHERE id = 2;");
+
+check_data_consistency(
+	'check docs after delete',
+	"SELECT id, user_id, content, is_cloud FROM docs WHERE user_id = 3 ORDER BY id;",
+	qq(1|3|user3__doc1_on_cloud|t
+3|3|user3__doc3_on_remote__updated|t)
+);
+
+$node_remote->stop('fast');
+$node_cloud->stop('fast');
-- 
2.19.1
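
For anyone who wants to reproduce the 0009 scenario by hand outside the TAP
framework, the core of the setup is just the two complementary row filters
plus the cross subscriptions. A sketch with placeholder connection strings
(the users table from the test must already exist on both nodes):

    -- on the cloud node
    CREATE PUBLICATION cloud;
    ALTER PUBLICATION cloud ADD TABLE users WHERE (is_cloud IS TRUE);

    -- on the remote node
    CREATE PUBLICATION remote;
    ALTER PUBLICATION remote ADD TABLE users WHERE (is_cloud IS FALSE);

    -- then cross-subscribe: on the remote node
    CREATE SUBSCRIPTION cloud_to_remote
        CONNECTION 'host=cloud-host dbname=postgres' PUBLICATION cloud;
    -- and on the cloud node
    CREATE SUBSCRIPTION remote_to_cloud
        CONNECTION 'host=remote-host dbname=postgres' PUBLICATION remote;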
