On Mon, Mar 16, 2020, at 10:58 AM, David Steele wrote:
> Please submit to a future CF when a new patch is available.

Hi,
This is another version of the row filter patch. Patch summary:

0001: refactor to remove dead code
0002: grammar refactor for row filter
0003: core code, documentation, and tests
0004: psql code
0005: pg_dump support
0006: debug messages (only for test purposes)
0007: measure row filter overhead (only for test purposes)

From the previous version I incorporated Amit's suggestions [1] and improved
the documentation and tests. I refactored the code to make it simpler to read
(I broke the row filter code into functions). This new version covers the new
parameter publish_via_partition_root that was introduced (cf 83fd4532a7).

Regarding function prohibition, I wouldn't like to open a can of worms (see
previous discussions in this thread). Simple expressions cover most of the use
cases that I have worked with until now. This prohibition can be removed in
another patch after some careful analysis.

I did some limited tests and didn't observe any excessive CPU usage while
testing this patch, though I agree with Andres that retaining some expression
context in a cache would certainly speed up this piece of code. I measured the
row filter overhead on my i7 (see 0007) and got:

mean:           92.49 us
stddev:         32.63 us
median:         83.45 us
min-max:        [11.13 .. 2731.55] us
percentile(95): 117.76 us

[1] https://www.postgresql.org/message-id/CA%2BHiwqG3Jz-cRS%3D4gqXmZDjDAi%3D%3D19GvrFCCqAawwHcOCEn4fQ%40mail.gmail.com

--
Euler Taveira
EnterpriseDB: https://www.enterprisedb.com/
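Patch 0003 states that, during the initial table synchronization, a row is copied only if it satisfies the row filters of every publication the subscription uses, and that the filters are AND'ed into the query used for COPY. The following is a minimal standalone sketch of that idea, not the code from the patch: `build_copy_query` and `append_str` are hypothetical helpers, plain C strings stand in for PostgreSQL's StringInfo, and identifier quoting is left out for brevity.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: append s to buf, tracking the used length.
 * Returns 0 on success, -1 if s plus the terminator does not fit.
 */
static int
append_str(char *buf, size_t buflen, size_t *used, const char *s)
{
    size_t len = strlen(s);

    if (*used + len >= buflen)
        return -1;
    memcpy(buf + *used, s, len + 1);    /* copies the terminator too */
    *used += len;
    return 0;
}

/*
 * Hypothetical helper: build the COPY command for a table.
 *
 * With no row filters a plain COPY is enough (the patch applies this only to
 * regular tables; this sketch does not model relkind). Otherwise each filter
 * is wrapped in parentheses and joined with AND, so a row is copied only if
 * it satisfies every filter.
 *
 * Returns 0 on success, -1 if buf is too small.
 */
static int
build_copy_query(char *buf, size_t buflen, const char *table,
                 const char *columns, const char *const *filters,
                 int nfilters)
{
    size_t used = 0;
    int i;

    assert(buf != NULL && buflen > 0 && table != NULL && columns != NULL);
    buf[0] = '\0';

    if (nfilters == 0)
    {
        if (append_str(buf, buflen, &used, "COPY ") ||
            append_str(buf, buflen, &used, table) ||
            append_str(buf, buflen, &used, " TO STDOUT"))
            return -1;
        return 0;
    }

    if (append_str(buf, buflen, &used, "COPY (SELECT ") ||
        append_str(buf, buflen, &used, columns) ||
        append_str(buf, buflen, &used, " FROM ") ||
        append_str(buf, buflen, &used, table) ||
        append_str(buf, buflen, &used, " WHERE "))
        return -1;

    for (i = 0; i < nfilters; i++)
    {
        if (i > 0 && append_str(buf, buflen, &used, " AND "))
            return -1;
        if (append_str(buf, buflen, &used, "(") ||
            append_str(buf, buflen, &used, filters[i]) ||
            append_str(buf, buflen, &used, ")"))
            return -1;
    }

    return append_str(buf, buflen, &used, ") TO STDOUT");
}
```

Called with two filters, `active IS TRUE` and `id > 10`, on an illustrative table `public.departments`, the helper wraps each filter in parentheses and joins them with AND inside a `COPY (SELECT ...) TO STDOUT` command. The column list is passed explicitly because, as the patch comments note, SELECT * cannot be used when generated columns must be skipped.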
From 78aa13f958d883f52ef0a9796536adf06cd58273 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Mon, 18 Jan 2021 11:13:23 -0300
Subject: [PATCH 1/7] Remove unused column from initial table synchronization

Column atttypmod was added in the commit 7c4f52409a, but it is not used.
The removal is safe because COPY from publisher does not need such
information.
---
 src/backend/replication/logical/tablesync.c | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 863d196fd7..a18f847ade 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -640,7 +640,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
-	Oid			attrRow[] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
+	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
 	bool		isnull;
 	int			natt;
 
@@ -685,7 +685,6 @@ fetch_remote_table_info(char *nspname, char *relname,
 	appendStringInfo(&cmd,
 					 "SELECT a.attname,"
 					 " a.atttypid,"
-					 " a.atttypmod,"
 					 " a.attnum = ANY(i.indkey)"
 					 " FROM pg_catalog.pg_attribute a"
 					 " LEFT JOIN pg_catalog.pg_index i"
@@ -718,7 +717,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 		Assert(!isnull);
 		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
-		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
+		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
 			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
 
 		/* Should never happen. */
--
2.20.1
From 13917594bd781cd614875498eacec48ce1d64537 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Mon, 18 Jan 2021 11:53:34 -0300
Subject: [PATCH 2/7] Rename a WHERE node

A WHERE clause will be used for row filtering in logical replication. We
already have a similar node: 'WHERE (condition here)'. Let's rename the
node to a generic name and use it for row filtering too.
---
 src/backend/parser/gram.y | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b2f447bf9a..793aac5377 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -485,7 +485,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	def_arg columnElem where_clause where_or_current_clause
 				a_expr b_expr c_expr AexprConst indirection_el opt_slice_bound
 				columnref in_expr having_clause func_table xmltable array_expr
-				ExclusionWhereClause operator_def_arg
+				OptWhereClause operator_def_arg
 %type <list>	rowsfrom_item rowsfrom_list opt_col_def_list
 %type <boolean> opt_ordinality
 %type <list>	ExclusionConstraintList ExclusionConstraintElem
@@ -3805,7 +3805,7 @@ ConstraintElem:
 					$$ = (Node *)n;
 				}
 			| EXCLUDE access_method_clause '(' ExclusionConstraintList ')'
-				opt_c_include opt_definition OptConsTableSpace ExclusionWhereClause
+				opt_c_include opt_definition OptConsTableSpace OptWhereClause
 				ConstraintAttributeSpec
 				{
 					Constraint *n = makeNode(Constraint);
@@ -3907,7 +3907,7 @@ ExclusionConstraintElem: index_elem WITH any_operator
 			}
 		;
 
-ExclusionWhereClause:
+OptWhereClause:
 			WHERE '(' a_expr ')'					{ $$ = $3; }
 			| /*EMPTY*/								{ $$ = NULL; }
 		;
--
2.20.1
From 4bb2622ba46407989133d8e2766bcac513635a51 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Mon, 18 Jan 2021 12:07:51 -0300
Subject: [PATCH 3/7] Row filter for logical replication

This feature adds a row filter for publication tables. When you define or
modify a publication you can optionally filter out rows that do not satisfy
a WHERE condition. It allows you to partially replicate a database or set
of tables. The row filter is per table, which means that you can define
different row filters for different tables. A new row filter can be added
simply by specifying the WHERE clause after the table name. The WHERE
expression must be enclosed in parentheses.

The WHERE clause should contain only columns that are part of the primary
key or that are covered by REPLICA IDENTITY. Otherwise, UPDATEs and DELETEs
won't be replicated. For simplicity, functions are not allowed; this could
possibly be addressed in another patch.

If you choose to do the initial table synchronization, only data that
satisfies the row filters is sent. If the subscription has several
publications in which a table has been published with different WHERE
clauses, rows must satisfy all expressions to be copied.

If your publication contains a partitioned table, the parameter
publish_via_partition_root determines whether it uses the partition row
filter (if the parameter is false, the default) or the partitioned table
row filter.
--- doc/src/sgml/catalogs.sgml | 8 + doc/src/sgml/ref/alter_publication.sgml | 11 +- doc/src/sgml/ref/create_publication.sgml | 37 +++- doc/src/sgml/ref/create_subscription.sgml | 8 +- src/backend/catalog/pg_publication.c | 110 +++++++++++- src/backend/commands/publicationcmds.c | 96 ++++++---- src/backend/parser/gram.y | 28 ++- src/backend/parser/parse_agg.c | 10 ++ src/backend/parser/parse_expr.c | 13 ++ src/backend/parser/parse_func.c | 3 + src/backend/replication/logical/tablesync.c | 93 +++++++++- src/backend/replication/logical/worker.c | 14 +- src/backend/replication/pgoutput/pgoutput.c | 179 ++++++++++++++++-- src/include/catalog/pg_publication.h | 10 +- src/include/catalog/pg_publication_rel.h | 6 + src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes.h | 11 +- src/include/parser/parse_node.h | 1 + src/include/replication/logicalrelation.h | 2 + src/test/regress/expected/publication.out | 32 ++++ src/test/regress/sql/publication.sql | 23 +++ src/test/subscription/t/020_row_filter.pl | 190 ++++++++++++++++++++ 22 files changed, 801 insertions(+), 85 deletions(-) create mode 100644 src/test/subscription/t/020_row_filter.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 865e826fb0..1ea4076219 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5493,6 +5493,14 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l are simple references. 
</para></entry> </row> + + <row> + <entry><structfield>prqual</structfield></entry> + <entry><type>pg_node_tree</type></entry> + <entry></entry> + <entry>Expression tree (in <function>nodeToString()</function> + representation) for the relation's qualifying condition</entry> + </row> </tbody> </tgroup> </table> diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index faa114b2c6..ca091aae33 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -21,8 +21,8 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> -ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] -ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... 
] ) ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER } @@ -92,7 +92,12 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r table name, only that table is affected. If <literal>ONLY</literal> is not specified, the table and all its descendant tables (if any) are affected. Optionally, <literal>*</literal> can be specified after the table - name to explicitly indicate that descendant tables are included. + name to explicitly indicate that descendant tables are included. If the + optional <literal>WHERE</literal> clause is specified, rows that do not + satisfy the <replaceable class="parameter">expression</replaceable> will + not be published. Note that parentheses are required around the + expression. The <replaceable class="parameter">expression</replaceable> + is executed with the role used for the replication connection. </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index ff82fbca55..5253037155 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -22,7 +22,7 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> CREATE PUBLICATION <replaceable class="parameter">name</replaceable> - [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] + [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...] | FOR ALL TABLES ] [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] </synopsis> @@ -71,6 +71,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> This does not apply to a partitioned table, however. 
The partitions of a partitioned table are always implicitly considered part of the publication, so they are never explicitly added to the publication. + If the optional <literal>WHERE</literal> clause is specified, rows that do + not satisfy the <replaceable class="parameter">expression</replaceable> + will not be published. Note that parentheses are required around the + expression. </para> <para> @@ -131,9 +135,15 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> on its partitions) contained in the publication will be published using the identity and schema of the partitioned table rather than that of the individual partitions that are actually changed; the - latter is the default. Enabling this allows the changes to be - replicated into a non-partitioned table or a partitioned table - consisting of a different set of partitions. + latter is the default (<literal>false</literal>). Enabling this + allows the changes to be replicated into a non-partitioned table or a + partitioned table consisting of a different set of partitions. + </para> + + <para> + If this parameter is <literal>false</literal>, it uses the + <literal>WHERE</literal> clause from the partition; otherwise,the + <literal>WHERE</literal> clause from the partitioned table is used. </para> <para> @@ -182,6 +192,13 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> disallowed on those tables. </para> + <para> + Columns used in the <literal>WHERE</literal> clause must be part of the + primary key or be covered by <literal>REPLICA IDENTITY</literal> otherwise + <command>UPDATE</command> and <command>DELETE</command> operations will not + be replicated. + </para> + <para> For an <command>INSERT ... ON CONFLICT</command> command, the publication will publish the operation that actually results from the command. 
So depending @@ -197,6 +214,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> <para> <acronym>DDL</acronym> operations are not published. </para> + + <para> + The <literal>WHERE</literal> clause expression is executed with the role used + for the replication connection. + </para> </refsect1> <refsect1> @@ -209,6 +231,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments; </programlisting> </para> + <para> + Create a publication that publishes all changes from active departments: +<programlisting> +CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE); +</programlisting> + </para> + <para> Create a publication that publishes all changes in all tables: <programlisting> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index e812beee37..b8f4ea5603 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -102,7 +102,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl <para> Specifies whether the existing data in the publications that are being subscribed to should be copied once the replication starts. - The default is <literal>true</literal>. + The default is <literal>true</literal>. If any table in the + publications has a <literal>WHERE</literal> clause, rows that do not + satisfy the <replaceable class="parameter">expression</replaceable> + will not be copied. If the subscription has several publications in + which a table has been published with different + <literal>WHERE</literal> clauses, rows must satisfy all expressions + to be copied. 
</para> </listitem> </varlistentry> diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 84d2efcfd2..fd9549a630 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -33,6 +33,11 @@ #include "catalog/pg_type.h" #include "funcapi.h" #include "miscadmin.h" + +#include "parser/parse_clause.h" +#include "parser/parse_collate.h" +#include "parser/parse_relation.h" + #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" @@ -141,18 +146,20 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) * Insert new publication / relation mapping. */ ObjectAddress -publication_add_relation(Oid pubid, Relation targetrel, +publication_add_relation(Oid pubid, PublicationRelationQual *targetrel, bool if_not_exists) { Relation rel; HeapTuple tup; Datum values[Natts_pg_publication_rel]; bool nulls[Natts_pg_publication_rel]; - Oid relid = RelationGetRelid(targetrel); Oid prrelid; Publication *pub = GetPublication(pubid); ObjectAddress myself, referenced; + ParseState *pstate; + ParseNamespaceItem *nsitem; + Node *whereclause; rel = table_open(PublicationRelRelationId, RowExclusiveLock); @@ -161,7 +168,7 @@ publication_add_relation(Oid pubid, Relation targetrel, * duplicates, it's here just to provide nicer error message in common * case. The real protection is the unique key on the catalog. 
*/ - if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), + if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(targetrel->relid), ObjectIdGetDatum(pubid))) { table_close(rel, RowExclusiveLock); @@ -172,10 +179,27 @@ publication_add_relation(Oid pubid, Relation targetrel, ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("relation \"%s\" is already member of publication \"%s\"", - RelationGetRelationName(targetrel), pub->name))); + RelationGetRelationName(targetrel->relation), pub->name))); } - check_publication_add_relation(targetrel); + check_publication_add_relation(targetrel->relation); + + /* Set up a pstate to parse with */ + pstate = make_parsestate(NULL); + pstate->p_sourcetext = nodeToString(targetrel->whereClause); + + nsitem = addRangeTableEntryForRelation(pstate, targetrel->relation, + AccessShareLock, + NULL, false, false); + addNSItemToQuery(pstate, nsitem, false, true, true); + + whereclause = transformWhereClause(pstate, + copyObject(targetrel->whereClause), + EXPR_KIND_PUBLICATION_WHERE, + "PUBLICATION"); + + /* Fix up collation information */ + assign_expr_collations(pstate, whereclause); /* Form a tuple. 
*/ memset(values, 0, sizeof(values)); @@ -187,7 +211,13 @@ publication_add_relation(Oid pubid, Relation targetrel, values[Anum_pg_publication_rel_prpubid - 1] = ObjectIdGetDatum(pubid); values[Anum_pg_publication_rel_prrelid - 1] = - ObjectIdGetDatum(relid); + ObjectIdGetDatum(targetrel->relid); + + /* Add qualifications, if available */ + if (whereclause) + values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(whereclause)); + else + nulls[Anum_pg_publication_rel_prqual - 1] = true; tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -202,14 +232,20 @@ publication_add_relation(Oid pubid, Relation targetrel, recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); /* Add dependency on the relation */ - ObjectAddressSet(referenced, RelationRelationId, relid); + ObjectAddressSet(referenced, RelationRelationId, targetrel->relid); recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + /* Add dependency on the objects mentioned in the qualifications */ + if (whereclause) + recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL); + + free_parsestate(pstate); + /* Close the table. */ table_close(rel, RowExclusiveLock); /* Invalidate relcache so that publication info is rebuilt. */ - CacheInvalidateRelcache(targetrel); + CacheInvalidateRelcache(targetrel->relation); return myself; } @@ -304,6 +340,64 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) return result; } +/* + * Gets list of PublicationRelationQuals for a publication. + */ +List * +GetPublicationRelationQuals(Oid pubid) +{ + List *result; + Relation pubrelsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all publications associated with the relation. 
*/ + pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_rel_prpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId, + true, NULL, 1, &scankey); + + result = NIL; + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_rel pubrel; + PublicationRelationQual *prq; + Datum value_datum; + char *qual_value; + Node *qual_expr; + bool isnull; + + pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + + value_datum = heap_getattr(tup, Anum_pg_publication_rel_prqual, RelationGetDescr(pubrelsrel), &isnull); + if (!isnull) + { + qual_value = TextDatumGetCString(value_datum); + qual_expr = (Node *) stringToNode(qual_value); + } + else + qual_expr = NULL; + + prq = palloc(sizeof(PublicationRelationQual)); + prq->relid = pubrel->prrelid; + /* table will be opened in AlterPublicationTables */ + prq->relation = NULL; + prq->whereClause = qual_expr; + result = lappend(result, prq); + } + + systable_endscan(scan); + table_close(pubrelsrel, AccessShareLock); + + return result; +} + /* * Gets list of publication oids for publications marked as FOR ALL TABLES. */ diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 95c253c8e0..a5eccbbfb5 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -372,6 +372,28 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, Assert(list_length(stmt->tables) > 0); + /* + * Although ALTER PUBLICATION grammar allows WHERE clause to be specified + * for DROP TABLE action, it doesn't make sense to allow it. We implement + * this restriction here, instead of complicating the grammar to enforce + * it. 
+ */ + if (stmt->tableAction == DEFELEM_DROP) + { + ListCell *lc; + + foreach(lc, stmt->tables) + { + PublicationTable *t = lfirst(lc); + + if (t->whereClause) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use a WHERE clause when removing table from publication \"%s\"", + NameStr(pubform->pubname)))); + } + } + rels = OpenTableList(stmt->tables); if (stmt->tableAction == DEFELEM_ADD) @@ -389,27 +411,22 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, foreach(oldlc, oldrelids) { Oid oldrelid = lfirst_oid(oldlc); - ListCell *newlc; - bool found = false; + PublicationRelationQual *oldrel; - foreach(newlc, rels) - { - Relation newrel = (Relation) lfirst(newlc); + /* + * Remove all publication-table mappings. We could possibly + * remove (i) tables that are not found in the new table list and + * (ii) tables that are being re-added with a different qual + * expression. For (ii), simply updating the existing tuple is not + * enough, because of qual expression dependencies. + */ + oldrel = palloc(sizeof(PublicationRelationQual)); + oldrel->relid = oldrelid; + oldrel->whereClause = NULL; + oldrel->relation = table_open(oldrel->relid, + ShareUpdateExclusiveLock); - if (RelationGetRelid(newrel) == oldrelid) - { - found = true; - break; - } - } - - if (!found) - { - Relation oldrel = table_open(oldrelid, - ShareUpdateExclusiveLock); - - delrels = lappend(delrels, oldrel); - } + delrels = lappend(delrels, oldrel); } /* And drop them. 
*/ @@ -509,13 +526,15 @@ OpenTableList(List *tables) List *relids = NIL; List *rels = NIL; ListCell *lc; + PublicationRelationQual *prq; /* * Open, share-lock, and check all the explicitly-specified relations */ foreach(lc, tables) { - RangeVar *rv = castNode(RangeVar, lfirst(lc)); + PublicationTable *t = lfirst(lc); + RangeVar *rv = castNode(RangeVar, t->relation); bool recurse = rv->inh; Relation rel; Oid myrelid; @@ -538,8 +557,11 @@ OpenTableList(List *tables) table_close(rel, ShareUpdateExclusiveLock); continue; } - - rels = lappend(rels, rel); + prq = palloc(sizeof(PublicationRelationQual)); + prq->relid = myrelid; + prq->relation = rel; + prq->whereClause = t->whereClause; + rels = lappend(rels, prq); relids = lappend_oid(relids, myrelid); /* @@ -572,7 +594,12 @@ OpenTableList(List *tables) /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); - rels = lappend(rels, rel); + prq = palloc(sizeof(PublicationRelationQual)); + prq->relid = childrelid; + prq->relation = rel; + /* child inherits WHERE clause from parent */ + prq->whereClause = t->whereClause; + rels = lappend(rels, prq); relids = lappend_oid(relids, childrelid); } } @@ -593,10 +620,12 @@ CloseTableList(List *rels) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationQual *prq = (PublicationRelationQual *) lfirst(lc); - table_close(rel, NoLock); + table_close(prq->relation, NoLock); } + + list_free_deep(rels); } /* @@ -612,15 +641,15 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationQual *prq = (PublicationRelationQual *) lfirst(lc); ObjectAddress obj; /* Must be owner of the table or superuser. 
*/ - if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), - RelationGetRelationName(rel)); + if (!pg_class_ownercheck(prq->relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(prq->relation->rd_rel->relkind), + RelationGetRelationName(prq->relation)); - obj = publication_add_relation(pubid, rel, if_not_exists); + obj = publication_add_relation(pubid, prq, if_not_exists); if (stmt) { EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, @@ -644,11 +673,10 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); - Oid relid = RelationGetRelid(rel); + PublicationRelationQual *prq = (PublicationRelationQual *) lfirst(lc); prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, - ObjectIdGetDatum(relid), + ObjectIdGetDatum(prq->relid), ObjectIdGetDatum(pubid)); if (!OidIsValid(prid)) { @@ -658,7 +686,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("relation \"%s\" is not part of the publication", - RelationGetRelationName(rel)))); + RelationGetRelationName(prq->relation)))); } ObjectAddressSet(obj, PublicationRelRelationId, prid); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 793aac5377..39088f1c83 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -416,12 +416,12 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list + drop_option_list publication_table_list %type <list> group_by_list %type <node> group_by_item empty_grouping_set rollup_clause cube_clause %type <node> grouping_sets_clause -%type <node> opt_publication_for_tables publication_for_tables +%type <node> 
opt_publication_for_tables publication_for_tables publication_table_elem %type <list> opt_fdw_options fdw_options %type <defelt> fdw_option @@ -3805,7 +3805,7 @@ ConstraintElem: $$ = (Node *)n; } | EXCLUDE access_method_clause '(' ExclusionConstraintList ')' - opt_c_include opt_definition OptConsTableSpace OptWhereClause + opt_c_include opt_definition OptConsTableSpace OptWhereClause ConstraintAttributeSpec { Constraint *n = makeNode(Constraint); @@ -9498,7 +9498,7 @@ opt_publication_for_tables: ; publication_for_tables: - FOR TABLE relation_expr_list + FOR TABLE publication_table_list { $$ = (Node *) $3; } @@ -9529,7 +9529,7 @@ AlterPublicationStmt: n->options = $5; $$ = (Node *)n; } - | ALTER PUBLICATION name ADD_P TABLE relation_expr_list + | ALTER PUBLICATION name ADD_P TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9537,7 +9537,7 @@ AlterPublicationStmt: n->tableAction = DEFELEM_ADD; $$ = (Node *)n; } - | ALTER PUBLICATION name SET TABLE relation_expr_list + | ALTER PUBLICATION name SET TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9545,7 +9545,7 @@ AlterPublicationStmt: n->tableAction = DEFELEM_SET; $$ = (Node *)n; } - | ALTER PUBLICATION name DROP TABLE relation_expr_list + | ALTER PUBLICATION name DROP TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9555,6 +9555,20 @@ AlterPublicationStmt: } ; +publication_table_list: + publication_table_elem { $$ = list_make1($1); } + | publication_table_list ',' publication_table_elem { $$ = lappend($1, $3); } + ; + +publication_table_elem: relation_expr OptWhereClause + { + PublicationTable *n = makeNode(PublicationTable); + n->relation = $1; + n->whereClause = $2; + $$ = (Node *) n; + } + ; + /***************************************************************************** * * CREATE SUBSCRIPTION name ... 
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index 588f005dd9..cf2b3868bd 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -544,6 +544,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr) err = _("grouping operations are not allowed in COPY FROM WHERE conditions"); break; + case EXPR_KIND_PUBLICATION_WHERE: + if (isAgg) + err = _("aggregate functions are not allowed in publication WHERE expressions"); + else + err = _("grouping operations are not allowed in publication WHERE expressions"); + + break; /* * There is intentionally no default: case here, so that the @@ -933,6 +940,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc, case EXPR_KIND_GENERATED_COLUMN: err = _("window functions are not allowed in column generation expressions"); break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("window functions are not allowed in publication WHERE expressions"); + break; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index 379355f9bf..0f2045363b 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -119,6 +119,13 @@ transformExprRecurse(ParseState *pstate, Node *expr) /* Guard against stack overflow due to overly complex expressions */ check_stack_depth(); + /* Functions are not allowed in publication WHERE clauses */ + if (pstate->p_expr_kind == EXPR_KIND_PUBLICATION_WHERE && nodeTag(expr) == T_FuncCall) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions are not allowed in publication WHERE expressions"), + parser_errposition(pstate, exprLocation(expr)))); + switch (nodeTag(expr)) { case T_ColumnRef: @@ -507,6 +514,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) case EXPR_KIND_CALL_ARGUMENT: case EXPR_KIND_COPY_WHERE: case EXPR_KIND_GENERATED_COLUMN: + case EXPR_KIND_PUBLICATION_WHERE: /* okay */ break; 
@@ -1763,6 +1771,9 @@ transformSubLink(ParseState *pstate, SubLink *sublink) case EXPR_KIND_GENERATED_COLUMN: err = _("cannot use subquery in column generation expression"); break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("cannot use subquery in publication WHERE expression"); + break; /* * There is intentionally no default: case here, so that the @@ -3044,6 +3055,8 @@ ParseExprKindName(ParseExprKind exprKind) return "WHERE"; case EXPR_KIND_GENERATED_COLUMN: return "GENERATED AS"; + case EXPR_KIND_PUBLICATION_WHERE: + return "publication expression"; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c index 07d0013e84..6ba01452a3 100644 --- a/src/backend/parser/parse_func.c +++ b/src/backend/parser/parse_func.c @@ -2527,6 +2527,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location) case EXPR_KIND_GENERATED_COLUMN: err = _("set-returning functions are not allowed in column generation expressions"); break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("set-returning functions are not allowed in publication WHERE expressions"); + break; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index a18f847ade..63d00a75ed 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -630,19 +630,25 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. This function also returns the relation + * qualifications to be used in COPY command. 
  */
 static void
 fetch_remote_table_info(char *nspname, char *relname,
-						LogicalRepRelation *lrel)
+						LogicalRepRelation *lrel, List **qual)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
 	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			qualRow[] = {TEXTOID};
 	bool		isnull;
 	int			natt;
+	ListCell   *lc;
+	bool		first;
+
+	memset(lrel, 0, sizeof(LogicalRepRelation));
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
@@ -731,6 +737,51 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 	lrel->natts = natt;
 
+	walrcv_clear_result(res);
+
+	/* Get relation qual */
+	resetStringInfo(&cmd);
+	appendStringInfo(&cmd,
+					 "SELECT pg_get_expr(prqual, prrelid) "
+					 " FROM pg_publication p "
+					 " INNER JOIN pg_publication_rel pr "
+					 " ON (p.oid = pr.prpubid) "
+					 " WHERE pr.prrelid = %u "
+					 " AND p.pubname IN (", lrel->remoteid);
+
+	first = true;
+	foreach(lc, MySubscription->publications)
+	{
+		char	   *pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(&cmd, ", ");
+
+		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+	}
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 1, qualRow);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not fetch relation qualifications for table \"%s.%s\" from publisher: %s",
+						nspname, relname, res->err)));
+
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		Datum		rf = slot_getattr(slot, 1, &isnull);
+
+		if (!isnull)
+			*qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
+
+		ExecClearTuple(slot);
+	}
+	ExecDropSingleTupleTableSlot(slot);
+
 	walrcv_clear_result(res);
 	pfree(cmd.data);
 }
@@ -745,6 +796,7 @@ copy_table(Relation rel)
 {
 	LogicalRepRelMapEntry *relmapentry;
 	LogicalRepRelation lrel;
+	List	   *qual = NIL;
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	CopyFromState cstate;
@@ -753,7 +805,7 @@ copy_table(Relation rel)
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel);
+							RelationGetRelationName(rel), &lrel, &qual);
 
 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -762,16 +814,23 @@ copy_table(Relation rel)
 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
 	Assert(rel == relmapentry->localrel);
 
+	/* List of columns for COPY */
+	attnamelist = make_copy_attnamelist(relmapentry);
+
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
-	if (lrel.relkind == RELKIND_RELATION)
+
+	/* Regular table with no row filter */
+	if (lrel.relkind == RELKIND_RELATION && list_length(qual) == 0)
 		appendStringInfo(&cmd, "COPY %s TO STDOUT",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
 	else
 	{
 		/*
 		 * For non-tables, we need to do COPY (SELECT ...), but we can't just
-		 * do SELECT * because we need to not copy generated columns.
+		 * do SELECT * because we need to not copy generated columns. For
+		 * tables with any row filters, build a SELECT query with AND'ed row
+		 * filters for COPY.
 		 */
 		appendStringInfoString(&cmd, "COPY (SELECT ");
 		for (int i = 0; i < lrel.natts; i++)
@@ -780,9 +839,31 @@ copy_table(Relation rel)
 			if (i < lrel.natts - 1)
 				appendStringInfoString(&cmd, ", ");
 		}
-		appendStringInfo(&cmd, " FROM %s) TO STDOUT",
+		appendStringInfo(&cmd, " FROM %s",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		/* list of AND'ed filters */
+		if (list_length(qual) > 0)
+		{
+			ListCell   *lc;
+			bool		first = true;
+
+			appendStringInfoString(&cmd, " WHERE ");
+			foreach(lc, qual)
+			{
+				char	   *q = strVal(lfirst(lc));
+
+				if (first)
+					first = false;
+				else
+					appendStringInfoString(&cmd, " AND ");
+				appendStringInfo(&cmd, "%s", q);
+			}
+			list_free_deep(qual);
+		}
+
+		appendStringInfoString(&cmd, ") TO STDOUT");
 	}
+
 	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index eb7db89cef..6092ae0df0 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -340,8 +340,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
  *
  * This is based on similar code in copy.c
  */
-static EState *
-create_estate_for_relation(LogicalRepRelMapEntry *rel)
+EState *
+create_estate_for_relation(Relation rel)
 {
 	EState	   *estate;
 	RangeTblEntry *rte;
@@ -350,8 +350,8 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
 
 	rte = makeNode(RangeTblEntry);
 	rte->rtekind = RTE_RELATION;
-	rte->relid = RelationGetRelid(rel->localrel);
-	rte->relkind = rel->localrel->rd_rel->relkind;
+	rte->relid = RelationGetRelid(rel);
+	rte->relkind = rel->rd_rel->relkind;
 	rte->rellockmode = AccessShareLock;
 	ExecInitRangeTable(estate, list_make1(rte));
 
@@ -1176,7 +1176,7 @@ apply_handle_insert(StringInfo s)
 	}
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
@@ -1301,7 +1301,7 @@ apply_handle_update(StringInfo s)
 	check_relation_updatable(rel);
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
@@ -1458,7 +1458,7 @@ apply_handle_delete(StringInfo s)
 	check_relation_updatable(rel);
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 79765f9696..dd6f3bda3a 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,12 +15,22 @@
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
+#include "catalog/pg_type.h"
 #include "commands/defrem.h"
+#include "executor/executor.h"
 #include "fmgr.h"
+#include "nodes/execnodes.h"
+#include "nodes/nodeFuncs.h"
+#include "optimizer/planner.h"
+#include "optimizer/optimizer.h"
+#include "parser/parse_coerce.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
@@ -57,6 +67,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 								   ReorderBufferTXN *txn,
 								   XLogRecPtr commit_lsn);
+static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple,
+								HeapTuple newtuple, List *rowfilter);
 
 static bool publications_valid;
 static bool in_streaming;
@@ -98,6 +110,7 @@ typedef struct RelationSyncEntry
 
 	bool		replicate_valid;
 	PublicationActions pubactions;
+	List	   *qual;
 
 	/*
 	 * OID of the relation to publish changes as. For a partition, this may
@@ -121,7 +134,7 @@ static HTAB *RelationSyncCache = NULL;
 
 static void init_rel_sync_cache(MemoryContext decoding_context);
 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
-static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
+static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation rel);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
@@ -489,6 +502,99 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	OutputPluginWrite(ctx, false);
 }
 
+/*
+ * Evaluates row filter.
+ *
+ * If the row filter evaluates to NULL, it is taken as false i.e. the change
+ * isn't replicated.
+ */
+static inline bool
+pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
+{
+	Datum		ret;
+	bool		isnull;
+
+	Assert(state != NULL);
+
+	ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
+
+	if (isnull)
+		return false;
+
+	return DatumGetBool(ret);
+}
+
+/*
+ * Change is checked against the row filter, if any.
+ *
+ * If it returns true, the change is replicated, otherwise, it is not.
+ */
+static bool
+pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, List *rowfilter)
+{
+	TupleDesc	tupdesc;
+	EState	   *estate;
+	ExprContext *ecxt;
+	MemoryContext oldcxt;
+	ListCell   *lc;
+	bool		result = true;
+
+	/* Bail out if there is no row filter */
+	if (list_length(rowfilter) == 0)
+		return true;
+
+	tupdesc = RelationGetDescr(relation);
+
+	estate = create_estate_for_relation(relation);
+
+	/* Prepare context per tuple */
+	ecxt = GetPerTupleExprContext(estate);
+	oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
+	ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc, &TTSOpsHeapTuple);
+	MemoryContextSwitchTo(oldcxt);
+
+	ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false);
+
+	/*
+	 * If the subscription has multiple publications and the same table has a
+	 * different row filter in these publications, all row filters must be
+	 * matched in order to replicate this change.
+	 */
+	foreach(lc, rowfilter)
+	{
+		Node	   *rfnode = (Node *) lfirst(lc);
+		Oid			exprtype;
+		Expr	   *expr;
+		ExprState  *exprstate;
+
+		/* Prepare expression for execution */
+		exprtype = exprType(rfnode);
+		expr = (Expr *) coerce_to_target_type(NULL, rfnode, exprtype, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
+
+		if (expr == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_CANNOT_COERCE),
+					 errmsg("row filter returns type %s that cannot be coerced to the expected type %s",
+							format_type_be(exprtype),
+							format_type_be(BOOLOID)),
+					 errhint("You will need to rewrite the row filter.")));
+
+		exprstate = ExecPrepareExpr(expr, estate);
+
+		/* Evaluates row filter */
+		result = pgoutput_row_filter_exec_expr(exprstate, ecxt);
+
+		/* If the tuple does not match one of the row filters, bail out */
+		if (!result)
+			break;
+	}
+
+	ResetExprContext(ecxt);
+	FreeExecutorState(estate);
+
+	return result;
+}
+
 /*
  * Sends the decoded DML over wire.
* @@ -516,7 +622,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = change->txn->xid; - relentry = get_rel_sync_entry(data, RelationGetRelid(relation)); + relentry = get_rel_sync_entry(data, relation); /* First check the table filter */ switch (change->action) @@ -560,6 +666,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, tuple = execute_attr_map_tuple(tuple, relentry->map); } + /* Check row filter. */ + if (!pgoutput_row_filter(relation, NULL, tuple, relentry->qual)) + return; + OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, xid, relation, tuple, data->binary); @@ -586,6 +696,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } } + /* Check row filter. */ + if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry->qual)) + return; + OutputPluginPrepareWrite(ctx, true); logicalrep_write_update(ctx->out, xid, relation, oldtuple, newtuple, data->binary); @@ -608,6 +722,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); } + /* Check row filter. */ + if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry->qual)) + return; + OutputPluginPrepareWrite(ctx, true); logicalrep_write_delete(ctx->out, xid, relation, oldtuple, data->binary); @@ -655,12 +773,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, for (i = 0; i < nrelations; i++) { Relation relation = relations[i]; - Oid relid = RelationGetRelid(relation); if (!is_publishable_relation(relation)) continue; - relentry = get_rel_sync_entry(data, relid); + relentry = get_rel_sync_entry(data, relation); if (!relentry->pubactions.pubtruncate) continue; @@ -670,10 +787,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * root tables through it. 
 		 */
 		if (relation->rd_rel->relispartition &&
-			relentry->publish_as_relid != relid)
+			relentry->publish_as_relid != relentry->relid)
 			continue;
 
-		relids[nrelids++] = relid;
+		relids[nrelids++] = relentry->relid;
 		maybe_send_schema(ctx, txn, change, relation, relentry);
 	}
 
@@ -941,16 +1058,21 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
  * when publishing.
  */
 static RelationSyncEntry *
-get_rel_sync_entry(PGOutputData *data, Oid relid)
+get_rel_sync_entry(PGOutputData *data, Relation rel)
 {
 	RelationSyncEntry *entry;
-	bool		am_partition = get_rel_relispartition(relid);
-	char		relkind = get_rel_relkind(relid);
+	Oid			relid;
+	bool		am_partition;
+	char		relkind;
 	bool		found;
 	MemoryContext oldctx;
 
 	Assert(RelationSyncCache != NULL);
 
+	relid = RelationGetRelid(rel);
+	am_partition = get_rel_relispartition(relid);
+	relkind = get_rel_relkind(relid);
+
 	/* Find cached relation info, creating if not found */
 	entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
 											  (void *) &relid,
@@ -966,6 +1088,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->replicate_valid = false;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+		entry->qual = NIL;
 		entry->publish_as_relid = InvalidOid;
 	}
 
@@ -997,6 +1120,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		{
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
+			HeapTuple	rftuple;
+			Datum		rfdatum;
+			bool		rfisnull;
 
 			if (pub->alltables)
 			{
@@ -1056,9 +1182,29 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
+			/*
+			 * Cache row filter, if available. All publication-table mappings
+			 * must be checked. If it is a partition and pubviaroot is true,
+			 * use the row filter of the topmost partitioned table instead of
+			 * the row filter of its own partition.
+			 */
+			rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid), ObjectIdGetDatum(pub->oid));
+			if (HeapTupleIsValid(rftuple))
+			{
+				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull);
+
+				if (!rfisnull)
+				{
+					Node	   *rfnode;
+
+					oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+					rfnode = stringToNode(TextDatumGetCString(rfdatum));
+					entry->qual = lappend(entry->qual, rfnode);
+					MemoryContextSwitchTo(oldctx);
+				}
+
+				ReleaseSysCache(rftuple);
+			}
 		}
 
 		list_free(pubids);
@@ -1164,6 +1310,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	HASH_SEQ_STATUS status;
 	RelationSyncEntry *entry;
+	MemoryContext oldctx;
 
 	/*
 	 * We can get here if the plugin was used in SQL interface as the
@@ -1173,6 +1320,8 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	if (RelationSyncCache == NULL)
 		return;
 
+	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
 	/*
 	 * There is no way to find which entry in our cache the hash belongs to so
 	 * mark the whole cache as invalid.
@@ -1190,5 +1339,11 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 		entry->pubactions.pubupdate = false;
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
+
+		if (list_length(entry->qual) > 0)
+			list_free_deep(entry->qual);
+		entry->qual = NIL;
 	}
+
+	MemoryContextSwitchTo(oldctx);
 }
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 4127611f5a..0263baf72a 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -85,6 +85,13 @@ typedef struct Publication
 	PublicationActions pubactions;
 } Publication;
 
+typedef struct PublicationRelationQual
+{
+	Oid			relid;
+	Relation	relation;
+	Node	   *whereClause;
+} PublicationRelationQual;
+
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
@@ -106,11 +113,12 @@ typedef enum PublicationPartOpt
 } PublicationPartOpt;
 
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
+extern List *GetPublicationRelationQuals(Oid pubid);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 
 extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
 											  bool if_not_exists);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index c79b7fb487..d26673a111 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,10 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid;		/* Oid of the publication */
 	Oid			prrelid;		/* Oid of the relation */
+
+#ifdef CATALOG_VARLEN			/* variable-length fields start here */
+	pg_node_tree prqual;		/* qualifications */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
@@ -40,6 +44,8 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
  */
 typedef FormData_pg_publication_rel *Form_pg_publication_rel;
 
+DECLARE_TOAST(pg_publication_rel, 8287, 8288);
+
 DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, on pg_publication_rel using btree(oid oid_ops));
 #define PublicationRelObjectIndexId 6112
 DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index caed683ba9..f912c3d6f1 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -480,6 +480,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_PublicationTable,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 068c6ec440..cf9973f8a3 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3517,12 +3517,19 @@ typedef struct AlterTSConfigurationStmt
 } AlterTSConfigurationStmt;
 
 
+typedef struct PublicationTable
+{
+	NodeTag		type;
+	RangeVar   *relation;		/* relation to be published */
+	Node	   *whereClause;	/* qualifications */
+} PublicationTable;
+
 typedef struct CreatePublicationStmt
 {
 	NodeTag		type;
 	char	   *pubname;		/* Name of the publication */
 	List	   *options;		/* List of DefElem nodes */
-	List	   *tables;			/* Optional list of tables to add */
+	List	   *tables;			/* Optional list of PublicationTable to add */
 	bool		for_all_tables; /* Special publication for all tables in db */
 } CreatePublicationStmt;
 
@@ -3535,7 +3542,7 @@ typedef struct AlterPublicationStmt
 	List	   *options;		/* List of DefElem nodes */
 
 	/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
-	List	   *tables;			/* List of tables to add/drop */
+	List	   *tables;			/* List of PublicationTable to add/drop */
 	bool		for_all_tables; /* Special publication for all tables in db */
 	DefElemAction tableAction;	/* What action to perform with the tables */
 } AlterPublicationStmt;
diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h
index dfc214b06f..ac8ae4fa9c 100644
--- a/src/include/parser/parse_node.h
+++ b/src/include/parser/parse_node.h
@@ -78,6 +78,7 @@ typedef enum ParseExprKind
 	EXPR_KIND_CALL_ARGUMENT,	/* procedure argument in CALL */
 	EXPR_KIND_COPY_WHERE,		/* WHERE condition in COPY FROM */
 	EXPR_KIND_GENERATED_COLUMN, /* generation expression for a column */
+	EXPR_KIND_PUBLICATION_WHERE /* WHERE condition for a table in PUBLICATION */
 } ParseExprKind;
 
 
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 3f0b3deefb..a59ad2c9c8 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -49,4 +49,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
 extern char *logicalrep_typmap_gettypname(Oid remoteid);
 
+extern EState *create_estate_for_relation(Relation rel);
+
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 63d6ab7a4e..c8cf1b685e 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -156,6 +156,38 @@ Tables:
 
 DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
+CREATE TABLE testpub_rf_tbl1 (a integer, b text);
+CREATE TABLE testpub_rf_tbl2 (c text, d integer);
+CREATE TABLE testpub_rf_tbl3 (e integer);
+CREATE TABLE testpub_rf_tbl4 (g text);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5);
+RESET client_min_messages;
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
+-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
+-- fail - functions disallowed
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
+ERROR:  functions are not allowed in publication WHERE expressions
+LINE 1: ...ICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) ...
+                                                             ^
+-- fail - WHERE not allowed in DROP
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl3 WHERE (e < 27);
+ERROR:  cannot use a WHERE clause when removing table from publication "testpub5"
+\dRp+ testpub5
+                                    Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500))
+
+DROP TABLE testpub_rf_tbl1;
+DROP TABLE testpub_rf_tbl2;
+DROP TABLE testpub_rf_tbl3;
+DROP TABLE testpub_rf_tbl4;
+DROP PUBLICATION testpub5;
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
 ERROR:  "testpub_view" is not a table
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index d844075368..35211c56f6 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -93,6 +93,29 @@ ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
 DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 
+CREATE TABLE testpub_rf_tbl1 (a integer, b text);
+CREATE TABLE testpub_rf_tbl2 (c text, d integer);
+CREATE TABLE testpub_rf_tbl3 (e integer);
+CREATE TABLE testpub_rf_tbl4 (g text);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5);
+RESET client_min_messages;
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
+-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
+-- fail - functions disallowed
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
+-- fail - WHERE not allowed in DROP
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl3 WHERE (e < 27);
+\dRp+ testpub5
+
+DROP TABLE testpub_rf_tbl1;
+DROP TABLE testpub_rf_tbl2;
+DROP TABLE testpub_rf_tbl3;
+DROP TABLE testpub_rf_tbl4;
+DROP PUBLICATION testpub5;
+
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
 SET client_min_messages = 'ERROR';
diff --git a/src/test/subscription/t/020_row_filter.pl b/src/test/subscription/t/020_row_filter.pl
new file mode 100644
index 0000000000..b8c059d44b
--- /dev/null
+++ b/src/test/subscription/t/020_row_filter.pl
@@ -0,0 +1,190 @@
+# Test logical replication behavior with row filtering
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 6;
+
+# create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# setup structure on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)"
+);
+
+# setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)"
+);
+
+# setup logical replication
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_1 FOR TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 ADD TABLE tab_rowfilter_2 WHERE (c % 7 = 0)"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 SET TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered'), tab_rowfilter_2 WHERE (c % 2 = 0), tab_rowfilter_3"
+);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_2 FOR TABLE tab_rowfilter_2 WHERE (c % 3 = 0)"
+);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_3 FOR TABLE tab_rowfilter_partitioned WHERE (a < 5000)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_3 ADD TABLE tab_rowfilter_less_10k WHERE (a < 6000)"
+);
+
+# test row filtering
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1, 'not replicated')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1500, 'filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1980, 'not filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) SELECT x, 'test ' || x FROM generate_series(990,1002) x"
+);
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_2 (c) SELECT generate_series(1, 20)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x"
+);
+# use partition row filter:
+# - replicate (1, 100) because 1 < 6000 is true
+# - don't replicate (8000, 101) because 8000 < 6000 is false
+# - replicate (15000, 102) because partition tab_rowfilter_greater_10k doesn't have row filter
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(1, 100),(8000, 101),(15000, 102)"
+);
+# insert directly into partition
+# use partition row filter: replicate (2, 200) because 2 < 6000 is true
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(2, 200)");
+# use partition row filter: replicate (5500, 300) because 5500 < 6000 is true
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(5500, 300)");
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2");
+is( $result, qq(1001|test 1001
+1002|test 1002
+1980|not filtered), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(c), min(c), max(c) FROM tab_rowfilter_2");
+is($result, qq(3|6|18), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(a) FROM tab_rowfilter_3");
+is($result, qq(10), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_less_10k ORDER BY 1, 2");
+is( $result, qq(1|100
+2|200
+5500|300), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_greater_10k ORDER BY 1, 2");
+is($result, qq(15000|102), 'check filtered data was copied to subscriber');
+
+# publish using partitioned table
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_3 SET (publish_via_partition_root = true)");
+$node_subscriber->safe_psql('postgres',
+	"TRUNCATE TABLE tab_rowfilter_partitioned");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)");
+# use partitioned table row filter: replicate, 4000 < 5000 is true
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400)");
+# use partitioned table row filter: replicate, 4500 < 5000 is true
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(4500, 450)");
+# use partitioned table row filter: don't replicate, 5600 < 5000 is false
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(5600, 123)");
+# use partitioned table row filter: don't replicate, 16000 < 5000 is false
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_greater_10k (a, b) VALUES(16000, 1950)");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_partitioned ORDER BY 1, 2");
+is( $result, qq(1|100
+2|200
+4000|400
+4500|450), 'check publish_via_partition_root behavior');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.20.1
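
A note for readers of 0003: the two behaviours that matter most are (a) a change is replicated only if every row filter that applies to the table evaluates to true, with NULL taken as false, and (b) the initial table sync turns the same filters into a COPY (SELECT ... WHERE f1 AND f2) query. The standalone C below is only a rough sketch of that logic, not the patch code. The names RowFilterResult, row_passes_filters, append_str and build_copy_command are made up for illustration, the relkind check done by copy_table() is left out (a regular table is assumed), and plain C strings stand in for StringInfo and List.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical tri-state outcome of evaluating one row filter. */
typedef enum { RF_FALSE, RF_TRUE, RF_NULL } RowFilterResult;

/*
 * A change is replicated only if every filter is true. NULL counts as
 * false, and the loop stops at the first filter that does not match.
 * An empty filter list means "replicate everything".
 */
static bool
row_passes_filters(const RowFilterResult *results, size_t nfilters)
{
	for (size_t i = 0; i < nfilters; i++)
	{
		if (results[i] != RF_TRUE)
			return false;
	}
	return true;
}

/* Append s to buf, tracking the used length; false if it would overflow. */
static bool
append_str(char *buf, size_t buflen, size_t *used, const char *s)
{
	size_t		len = strlen(s);

	if (*used + len + 1 > buflen)
		return false;
	memcpy(buf + *used, s, len + 1);
	*used += len;
	return true;
}

/*
 * Build the initial-sync COPY command. Without filters a plain
 * "COPY table TO STDOUT" is enough; with filters they are AND'ed
 * into the WHERE clause of a COPY (SELECT ...) query.
 */
static bool
build_copy_command(char *buf, size_t buflen, const char *table,
				   const char *columns, const char *const *quals,
				   size_t nquals)
{
	size_t		used = 0;

	assert(buf != NULL && buflen > 0);
	buf[0] = '\0';

	if (nquals == 0)
		return append_str(buf, buflen, &used, "COPY ") &&
			append_str(buf, buflen, &used, table) &&
			append_str(buf, buflen, &used, " TO STDOUT");

	if (!(append_str(buf, buflen, &used, "COPY (SELECT ") &&
		  append_str(buf, buflen, &used, columns) &&
		  append_str(buf, buflen, &used, " FROM ") &&
		  append_str(buf, buflen, &used, table) &&
		  append_str(buf, buflen, &used, " WHERE ")))
		return false;

	for (size_t i = 0; i < nquals; i++)
	{
		if (i > 0 && !append_str(buf, buflen, &used, " AND "))
			return false;
		if (!append_str(buf, buflen, &used, quals[i]))
			return false;
	}

	return append_str(buf, buflen, &used, ") TO STDOUT");
}
```

This lines up with the TAP test: tab_rowfilter_2 is published by tap_pub_1 with (c % 2 = 0) and by tap_pub_2 with (c % 3 = 0), so out of 1..20 only 6, 12 and 18 satisfy both, which is the 3|6|18 the test expects.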
From 6983d9831cd4cc8f9d40eaaa28ef74bec05a6147 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Mon, 18 Jan 2021 12:09:19 -0300
Subject: [PATCH 4/7] Print publication WHERE condition in psql

---
 src/bin/psql/describe.c | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 20af5a92b4..8a7113ce15 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6011,7 +6011,8 @@ describePublications(const char *pattern)
 		if (!puballtables)
 		{
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
+							  "SELECT n.nspname, c.relname,\n"
+							  " pg_get_expr(pr.prqual, c.oid)\n"
 							  "FROM pg_catalog.pg_class c,\n"
 							  " pg_catalog.pg_namespace n,\n"
 							  " pg_catalog.pg_publication_rel pr\n"
@@ -6041,6 +6042,10 @@ describePublications(const char *pattern)
 								  PQgetvalue(tabres, j, 0),
 								  PQgetvalue(tabres, j, 1));
 
+				if (!PQgetisnull(tabres, j, 2))
+					appendPQExpBuffer(&buf, " WHERE %s",
+									  PQgetvalue(tabres, j, 2));
+
 				printTableAddFooter(&cont, buf.data);
 			}
 			PQclear(tabres);
-- 
2.20.1
From 8a8d125c3b3f6a14a6560598ac99e2a52f8a0028 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Mon, 18 Jan 2021 14:29:59 -0300
Subject: [PATCH 5/7] Publication WHERE condition support for pg_dump

---
 src/bin/pg_dump/pg_dump.c | 14 ++++++++++++--
 src/bin/pg_dump/pg_dump.h |  1 +
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 39da742e32..50388ae8ca 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4080,6 +4080,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	int			i_oid;
 	int			i_prpubid;
 	int			i_prrelid;
+	int			i_prrelqual;
 	int			i,
 				j,
 				ntups;
@@ -4091,7 +4092,8 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 
 	/* Collect all publication membership info. */
 	appendPQExpBufferStr(query,
-						 "SELECT tableoid, oid, prpubid, prrelid "
+						 "SELECT tableoid, oid, prpubid, prrelid, "
+						 "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual "
 						 "FROM pg_catalog.pg_publication_rel");
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
@@ -4101,6 +4103,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	i_oid = PQfnumber(res, "oid");
 	i_prpubid = PQfnumber(res, "prpubid");
 	i_prrelid = PQfnumber(res, "prrelid");
+	i_prrelqual = PQfnumber(res, "prrelqual");
 
 	/* this allocation may be more than we need */
 	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@@ -4141,6 +4144,10 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 		pubrinfo[j].dobj.name = tbinfo->dobj.name;
 		pubrinfo[j].publication = pubinfo;
 		pubrinfo[j].pubtable = tbinfo;
+		if (PQgetisnull(res, i, i_prrelqual))
+			pubrinfo[j].pubrelqual = NULL;
+		else
+			pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));
 
 		/* Decide whether we want to dump it */
 		selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
@@ -4173,8 +4180,11 @@ dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo)
 
 	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
 					  fmtId(pubinfo->dobj.name));
-	appendPQExpBuffer(query, " %s;\n",
+	appendPQExpBuffer(query, " %s",
 					  fmtQualifiedDumpable(tbinfo));
+	if (pubrinfo->pubrelqual)
+		appendPQExpBuffer(query, " WHERE %s", pubrinfo->pubrelqual);
+	appendPQExpBufferStr(query, ";\n");
 
 	/*
 	 * There is no point in creating a drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 1290f9659b..7927039d1d 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -625,6 +625,7 @@ typedef struct _PublicationRelInfo
 	DumpableObject dobj;
 	PublicationInfo *publication;
 	TableInfo  *pubtable;
+	char	   *pubrelqual;
 } PublicationRelInfo;
 
 /*
-- 
2.20.1
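To make the pg_dump change easier to eyeball, here is a rough standalone sketch of the statement that dumpPublicationTable() assembles with and without a row filter. It uses plain snprintf() instead of PQExpBuffer, and build_add_table_stmt() is a made-up name, so treat it as an illustration of the resulting text only. The filter string stands for whatever pg_get_expr() returned; the value used in the note below is just an example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/*
 * Hypothetical stand-in for the statement assembly in dumpPublicationTable().
 * rowfilter may be NULL, in which case no WHERE clause is emitted.
 * Returns the length snprintf() reports for the complete statement.
 */
static int
build_add_table_stmt(char *buf, size_t buflen, const char *pubname,
                     const char *qualified_table, const char *rowfilter)
{
    assert(buf != NULL && pubname != NULL && qualified_table != NULL);

    if (rowfilter != NULL)
        return snprintf(buf, buflen,
                        "ALTER PUBLICATION %s ADD TABLE ONLY %s WHERE %s;\n",
                        pubname, qualified_table, rowfilter);

    return snprintf(buf, buflen,
                    "ALTER PUBLICATION %s ADD TABLE ONLY %s;\n",
                    pubname, qualified_table);
}
```

With a filter of "(a < 5000)" this yields "ALTER PUBLICATION tap_pub_3 ADD TABLE ONLY public.tab_rowfilter_partitioned WHERE (a < 5000);" and with a NULL filter the statement is the same as before the patch.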
From 24aea97f9e56755eaefbe6b77d45e0dee8ffb550 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Fri, 29 Jan 2021 23:28:13 -0300
Subject: [PATCH 6/7] Debug messages

---
 src/backend/replication/pgoutput/pgoutput.c | 14 ++++++++++++++
 src/test/subscription/t/020_row_filter.pl   |  1 +
 2 files changed, 15 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index dd6f3bda3a..17a728c0bf 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -518,6 +518,10 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
 
 	ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
 
+	elog(DEBUG2, "pgoutput_row_filter_exec_expr: ret: %d ; isnull: %d",
+		 DatumGetBool(ret) ? 1 : 0,
+		 isnull ? 1 : 0);
+
 	if (isnull)
 		return false;
 
@@ -543,6 +547,8 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 	if (list_length(rowfilter) == 0)
 		return true;
 
+	elog(DEBUG1, "table %s has row filter", get_rel_name(relation->rd_id));
+
 	tupdesc = RelationGetDescr(relation);
 
 	estate = create_estate_for_relation(relation);
@@ -566,6 +572,7 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 		Oid			exprtype;
 		Expr	   *expr;
 		ExprState  *exprstate;
+		char	   *s = NULL;
 
 		/* Prepare expression for execution */
 		exprtype = exprType(rfnode);
@@ -585,6 +592,13 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 		result = pgoutput_row_filter_exec_expr(exprstate, ecxt);
 
 		/* If the tuple does not match one of the row filters, bail out */
+		s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(rfnode)), ObjectIdGetDatum(relation->rd_id)));
+		if (result)
+			elog(DEBUG2, "pgoutput_row_filter: row filter \"%s\" matched", s);
+		else
+			elog(DEBUG2, "pgoutput_row_filter: row filter \"%s\" not matched", s);
+		pfree(s);
+
 		if (!result)
 			break;
 	}
diff --git a/src/test/subscription/t/020_row_filter.pl b/src/test/subscription/t/020_row_filter.pl
index b8c059d44b..ea1d7c30ae 100644
--- a/src/test/subscription/t/020_row_filter.pl
+++ b/src/test/subscription/t/020_row_filter.pl
@@ -8,6 +8,7 @@ use Test::More tests => 6;
 # create publisher node
 my $node_publisher = get_new_node('publisher');
 $node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'log_min_messages = DEBUG2');
 $node_publisher->start;
 
 # create subscriber node
-- 
2.20.1
From 6c684ac80914a388a942b435d0e78ab327564b6c Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Sun, 31 Jan 2021 20:48:43 -0300
Subject: [PATCH 7/7] Measure row filter overhead

---
 src/backend/replication/pgoutput/pgoutput.c | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 17a728c0bf..ebd45c1c84 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -542,6 +542,8 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 	MemoryContext oldcxt;
 	ListCell   *lc;
 	bool		result = true;
+	instr_time	start_time;
+	instr_time	end_time;
 
 	/* Bail out if there is no row filter */
 	if (list_length(rowfilter) == 0)
@@ -549,6 +551,8 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 
 	elog(DEBUG1, "table %s has row filter", get_rel_name(relation->rd_id));
 
+	INSTR_TIME_SET_CURRENT(start_time);
+
 	tupdesc = RelationGetDescr(relation);
 
 	estate = create_estate_for_relation(relation);
@@ -606,6 +610,11 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 	ResetExprContext(ecxt);
 	FreeExecutorState(estate);
 
+	INSTR_TIME_SET_CURRENT(end_time);
+	INSTR_TIME_SUBTRACT(end_time, start_time);
+
+	elog(DEBUG2, "row filter time: %0.3f us", INSTR_TIME_GET_DOUBLE(end_time) * 1e6);
+
 	return result;
 }
 
-- 
2.20.1
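For anyone who wants to reproduce this kind of measurement outside the backend: 0007 brackets the filter evaluation with INSTR_TIME_SET_CURRENT(), subtracts the two timestamps and logs the difference in microseconds. The same pattern can be sketched with POSIX clock_gettime(). elapsed_us() and busy_loop() are made-up names, and this is only an approximation of the idea, not a claim about what instr_time.h expands to on a given platform.

```c
#define _POSIX_C_SOURCE 199309L

#include <assert.h>
#include <time.h>

/*
 * Hypothetical helper: run fn(arg) once and return the elapsed time in
 * microseconds, measured with the POSIX monotonic clock.
 */
static double
elapsed_us(void (*fn) (void *), void *arg)
{
    struct timespec start;
    struct timespec end;

    assert(fn != NULL);

    clock_gettime(CLOCK_MONOTONIC, &start);
    fn(arg);
    clock_gettime(CLOCK_MONOTONIC, &end);

    /* Same idea as subtracting start from end and scaling seconds to us. */
    return (double) (end.tv_sec - start.tv_sec) * 1e6 +
        (double) (end.tv_nsec - start.tv_nsec) / 1e3;
}

/* Example workload standing in for the code being measured. */
static void
busy_loop(void *arg)
{
    volatile long sink = 0;
    long n = *(long *) arg;

    for (long i = 0; i < n; i++)
        sink += i;
}
```

Calling elapsed_us(busy_loop, &n) once per iteration and collecting the values gives the raw samples from which figures like mean, median and percentiles can be computed; in 0007 the samples come from the DEBUG2 log lines instead.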