Em dom, 3 de fev de 2019 às 07:14, Andres Freund <and...@anarazel.de> escreveu: > > As far as I can tell, the patch has not been refreshed since. So I'm > marking this as returned with feedback for now. Please resubmit once > ready. > I fixed all of the bugs pointed out in this thread. I decided to disallow UDFs in filters (it is safer for a first version); we can add this functionality later. However, I'll check whether allowing "safe" functions (i.e., built-in functions) is OK. I added more docs explaining that expressions are executed with the role used for the replication connection, and also that columns used in expressions must be part of the PK or REPLICA IDENTITY. I also added regression tests.
Comments? -- Euler Taveira Timbira - http://www.timbira.com.br/ PostgreSQL: Consultoria, Desenvolvimento, Suporte 24x7 e Treinamento
From 87945236590e9fd37b203d325b74dc5baccee64d Mon Sep 17 00:00:00 2001 From: Euler Taveira <eu...@timbira.com.br> Date: Fri, 9 Mar 2018 18:39:22 +0000 Subject: [PATCH 1/8] Remove unused atttypmod column from initial table synchronization Since commit 7c4f52409a8c7d85ed169bbbc1f6092274d03920, atttypmod was added but not used. The removal is safe because COPY from publisher does not need such information. --- src/backend/replication/logical/tablesync.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 7881079e96..0a565dd837 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -647,7 +647,7 @@ fetch_remote_table_info(char *nspname, char *relname, StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[2] = {OIDOID, CHAROID}; - Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; + Oid attrRow[3] = {TEXTOID, OIDOID, BOOLOID}; bool isnull; int natt; @@ -691,7 +691,6 @@ fetch_remote_table_info(char *nspname, char *relname, appendStringInfo(&cmd, "SELECT a.attname," " a.atttypid," - " a.atttypmod," " a.attnum = ANY(i.indkey)" " FROM pg_catalog.pg_attribute a" " LEFT JOIN pg_catalog.pg_index i" @@ -703,7 +702,7 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->remoteid, (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""), lrel->remoteid); - res = walrcv_exec(wrconn, cmd.data, 4, attrRow); + res = walrcv_exec(wrconn, cmd.data, 3, attrRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -724,7 +723,7 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); Assert(!isnull); - if (DatumGetBool(slot_getattr(slot, 4, &isnull))) + if (DatumGetBool(slot_getattr(slot, 3, &isnull))) lrel->attkeys = bms_add_member(lrel->attkeys, natt); /* Should never happen. */ -- 2.11.0
From 3a5b4c541982357c2231b9882ac01f1f0d0a8e29 Mon Sep 17 00:00:00 2001 From: Euler Taveira <eu...@timbira.com.br> Date: Tue, 27 Feb 2018 02:21:03 +0000 Subject: [PATCH 3/8] Refactor function create_estate_for_relation Relation localrel is the only LogicalRepRelMapEntry structure member that is useful for create_estate_for_relation. --- src/backend/replication/logical/worker.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 43edfef089..31fc7c5048 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -173,7 +173,7 @@ ensure_transaction(void) * This is based on similar code in copy.c */ static EState * -create_estate_for_relation(LogicalRepRelMapEntry *rel) +create_estate_for_relation(Relation rel) { EState *estate; ResultRelInfo *resultRelInfo; @@ -183,13 +183,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) rte = makeNode(RangeTblEntry); rte->rtekind = RTE_RELATION; - rte->relid = RelationGetRelid(rel->localrel); - rte->relkind = rel->localrel->rd_rel->relkind; + rte->relid = RelationGetRelid(rel); + rte->relkind = rel->rd_rel->relkind; rte->rellockmode = AccessShareLock; ExecInitRangeTable(estate, list_make1(rte)); resultRelInfo = makeNode(ResultRelInfo); - InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0); + InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0); estate->es_result_relations = resultRelInfo; estate->es_num_result_relations = 1; @@ -589,7 +589,7 @@ apply_handle_insert(StringInfo s) } /* Initialize the executor state. */ - estate = create_estate_for_relation(rel); + estate = create_estate_for_relation(rel->localrel); remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); @@ -696,7 +696,7 @@ apply_handle_update(StringInfo s) check_relation_updatable(rel); /* Initialize the executor state. 
*/ - estate = create_estate_for_relation(rel); + estate = create_estate_for_relation(rel->localrel); remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); @@ -815,7 +815,7 @@ apply_handle_delete(StringInfo s) check_relation_updatable(rel); /* Initialize the executor state. */ - estate = create_estate_for_relation(rel); + estate = create_estate_for_relation(rel->localrel); remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); -- 2.11.0
From 7ef5ccffcb7bc71d298427e7b2c3a2cfae8556c6 Mon Sep 17 00:00:00 2001 From: Euler Taveira <eu...@timbira.com.br> Date: Wed, 24 Jan 2018 17:01:31 -0200 Subject: [PATCH 4/8] Rename a WHERE node A WHERE clause will be used for row filtering in logical replication. We already have a similar node: 'WHERE (condition here)'. Let's rename the node to a generic name and use it for row filtering too. --- src/backend/parser/gram.y | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 3dc0e8a4fb..61cc59fe7c 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -476,7 +476,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> def_arg columnElem where_clause where_or_current_clause a_expr b_expr c_expr AexprConst indirection_el opt_slice_bound columnref in_expr having_clause func_table xmltable array_expr - ExclusionWhereClause operator_def_arg + OptWhereClause operator_def_arg %type <list> rowsfrom_item rowsfrom_list opt_col_def_list %type <boolean> opt_ordinality %type <list> ExclusionConstraintList ExclusionConstraintElem @@ -3710,7 +3710,7 @@ ConstraintElem: $$ = (Node *)n; } | EXCLUDE access_method_clause '(' ExclusionConstraintList ')' - opt_c_include opt_definition OptConsTableSpace ExclusionWhereClause + opt_c_include opt_definition OptConsTableSpace OptWhereClause ConstraintAttributeSpec { Constraint *n = makeNode(Constraint); @@ -3812,7 +3812,7 @@ ExclusionConstraintElem: index_elem WITH any_operator } ; -ExclusionWhereClause: +OptWhereClause: WHERE '(' a_expr ')' { $$ = $3; } | /*EMPTY*/ { $$ = NULL; } ; -- 2.11.0
From 4b5ca55f83e8036d6892a458bf73c891329c01f8 Mon Sep 17 00:00:00 2001 From: Euler Taveira <eu...@timbira.com.br> Date: Fri, 9 Mar 2018 17:37:36 +0000 Subject: [PATCH 2/8] Store number of tuples in WalRcvExecResult It seems to be a useful information while allocating memory for queries that returns more than one row. It reduces memory allocation for initial table synchronization. --- src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 5 +++-- src/backend/replication/logical/tablesync.c | 5 ++--- src/include/replication/walreceiver.h | 1 + 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 765d58d120..e657177c00 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -878,6 +878,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, errdetail("Expected %d fields, got %d fields.", nRetTypes, nfields))); + walres->ntuples = PQntuples(pgres); walres->tuplestore = tuplestore_begin_heap(true, false, work_mem); /* Create tuple descriptor corresponding to expected result. */ @@ -888,7 +889,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, attinmeta = TupleDescGetAttInMetadata(walres->tupledesc); /* No point in doing more here if there were no tuples returned. */ - if (PQntuples(pgres) == 0) + if (walres->ntuples == 0) return; /* Create temporary context for local allocations. */ @@ -897,7 +898,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, ALLOCSET_DEFAULT_SIZES); /* Process returned rows. 
*/ - for (tupn = 0; tupn < PQntuples(pgres); tupn++) + for (tupn = 0; tupn < walres->ntuples; tupn++) { char *cstrs[MaxTupleAttributeNumber]; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 0a565dd837..42db4ada9e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -709,9 +709,8 @@ fetch_remote_table_info(char *nspname, char *relname, (errmsg("could not fetch table info for table \"%s.%s\": %s", nspname, relname, res->err))); - /* We don't know the number of rows coming, so allocate enough space. */ - lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); - lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); + lrel->attnames = palloc0(res->ntuples * sizeof(char *)); + lrel->atttyps = palloc0(res->ntuples * sizeof(Oid)); lrel->attkeys = NULL; natt = 0; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 7f2927cb46..d0fb98df09 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -197,6 +197,7 @@ typedef struct WalRcvExecResult char *err; Tuplestorestate *tuplestore; TupleDesc tupledesc; + int ntuples; } WalRcvExecResult; /* libpqwalreceiver hooks */ -- 2.11.0
From fc1090c6922d1a66d3ad03d21441829f8cae0472 Mon Sep 17 00:00:00 2001 From: Euler Taveira <eu...@timbira.com.br> Date: Tue, 27 Feb 2018 04:03:13 +0000 Subject: [PATCH 5/8] Row filtering for logical replication When you define or modify a publication you optionally can filter rows to be published using a WHERE condition. This condition is any expression that evaluates to boolean. Only those rows that satisfy the WHERE condition will be sent to subscribers. --- doc/src/sgml/catalogs.sgml | 9 +++ doc/src/sgml/ref/alter_publication.sgml | 11 ++- doc/src/sgml/ref/create_publication.sgml | 26 +++++- src/backend/catalog/pg_publication.c | 102 ++++++++++++++++++++++-- src/backend/commands/publicationcmds.c | 93 +++++++++++++++------- src/backend/parser/gram.y | 26 ++++-- src/backend/parser/parse_agg.c | 10 +++ src/backend/parser/parse_expr.c | 14 +++- src/backend/parser/parse_func.c | 3 + src/backend/replication/logical/tablesync.c | 119 +++++++++++++++++++++++++--- src/backend/replication/logical/worker.c | 2 +- src/backend/replication/pgoutput/pgoutput.c | 100 ++++++++++++++++++++++- src/include/catalog/pg_publication.h | 9 ++- src/include/catalog/pg_publication_rel.h | 10 ++- src/include/catalog/toasting.h | 1 + src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes.h | 11 ++- src/include/parser/parse_node.h | 1 + src/include/replication/logicalrelation.h | 2 + src/test/regress/expected/publication.out | 29 +++++++ src/test/regress/sql/publication.sql | 21 +++++ src/test/subscription/t/013_row_filter.pl | 96 ++++++++++++++++++++++ 22 files changed, 629 insertions(+), 67 deletions(-) create mode 100644 src/test/subscription/t/013_row_filter.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 4c7e93892a..88177279c7 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5587,6 +5587,15 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l <entry><literal><link 
linkend="catalog-pg-class"><structname>pg_class</structname></link>.oid</literal></entry> <entry>Reference to relation</entry> </row> + + <row> + <entry><structfield>prqual</structfield></entry> + <entry><type>pg_node_tree</type></entry> + <entry></entry> + <entry>Expression tree (in the form of a + <function>nodeToString()</function> representation) for the relation's + qualifying condition</entry> + </row> </tbody> </tgroup> </table> diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index 534e598d93..9608448207 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -21,8 +21,8 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> -ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] -ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... 
] ) ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_USER | SESSION_USER } @@ -91,7 +91,12 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r table name, only that table is affected. If <literal>ONLY</literal> is not specified, the table and all its descendant tables (if any) are affected. Optionally, <literal>*</literal> can be specified after the table - name to explicitly indicate that descendant tables are included. + name to explicitly indicate that descendant tables are included. If the + optional <literal>WHERE</literal> clause is specified, rows that do not + satisfy the <replaceable class="parameter">expression</replaceable> will + not be published. Note that parentheses are required around the + expression. The <replaceable class="parameter">expression</replaceable> + is executed with the role used for the replication connection. </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 99f87ca393..6e99943374 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -22,7 +22,7 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> CREATE PUBLICATION <replaceable class="parameter">name</replaceable> - [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] + [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...] | FOR ALL TABLES ] [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] @@ -68,7 +68,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> that table is added to the publication. 
If <literal>ONLY</literal> is not specified, the table and all its descendant tables (if any) are added. Optionally, <literal>*</literal> can be specified after the table name to - explicitly indicate that descendant tables are included. + explicitly indicate that descendant tables are included. If the optional + <literal>WHERE</literal> clause is specified, rows that do not satisfy + the <replaceable class="parameter">expression</replaceable> will not be + published. Note that parentheses are required around the expression. </para> <para> @@ -157,6 +160,13 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> </para> <para> + Columns used in the <literal>WHERE</literal> clause must be part of the + primary key or be covered by <literal>REPLICA IDENTITY</literal> otherwise + <command>UPDATE</command> and <command>DELETE</command> operations will not + be replicated. + </para> + + <para> For an <command>INSERT ... ON CONFLICT</command> command, the publication will publish the operation that actually results from the command. So depending of the outcome, it may be published as either <command>INSERT</command> or @@ -171,6 +181,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> <para> <acronym>DDL</acronym> operations are not published. </para> + + <para> + The <literal>WHERE</literal> clause expression is executed with the role used + for the replication connection. 
+ </para> </refsect1> <refsect1> @@ -184,6 +199,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments; </para> <para> + Create a publication that publishes all changes from active departments: +<programlisting> +CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE); +</programlisting> + </para> + + <para> Create a publication that publishes all changes in all tables: <programlisting> CREATE PUBLICATION alltables FOR ALL TABLES; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index f8475c1aba..ff30fdd9f6 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -34,6 +34,10 @@ #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "parser/parse_clause.h" +#include "parser/parse_collate.h" +#include "parser/parse_relation.h" + #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" @@ -149,18 +153,21 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) * Insert new publication / relation mapping. 
*/ ObjectAddress -publication_add_relation(Oid pubid, Relation targetrel, +publication_add_relation(Oid pubid, PublicationRelationQual *targetrel, bool if_not_exists) { Relation rel; HeapTuple tup; Datum values[Natts_pg_publication_rel]; bool nulls[Natts_pg_publication_rel]; - Oid relid = RelationGetRelid(targetrel); + Oid relid = RelationGetRelid(targetrel->relation); Oid prrelid; Publication *pub = GetPublication(pubid); ObjectAddress myself, referenced; + ParseState *pstate; + RangeTblEntry *rte; + Node *whereclause; rel = table_open(PublicationRelRelationId, RowExclusiveLock); @@ -180,10 +187,27 @@ publication_add_relation(Oid pubid, Relation targetrel, ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("relation \"%s\" is already member of publication \"%s\"", - RelationGetRelationName(targetrel), pub->name))); + RelationGetRelationName(targetrel->relation), pub->name))); } - check_publication_add_relation(targetrel); + check_publication_add_relation(targetrel->relation); + + /* Set up a pstate to parse with */ + pstate = make_parsestate(NULL); + pstate->p_sourcetext = nodeToString(targetrel->whereClause); + + rte = addRangeTableEntryForRelation(pstate, targetrel->relation, + AccessShareLock, + NULL, false, false); + addRTEtoQuery(pstate, rte, false, true, true); + + whereclause = transformWhereClause(pstate, + copyObject(targetrel->whereClause), + EXPR_KIND_PUBLICATION_WHERE, + "PUBLICATION"); + + /* Fix up collation information */ + assign_expr_collations(pstate, whereclause); /* Form a tuple. 
*/ memset(values, 0, sizeof(values)); @@ -197,6 +221,12 @@ publication_add_relation(Oid pubid, Relation targetrel, values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + /* Add qualifications, if available */ + if (whereclause) + values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(whereclause)); + else + nulls[Anum_pg_publication_rel_prqual - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -213,11 +243,17 @@ publication_add_relation(Oid pubid, Relation targetrel, ObjectAddressSet(referenced, RelationRelationId, relid); recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + /* Add dependency on the objects mentioned in the qualifications */ + if (whereclause) + recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL); + + free_parsestate(pstate); + /* Close the table. */ table_close(rel, RowExclusiveLock); /* Invalidate relcache so that publication info is rebuilt. */ - CacheInvalidateRelcache(targetrel); + CacheInvalidateRelcache(targetrel->relation); return myself; } @@ -292,6 +328,62 @@ GetPublicationRelations(Oid pubid) } /* + * Gets list of PublicationRelationQuals for a publication. + */ +List * +GetPublicationRelationQuals(Oid pubid) +{ + List *result; + Relation pubrelsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all publications associated with the relation. 
*/ + pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_rel_prpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId, + true, NULL, 1, &scankey); + + result = NIL; + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_rel pubrel; + PublicationRelationQual *relqual; + Datum value_datum; + char *qual_value; + Node *qual_expr; + bool isnull; + + pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + + value_datum = heap_getattr(tup, Anum_pg_publication_rel_prqual, RelationGetDescr(pubrelsrel), &isnull); + if (!isnull) + { + qual_value = TextDatumGetCString(value_datum); + qual_expr = (Node *) stringToNode(qual_value); + } + else + qual_expr = NULL; + + relqual = palloc(sizeof(PublicationRelationQual)); + relqual->relation = table_open(pubrel->prrelid, ShareUpdateExclusiveLock); + relqual->whereClause = copyObject(qual_expr); + result = lappend(result, relqual); + } + + systable_endscan(scan); + heap_close(pubrelsrel, AccessShareLock); + + return result; +} + +/* * Gets list of publication oids for publications marked as FOR ALL TABLES. */ List * diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 4d48be0b92..6d56893c3e 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -344,6 +344,27 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, Assert(list_length(stmt->tables) > 0); + /* + * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause. Use + * publication_table_list node (that accepts a WHERE clause) but forbid the + * WHERE clause in it. The use of relation_expr_list node just for the + * DROP TABLE part does not worth the trouble. 
+ */ + if (stmt->tableAction == DEFELEM_DROP) + { + ListCell *lc; + + foreach(lc, stmt->tables) + { + PublicationTable *t = lfirst(lc); + if (t->whereClause) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use a WHERE clause for removing table from publication \"%s\"", + NameStr(pubform->pubname)))); + } + } + rels = OpenTableList(stmt->tables); if (stmt->tableAction == DEFELEM_ADD) @@ -352,47 +373,55 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { - List *oldrelids = GetPublicationRelations(pubid); + List *oldrels = GetPublicationRelationQuals(pubid); List *delrels = NIL; ListCell *oldlc; /* Calculate which relations to drop. */ - foreach(oldlc, oldrelids) + foreach(oldlc, oldrels) { - Oid oldrelid = lfirst_oid(oldlc); + PublicationRelationQual *oldrel = lfirst(oldlc); + PublicationRelationQual *newrel; ListCell *newlc; bool found = false; foreach(newlc, rels) { - Relation newrel = (Relation) lfirst(newlc); + newrel = (PublicationRelationQual *) lfirst(newlc); - if (RelationGetRelid(newrel) == oldrelid) + if (RelationGetRelid(newrel->relation) == RelationGetRelid(oldrel->relation)) { found = true; break; } } - if (!found) + /* + * Remove publication / relation mapping iif (i) table is not found in + * the new list or (ii) table is found in the new list, however, + * its qual does not match the old one (in this case, a simple + * tuple update is not enough because of the dependencies). + */ + if (!found || (found && !equal(oldrel->whereClause, newrel->whereClause))) { - Relation oldrel = table_open(oldrelid, - ShareUpdateExclusiveLock); + PublicationRelationQual *oldrelqual = palloc(sizeof(PublicationRelationQual)); + oldrelqual->relation = table_open(RelationGetRelid(oldrel->relation), + ShareUpdateExclusiveLock); - delrels = lappend(delrels, oldrel); + delrels = lappend(delrels, oldrelqual); } } /* And drop them. 
*/ PublicationDropTables(pubid, delrels, true); + CloseTableList(oldrels); + CloseTableList(delrels); /* * Don't bother calculating the difference for adding, we'll catch and * skip existing ones when doing catalog update. */ PublicationAddTables(pubid, rels, true, stmt); - - CloseTableList(delrels); } CloseTableList(rels); @@ -502,16 +531,18 @@ OpenTableList(List *tables) List *relids = NIL; List *rels = NIL; ListCell *lc; + PublicationRelationQual *relqual; /* * Open, share-lock, and check all the explicitly-specified relations */ foreach(lc, tables) { - RangeVar *rv = castNode(RangeVar, lfirst(lc)); - bool recurse = rv->inh; - Relation rel; - Oid myrelid; + PublicationTable *t = lfirst(lc); + RangeVar *rv = castNode(RangeVar, t->relation); + bool recurse = rv->inh; + Relation rel; + Oid myrelid; /* Allow query cancel in case this takes a long time */ CHECK_FOR_INTERRUPTS(); @@ -531,8 +562,10 @@ OpenTableList(List *tables) table_close(rel, ShareUpdateExclusiveLock); continue; } - - rels = lappend(rels, rel); + relqual = palloc(sizeof(PublicationRelationQual)); + relqual->relation = rel; + relqual->whereClause = t->whereClause; + rels = lappend(rels, relqual); relids = lappend_oid(relids, myrelid); /* Add children of this rel, if requested */ @@ -560,7 +593,11 @@ OpenTableList(List *tables) /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); - rels = lappend(rels, rel); + relqual = palloc(sizeof(PublicationRelationQual)); + relqual->relation = rel; + /* child inherits WHERE clause from parent */ + relqual->whereClause = t->whereClause; + rels = lappend(rels, relqual); relids = lappend_oid(relids, childrelid); } } @@ -581,10 +618,12 @@ CloseTableList(List *rels) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc); - table_close(rel, NoLock); + table_close(rel->relation, NoLock); } + + list_free_deep(rels); } /* @@ -600,13 +639,13 @@ PublicationAddTables(Oid 
pubid, List *rels, bool if_not_exists, foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc); ObjectAddress obj; /* Must be owner of the table or superuser. */ - if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), - RelationGetRelationName(rel)); + if (!pg_class_ownercheck(RelationGetRelid(rel->relation), GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->relation->rd_rel->relkind), + RelationGetRelationName(rel->relation)); obj = publication_add_relation(pubid, rel, if_not_exists); if (stmt) @@ -632,8 +671,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); - Oid relid = RelationGetRelid(rel); + PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc); + Oid relid = RelationGetRelid(rel->relation); prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, ObjectIdGetDatum(relid), @@ -646,7 +685,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("relation \"%s\" is not part of the publication", - RelationGetRelationName(rel)))); + RelationGetRelationName(rel->relation)))); } ObjectAddressSet(obj, PublicationRelRelationId, prid); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 61cc59fe7c..2580da9deb 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -404,13 +404,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); relation_expr_list dostmt_opt_list transform_element_list transform_type_list TriggerTransitions TriggerReferencing - publication_name_list + publication_name_list publication_table_list vacuum_relation_list opt_vacuum_relation_list %type <list> group_by_list %type <node> group_by_item empty_grouping_set 
rollup_clause cube_clause %type <node> grouping_sets_clause -%type <node> opt_publication_for_tables publication_for_tables +%type <node> opt_publication_for_tables publication_for_tables publication_table_elem %type <value> publication_name_item %type <list> opt_fdw_options fdw_options @@ -9518,7 +9518,7 @@ opt_publication_for_tables: ; publication_for_tables: - FOR TABLE relation_expr_list + FOR TABLE publication_table_list { $$ = (Node *) $3; } @@ -9549,7 +9549,7 @@ AlterPublicationStmt: n->options = $5; $$ = (Node *)n; } - | ALTER PUBLICATION name ADD_P TABLE relation_expr_list + | ALTER PUBLICATION name ADD_P TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9557,7 +9557,7 @@ AlterPublicationStmt: n->tableAction = DEFELEM_ADD; $$ = (Node *)n; } - | ALTER PUBLICATION name SET TABLE relation_expr_list + | ALTER PUBLICATION name SET TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9565,7 +9565,7 @@ AlterPublicationStmt: n->tableAction = DEFELEM_SET; $$ = (Node *)n; } - | ALTER PUBLICATION name DROP TABLE relation_expr_list + | ALTER PUBLICATION name DROP TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9575,6 +9575,20 @@ AlterPublicationStmt: } ; +publication_table_list: + publication_table_elem { $$ = list_make1($1); } + | publication_table_list ',' publication_table_elem { $$ = lappend($1, $3); } + ; + +publication_table_elem: relation_expr OptWhereClause + { + PublicationTable *n = makeNode(PublicationTable); + n->relation = $1; + n->whereClause = $2; + $$ = (Node *) n; + } + ; + /***************************************************************************** * * CREATE SUBSCRIPTION name ... 
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index c745fcdd2b..b11d159b54 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -544,6 +544,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr) err = _("grouping operations are not allowed in COPY FROM WHERE conditions"); break; + case EXPR_KIND_PUBLICATION_WHERE: + if (isAgg) + err = _("aggregate functions are not allowed in publication WHERE expressions"); + else + err = _("grouping operations are not allowed in publication WHERE expressions"); + + break; /* * There is intentionally no default: case here, so that the @@ -933,6 +940,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc, case EXPR_KIND_GENERATED_COLUMN: err = _("window functions are not allowed in column generation expressions"); break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("window functions are not allowed in publication WHERE expressions"); + break; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index 8e136a7981..f82518afc8 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -170,6 +170,13 @@ transformExprRecurse(ParseState *pstate, Node *expr) /* Guard against stack overflow due to overly complex expressions */ check_stack_depth(); + /* Functions are not allowed in publication WHERE clauses */ + if (pstate->p_expr_kind == EXPR_KIND_PUBLICATION_WHERE && nodeTag(expr) == T_FuncCall) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions are not allowed in WHERE"), + parser_errposition(pstate, exprLocation(expr)))); + switch (nodeTag(expr)) { case T_ColumnRef: @@ -571,6 +578,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) case EXPR_KIND_CALL_ARGUMENT: case EXPR_KIND_COPY_WHERE: case EXPR_KIND_GENERATED_COLUMN: + case EXPR_KIND_PUBLICATION_WHERE: /* okay */ break; @@ -1924,13 +1932,15 @@ 
transformSubLink(ParseState *pstate, SubLink *sublink) break; case EXPR_KIND_CALL_ARGUMENT: err = _("cannot use subquery in CALL argument"); - break; case EXPR_KIND_COPY_WHERE: err = _("cannot use subquery in COPY FROM WHERE condition"); break; case EXPR_KIND_GENERATED_COLUMN: err = _("cannot use subquery in column generation expression"); break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("cannot use subquery in publication WHERE expression"); + break; /* * There is intentionally no default: case here, so that the @@ -3563,6 +3573,8 @@ ParseExprKindName(ParseExprKind exprKind) return "WHERE"; case EXPR_KIND_GENERATED_COLUMN: return "GENERATED AS"; + case EXPR_KIND_PUBLICATION_WHERE: + return "publication expression"; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c index 752cf1b315..50653a89d8 100644 --- a/src/backend/parser/parse_func.c +++ b/src/backend/parser/parse_func.c @@ -2529,6 +2529,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location) case EXPR_KIND_GENERATED_COLUMN: err = _("set-returning functions are not allowed in column generation expressions"); break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("set-returning functions are not allowed in publication WHERE expressions"); + break; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 42db4ada9e..5468b694f6 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -637,19 +637,26 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. This function also returns the relation + * qualifications to be used in COPY. 
*/ static void fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel) + LogicalRepRelation *lrel, List **qual) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[2] = {OIDOID, CHAROID}; Oid attrRow[3] = {TEXTOID, OIDOID, BOOLOID}; + Oid qualRow[1] = {TEXTOID}; bool isnull; - int natt; + int n; + ListCell *lc; + bool first; + + /* Avoid trashing relation map cache */ + memset(lrel, 0, sizeof(LogicalRepRelation)); lrel->nspname = nspname; lrel->relname = relname; @@ -713,20 +720,20 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->atttyps = palloc0(res->ntuples * sizeof(Oid)); lrel->attkeys = NULL; - natt = 0; + n = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - lrel->attnames[natt] = + lrel->attnames[n] = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); Assert(!isnull); - lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); + lrel->atttyps[n] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); Assert(!isnull); if (DatumGetBool(slot_getattr(slot, 3, &isnull))) - lrel->attkeys = bms_add_member(lrel->attkeys, natt); + lrel->attkeys = bms_add_member(lrel->attkeys, n); /* Should never happen. 
*/ - if (++natt >= MaxTupleAttributeNumber) + if (++n >= MaxTupleAttributeNumber) elog(ERROR, "too many columns in remote table \"%s.%s\"", nspname, relname); @@ -734,7 +741,46 @@ fetch_remote_table_info(char *nspname, char *relname, } ExecDropSingleTupleTableSlot(slot); - lrel->natts = natt; + lrel->natts = n; + + walrcv_clear_result(res); + + /* Get relation qual */ + resetStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT pg_get_expr(prqual, prrelid) FROM pg_publication p INNER JOIN pg_publication_rel pr ON (p.oid = pr.prpubid) WHERE pr.prrelid = %u AND p.pubname IN (", lrel->remoteid); + + first = true; + foreach(lc, MySubscription->publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_literal_cstr(pubname)); + } + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 1, qualRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch relation qualifications for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + Datum rf = slot_getattr(slot, 1, &isnull); + + if (!isnull) + *qual = lappend(*qual, makeString(TextDatumGetCString(rf))); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); pfree(cmd.data); @@ -750,6 +796,7 @@ copy_table(Relation rel) { LogicalRepRelMapEntry *relmapentry; LogicalRepRelation lrel; + List *qual = NIL; WalRcvExecResult *res; StringInfoData cmd; CopyState cstate; @@ -758,7 +805,7 @@ copy_table(Relation rel) /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel); + RelationGetRelationName(rel), &lrel, &qual); /* Put the relation into relmap. 
*/ logicalrep_relmap_update(&lrel); @@ -767,10 +814,57 @@ copy_table(Relation rel) relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); + /* list of columns for COPY */ + attnamelist = make_copy_attnamelist(relmapentry); + /* Start copy on the publisher. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "COPY %s TO STDOUT", - quote_qualified_identifier(lrel.nspname, lrel.relname)); + /* + * If publication has any row filter, build a SELECT query with OR'ed row + * filters for COPY. If no row filters are available, use COPY for all + * table contents. + */ + if (list_length(qual) > 0) + { + ListCell *lc; + bool first; + + appendStringInfoString(&cmd, "COPY (SELECT "); + /* list of attribute names */ + first = true; + foreach(lc, attnamelist) + { + char *col = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); + appendStringInfo(&cmd, "%s", quote_identifier(col)); + } + appendStringInfo(&cmd, " FROM %s", + quote_qualified_identifier(lrel.nspname, lrel.relname)); + appendStringInfoString(&cmd, " WHERE "); + /* list of OR'ed filters */ + first = true; + foreach(lc, qual) + { + char *q = strVal(lfirst(lc)); + if (first) + first = false; + else + appendStringInfoString(&cmd, " OR "); + appendStringInfo(&cmd, "%s", q); + } + + appendStringInfoString(&cmd, ") TO STDOUT"); + list_free_deep(qual); + } + else + { + appendStringInfo(&cmd, "COPY %s TO STDOUT", + quote_qualified_identifier(lrel.nspname, lrel.relname)); + } res = walrcv_exec(wrconn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) @@ -785,7 +879,6 @@ copy_table(Relation rel) addRangeTableEntryForRelation(pstate, rel, AccessShareLock, NULL, false, false); - attnamelist = make_copy_attnamelist(relmapentry); cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL); /* Do the copy */ diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c index 31fc7c5048..22b95d52b5 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -172,7 +172,7 @@ ensure_transaction(void) * * This is based on similar code in copy.c */ -static EState * +EState * create_estate_for_relation(Relation rel) { EState *estate; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 63687a97ec..49f533280b 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -12,13 +12,24 @@ */ #include "postgres.h" +#include "catalog/pg_type.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel.h" + +#include "executor/executor.h" +#include "nodes/execnodes.h" +#include "nodes/nodeFuncs.h" +#include "optimizer/planner.h" +#include "optimizer/optimizer.h" +#include "parser/parse_coerce.h" #include "replication/logical.h" #include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/inval.h" #include "utils/int8.h" #include "utils/memutils.h" @@ -58,6 +69,7 @@ typedef struct RelationSyncEntry bool schema_sent; /* did we send the schema? */ bool replicate_valid; PublicationActions pubactions; + List *qual; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -333,6 +345,65 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* ... then check row filter */ + if (list_length(relentry->qual) > 0) + { + HeapTuple old_tuple; + HeapTuple new_tuple; + TupleDesc tupdesc; + EState *estate; + ExprContext *ecxt; + MemoryContext oldcxt; + ListCell *lc; + bool matched = true; + + old_tuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; + new_tuple = change->data.tp.newtuple ? 
&change->data.tp.newtuple->tuple : NULL; + tupdesc = RelationGetDescr(relation); + estate = create_estate_for_relation(relation); + + /* prepare context per tuple */ + ecxt = GetPerTupleExprContext(estate); + oldcxt = MemoryContextSwitchTo(estate->es_query_cxt); + ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc, &TTSOpsVirtual); + + ExecStoreHeapTuple(new_tuple ? new_tuple : old_tuple, ecxt->ecxt_scantuple, false); + + foreach (lc, relentry->qual) + { + Node *qual; + ExprState *expr_state; + Expr *expr; + Oid expr_type; + Datum res; + bool isnull; + + qual = (Node *) lfirst(lc); + + /* evaluates row filter */ + expr_type = exprType(qual); + expr = (Expr *) coerce_to_target_type(NULL, qual, expr_type, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1); + expr = expression_planner(expr); + expr_state = ExecInitExpr(expr, NULL); + res = ExecEvalExpr(expr_state, ecxt, &isnull); + + /* if tuple does not match row filter, bail out */ + if (!DatumGetBool(res) || isnull) + { + matched = false; + break; + } + } + + MemoryContextSwitchTo(oldcxt); + + ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple); + FreeExecutorState(estate); + + if (!matched) + return; + } + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -568,10 +639,14 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) */ entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + entry->qual = NIL; foreach(lc, data->publications) { Publication *pub = lfirst(lc); + HeapTuple rf_tuple; + Datum rf_datum; + bool rf_isnull; if (pub->alltables || list_member_oid(pubids, pub->oid)) { @@ -581,9 +656,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } - if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) - break; + /* Cache 
row filters, if available */ + rf_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(rf_tuple)) + { + rf_datum = SysCacheGetAttr(PUBLICATIONRELMAP, rf_tuple, Anum_pg_publication_rel_prqual, &rf_isnull); + + if (!rf_isnull) + { + MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext); + char *s = TextDatumGetCString(rf_datum); + Node *rf_node = stringToNode(s); + entry->qual = lappend(entry->qual, rf_node); + MemoryContextSwitchTo(oldctx); + } + + ReleaseSysCache(rf_tuple); + } } list_free(pubids); @@ -658,5 +747,10 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) */ hash_seq_init(&status, RelationSyncCache); while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) + { entry->replicate_valid = false; + if (list_length(entry->qual) > 0) + list_free_deep(entry->qual); + entry->qual = NIL; + } } diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 2dad24fc9f..74ab2c25d1 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -78,15 +78,22 @@ typedef struct Publication PublicationActions pubactions; } Publication; +typedef struct PublicationRelationQual +{ + Relation relation; + Node *whereClause; +} PublicationRelationQual; + extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); extern List *GetPublicationRelations(Oid pubid); +extern List *GetPublicationRelationQuals(Oid pubid); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(void); extern bool is_publishable_relation(Relation rel); -extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, +extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationQual *targetrel, bool if_not_exists); extern Oid 
get_publication_oid(const char *pubname, bool missing_ok); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index 5f5bc92ab3..a75b2d5345 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -28,9 +28,13 @@ */ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) { - Oid oid; /* oid */ - Oid prpubid; /* Oid of the publication */ - Oid prrelid; /* Oid of the relation */ + Oid oid; /* oid */ + Oid prpubid; /* Oid of the publication */ + Oid prrelid; /* Oid of the relation */ + +#ifdef CATALOG_VARLEN /* variable-length fields start here */ + pg_node_tree prqual; /* qualifications */ +#endif } FormData_pg_publication_rel; /* ---------------- diff --git a/src/include/catalog/toasting.h b/src/include/catalog/toasting.h index 5ee628c837..aedf27b483 100644 --- a/src/include/catalog/toasting.h +++ b/src/include/catalog/toasting.h @@ -66,6 +66,7 @@ DECLARE_TOAST(pg_namespace, 4163, 4164); DECLARE_TOAST(pg_partitioned_table, 4165, 4166); DECLARE_TOAST(pg_policy, 4167, 4168); DECLARE_TOAST(pg_proc, 2836, 2837); +DECLARE_TOAST(pg_publication_rel, 8287, 8288); DECLARE_TOAST(pg_rewrite, 2838, 2839); DECLARE_TOAST(pg_seclabel, 3598, 3599); DECLARE_TOAST(pg_statistic, 2840, 2841); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index ffb4cd4bcc..4e624317b0 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -476,6 +476,7 @@ typedef enum NodeTag T_PartitionRangeDatum, T_PartitionCmd, T_VacuumRelation, + T_PublicationTable, /* * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 12e9730dd0..91cd750047 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3462,12 +3462,19 @@ typedef struct AlterTSConfigurationStmt } AlterTSConfigurationStmt; +typedef struct PublicationTable +{ + NodeTag type; + RangeVar *relation; /* relation to 
be published */ + Node *whereClause; /* qualifications */ +} PublicationTable; + typedef struct CreatePublicationStmt { NodeTag type; char *pubname; /* Name of the publication */ List *options; /* List of DefElem nodes */ - List *tables; /* Optional list of tables to add */ + List *tables; /* Optional list of PublicationTable to add */ bool for_all_tables; /* Special publication for all tables in db */ } CreatePublicationStmt; @@ -3480,7 +3487,7 @@ typedef struct AlterPublicationStmt List *options; /* List of DefElem nodes */ /* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */ - List *tables; /* List of tables to add/drop */ + List *tables; /* List of PublicationTable to add/drop */ bool for_all_tables; /* Special publication for all tables in db */ DefElemAction tableAction; /* What action to perform with the tables */ } AlterPublicationStmt; diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h index 3d8039aa51..048a445030 100644 --- a/src/include/parser/parse_node.h +++ b/src/include/parser/parse_node.h @@ -73,6 +73,7 @@ typedef enum ParseExprKind EXPR_KIND_CALL_ARGUMENT, /* procedure argument in CALL */ EXPR_KIND_COPY_WHERE, /* WHERE condition in COPY FROM */ EXPR_KIND_GENERATED_COLUMN, /* generation expression for a column */ + EXPR_KIND_PUBLICATION_WHERE /* WHERE condition for a table in PUBLICATION */ } ParseExprKind; diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 85e0b6ea62..29af52ce3a 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -39,4 +39,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp); extern char *logicalrep_typmap_gettypname(Oid remoteid); +extern EState *create_estate_for_relation(Relation rel); + #endif /* LOGICALRELATION_H */ diff --git a/src/test/regress/expected/publication.out 
b/src/test/regress/expected/publication.out index afbbdd543d..cf67b7b186 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -107,6 +107,35 @@ Tables: DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +CREATE TABLE testpub_rf_tbl1 (a integer, b text); +CREATE TABLE testpub_rf_tbl2 (c text, d integer); +CREATE TABLE testpub_rf_tbl3 (e integer); +CREATE TABLE testpub_rf_tbl4 (g text); +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5); +RESET client_min_messages; +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000); +ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2; +-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression) +ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500); +-- fail - functions disallowed +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6); +ERROR: functions are not allowed in WHERE +LINE 1: ...ICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) ... 
+ ^ +\dRp+ testpub5 + Publication testpub5 + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t +Tables: + "public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500)) + +DROP TABLE testpub_rf_tbl1; +DROP TABLE testpub_rf_tbl2; +DROP TABLE testpub_rf_tbl3; +DROP TABLE testpub_rf_tbl4; +DROP PUBLICATION testpub5; -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; ERROR: "testpub_view" is not a table diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 815410b3c5..20c874eb67 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -60,6 +60,27 @@ CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +CREATE TABLE testpub_rf_tbl1 (a integer, b text); +CREATE TABLE testpub_rf_tbl2 (c text, d integer); +CREATE TABLE testpub_rf_tbl3 (e integer); +CREATE TABLE testpub_rf_tbl4 (g text); +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5); +RESET client_min_messages; +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000); +ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2; +-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression) +ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500); +-- fail - functions disallowed +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6); +\dRp+ testpub5 + +DROP TABLE testpub_rf_tbl1; +DROP TABLE testpub_rf_tbl2; +DROP TABLE testpub_rf_tbl3; +DROP TABLE testpub_rf_tbl4; +DROP PUBLICATION testpub5; + -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1, 
pub_test.testpub_nopk; diff --git a/src/test/subscription/t/013_row_filter.pl b/src/test/subscription/t/013_row_filter.pl new file mode 100644 index 0000000000..99e6db94d6 --- /dev/null +++ b/src/test/subscription/t/013_row_filter.pl @@ -0,0 +1,96 @@ +# Test logical replication behavior with row filtering +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 4; + +# create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# setup structure on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_1 (a int primary key, b text)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_2 (c int primary key)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)"); + +# setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_1 (a int primary key, b text)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_2 (c int primary key)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)"); + +# setup logical replication +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_1 FOR TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')"); + +my $result = $node_publisher->psql('postgres', + "ALTER PUBLICATION tap_pub_1 DROP TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')"); +is($result, 3, "syntax error for ALTER PUBLICATION DROP TABLE"); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_1 ADD TABLE tab_rowfilter_2 WHERE (c % 7 = 0)"); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_1 SET TABLE tab_rowfilter_1 
WHERE (a > 1000 AND b <> 'filtered'), tab_rowfilter_2 WHERE (c % 2 = 0), tab_rowfilter_3"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_2 FOR TABLE tab_rowfilter_2 WHERE (c % 3 = 0)"); + +# test row filtering +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1, 'not replicated')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1500, 'filtered')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1980, 'not filtered')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) SELECT x, 'test ' || x FROM generate_series(990,1002) x"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_2 (c) SELECT generate_series(1, 10)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x"); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2" +); + +$node_publisher->wait_for_catchup($appname); + +# wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +#$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab_rowfilter_1"); +is($result, qq(1980|not filtered +1001|test 1001 +1002|test 1002), 'check filtered data was copied to subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(c), min(c), max(c) FROM tab_rowfilter_2"); +is($result, qq(7|2|10), 'check filtered data was copied to subscriber'); + +$result = + 
$node_subscriber->safe_psql('postgres', "SELECT count(a) FROM tab_rowfilter_3"); +is($result, qq(10), 'check filtered data was copied to subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 2.11.0
From 2548f727b6f5d61e87de70ec661335fe82ce6ffa Mon Sep 17 00:00:00 2001 From: Euler Taveira <eu...@timbira.com.br> Date: Sat, 15 Sep 2018 02:52:00 +0000 Subject: [PATCH 7/8] Publication where condition support for pg_dump --- src/bin/pg_dump/pg_dump.c | 15 +++++++++++++-- src/bin/pg_dump/pg_dump.h | 1 + 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 8b993d6eae..b41d9fd477 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3930,6 +3930,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) int i_tableoid; int i_oid; int i_pubname; + int i_pubrelqual; int i, j, ntups; @@ -3962,7 +3963,8 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) /* Get the publication membership for the table. */ appendPQExpBuffer(query, - "SELECT pr.tableoid, pr.oid, p.pubname " + "SELECT pr.tableoid, pr.oid, p.pubname, " + "pg_catalog.pg_get_expr(pr.prqual, pr.prrelid) AS pubrelqual " "FROM pg_publication_rel pr, pg_publication p " "WHERE pr.prrelid = '%u'" " AND p.oid = pr.prpubid", @@ -3983,6 +3985,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); i_pubname = PQfnumber(res, "pubname"); + i_pubrelqual = PQfnumber(res, "pubrelqual"); pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -3998,6 +4001,11 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname)); pubrinfo[j].pubtable = tbinfo; + if (PQgetisnull(res, j, i_pubrelqual)) + pubrinfo[j].pubrelqual = NULL; + else + pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, j, i_pubrelqual)); + /* Decide whether we want to dump it */ selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout); } @@ -4026,8 +4034,11 @@ dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo) appendPQExpBuffer(query, "ALTER 
PUBLICATION %s ADD TABLE ONLY", fmtId(pubrinfo->pubname)); - appendPQExpBuffer(query, " %s;\n", + appendPQExpBuffer(query, " %s", fmtQualifiedDumpable(tbinfo)); + if (pubrinfo->pubrelqual) + appendPQExpBuffer(query, " WHERE %s", pubrinfo->pubrelqual); + appendPQExpBufferStr(query, ";\n"); /* * There is no point in creating drop query as drop query as the drop is diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 4f9ebb4904..d03eaa1dca 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -612,6 +612,7 @@ typedef struct _PublicationRelInfo DumpableObject dobj; TableInfo *pubtable; char *pubname; + char *pubrelqual; } PublicationRelInfo; /* -- 2.11.0
From 72cfcf940ae2100f0bf8728983bf7313a2d9ef83 Mon Sep 17 00:00:00 2001 From: Euler Taveira <eu...@timbira.com.br> Date: Thu, 17 May 2018 20:52:28 +0000 Subject: [PATCH 6/8] Print publication WHERE condition in psql --- src/bin/psql/describe.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index ee00c5da08..872a544410 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5816,7 +5816,8 @@ describePublications(const char *pattern) if (!puballtables) { printfPQExpBuffer(&buf, - "SELECT n.nspname, c.relname\n" + "SELECT n.nspname, c.relname,\n" + " pg_get_expr(pr.prqual, c.oid)\n" "FROM pg_catalog.pg_class c,\n" " pg_catalog.pg_namespace n,\n" " pg_catalog.pg_publication_rel pr\n" @@ -5846,6 +5847,10 @@ describePublications(const char *pattern) PQgetvalue(tabres, j, 0), PQgetvalue(tabres, j, 1)); + if (!PQgetisnull(tabres, j, 2)) + appendPQExpBuffer(&buf, " WHERE %s", + PQgetvalue(tabres, j, 2)); + printTableAddFooter(&cont, buf.data); } PQclear(tabres); -- 2.11.0
From c947168de90527d30d850f8c5c4bd6e090521500 Mon Sep 17 00:00:00 2001 From: Euler Taveira <eu...@timbira.com.br> Date: Wed, 14 Mar 2018 00:53:17 +0000 Subject: [PATCH 8/8] Debug for row filtering --- src/backend/commands/publicationcmds.c | 11 +++++ src/backend/replication/logical/tablesync.c | 1 + src/backend/replication/pgoutput/pgoutput.c | 66 +++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+) diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 6d56893c3e..65294f2100 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -333,6 +333,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, List *rels = NIL; Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); Oid pubid = pubform->oid; + ListCell *lc; /* Check that user is allowed to manipulate the publication tables. */ if (pubform->puballtables) @@ -344,6 +345,16 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, Assert(list_length(stmt->tables) > 0); + foreach(lc, stmt->tables) + { + PublicationTable *t = lfirst(lc); + + if (t->whereClause == NULL) + elog(DEBUG3, "publication \"%s\" has no WHERE clause", NameStr(pubform->pubname)); + else + elog(DEBUG3, "publication \"%s\" has WHERE clause", NameStr(pubform->pubname)); + } + /* * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause. 
Use * publication_table_list node (that accepts a WHERE clause) but forbid the diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 5468b694f6..1fc7d5647b 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -865,6 +865,7 @@ copy_table(Relation rel) appendStringInfo(&cmd, "COPY %s TO STDOUT", quote_qualified_identifier(lrel.nspname, lrel.relname)); } + elog(DEBUG2, "COPY for initial synchronization: %s", cmd.data); res = walrcv_exec(wrconn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 49f533280b..4aff5cb515 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -32,6 +32,7 @@ #include "utils/builtins.h" #include "utils/inval.h" #include "utils/int8.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -321,6 +322,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContext old; RelationSyncEntry *relentry; + Form_pg_class class_form; + char *schemaname; + char *tablename; + if (!is_publishable_relation(relation)) return; @@ -345,6 +350,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + class_form = RelationGetForm(relation); + schemaname = get_namespace_name(class_form->relnamespace); + tablename = NameStr(class_form->relname); + + if (change->action == REORDER_BUFFER_CHANGE_INSERT) + elog(DEBUG1, "INSERT \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid); + else if (change->action == REORDER_BUFFER_CHANGE_UPDATE) + elog(DEBUG1, "UPDATE \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid); + else if (change->action == REORDER_BUFFER_CHANGE_DELETE) + elog(DEBUG1, "DELETE \"%s\".\"%s\" txid: %u", schemaname, tablename, 
txn->xid); + /* ... then check row filter */ if (list_length(relentry->qual) > 0) { @@ -362,6 +378,42 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, tupdesc = RelationGetDescr(relation); estate = create_estate_for_relation(relation); +#ifdef _NOT_USED + if (old_tuple) + { + int i; + + for (i = 0; i < tupdesc->natts; i++) + { + Form_pg_attribute attr; + HeapTuple type_tuple; + Oid typoutput; + bool typisvarlena; + bool isnull; + Datum val; + char *outputstr = NULL; + + attr = TupleDescAttr(tupdesc, i); + + /* Figure out type name */ + type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(attr->atttypid)); + if (HeapTupleIsValid(type_tuple)) + { + /* Get information needed for printing values of a type */ + getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena); + + val = heap_getattr(old_tuple, i + 1, tupdesc, &isnull); + if (!isnull) + { + outputstr = OidOutputFunctionCall(typoutput, val); + elog(DEBUG2, "row filter: REPLICA IDENTITY %s: %s", NameStr(attr->attname), outputstr); + pfree(outputstr); + } + } + } + } +#endif + /* prepare context per tuple */ ecxt = GetPerTupleExprContext(estate); oldcxt = MemoryContextSwitchTo(estate->es_query_cxt); @@ -377,6 +429,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid expr_type; Datum res; bool isnull; + char *s = NULL; qual = (Node *) lfirst(lc); @@ -387,12 +440,22 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, expr_state = ExecInitExpr(expr, NULL); res = ExecEvalExpr(expr_state, ecxt, &isnull); + elog(DEBUG3, "row filter: result: %s ; isnull: %s", (DatumGetBool(res)) ? "true" : "false", (isnull) ? 
"true" : "false"); + /* if tuple does not match row filter, bail out */ if (!DatumGetBool(res) || isnull) { + s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(qual)), ObjectIdGetDatum(relentry->relid))); + elog(DEBUG2, "row filter \"%s\" was not matched", s); + pfree(s); + matched = false; break; } + + s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(qual)), ObjectIdGetDatum(relentry->relid))); + elog(DEBUG2, "row filter \"%s\" was matched", s); + pfree(s); } MemoryContextSwitchTo(oldcxt); @@ -666,9 +729,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext); char *s = TextDatumGetCString(rf_datum); + char *t = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, rf_datum, ObjectIdGetDatum(entry->relid))); Node *rf_node = stringToNode(s); entry->qual = lappend(entry->qual, rf_node); MemoryContextSwitchTo(oldctx); + + elog(DEBUG2, "row filter \"%s\" found for publication \"%s\" and relation \"%s\"", t, pub->name, get_rel_name(relid)); } ReleaseSysCache(rf_tuple); -- 2.11.0