Hi,
I'm going to register this in the CommitFest (CF) for feedback.
Summary for potential reviewers: we don't use declarative partitions in
the Timescale partitioning scheme, but it'd be really nice to be able to
replicate between our tables and standard tables, or between two
Timescale-partitioned tables with different layouts. This patch lets
extensions (or savvy users) upgrade an existing inheritance relationship
between two tables into a "logical partition" relationship, so that they
can be handled with the publish_via_partition_root machinery.
I hope this might also help pg_partman users migrate between old- and
new-style partition schemes, but that's speculation.
On 1/20/23 09:53, Jacob Champion wrote:
>> 2) While this strategy works well for ongoing replication, it's not
>> enough to get the initial synchronization correct. The subscriber
>> still does a COPY of the root table directly, missing out on all the
>> logical descendant data. The publisher will have to tell the
>> subscriber about the relationship somehow, and older subscriber
>> versions won't understand how to use that (similar to how old
>> subscribers can't correctly handle row filters).
>
> I partially solved this by having the subscriber pull the logical
> hierarchy from the publisher to figure out which tables to COPY. This
> works when publish_via_partition_root=true, but it doesn't correctly
> return to the previous behavior when the setting is false. I need to
> check the publication setting from the subscriber, too, but that opens
> up the question of what to do if two different publications conflict.
Second draft attached, which fixes that bug. I kept thinking to myself
that this would be much easier if the publisher told the subscriber what
data to copy rather than having the subscriber hardcode the initial sync
process... and then I realized that I could, sort of, move in that
direction.
This version adds a SQL function to determine the list of source tables
to COPY into a subscriber's target table. Now the publisher can make use
of whatever catalogs it needs to make that list and the subscriber
doesn't need to couple to them. (This could also provide a way for
publishers to provide more generic "table indirection" in the future,
but I'm wary of selling genericism as a feature here.)
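To make the idea concrete, here is a rough, self-contained C sketch of what the publisher-side list amounts to. It is not code from the patch: ToyInherits and toy_rels_to_sync are hypothetical names, the array stands in for pg_inherits, and the sketch skips the filtering by publication membership that the real function has to do. It only shows the walk over inhseqno == 0 links, starting from the root.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a pg_inherits row; not the real catalog struct. */
typedef struct ToyInherits
{
    unsigned inhrelid;   /* child table */
    unsigned inhparent;  /* parent table */
    int      inhseqno;   /* 0 marks a "logical partition" link in this sketch */
} ToyInherits;

/*
 * Collect the tables to COPY for "root": the root itself plus, when
 * pubviaroot is set, every descendant reachable through inhseqno == 0 links.
 * Returns the number of OIDs written to out (at most out_cap).
 */
static size_t
toy_rels_to_sync(const ToyInherits *rows, size_t nrows, unsigned root,
                 bool pubviaroot, unsigned *out, size_t out_cap)
{
    size_t n = 0;

    assert(out_cap > 0);
    out[n++] = root;
    if (!pubviaroot)
        return n;

    /* Breadth-first walk; out[] doubles as the work queue. */
    for (size_t head = 0; head < n; head++)
    {
        for (size_t i = 0; i < nrows; i++)
        {
            bool seen = false;

            /* Only follow links from the current table marked as logical. */
            if (rows[i].inhparent != out[head] || rows[i].inhseqno != 0)
                continue;

            /* Skip children already queued, so each table appears once. */
            for (size_t j = 0; j < n; j++)
                if (out[j] == rows[i].inhrelid)
                    seen = true;

            if (!seen && n < out_cap)
                out[n++] = rows[i].inhrelid;
        }
    }
    return n;
}
```

With pubviaroot off, the list is just the root, which matches the previous behavior of COPYing only the table itself.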
I haven't solved the problem where two publications of the same table
have different settings for publish_via_partition_root. I was curious to
see how the existing partition code prevented problems, but I'm not
really sure that it does... Here are some situations where the existing
implementation duplicates data on the initial sync:
1) A single subscription to two publications, one with
publish_via_partition_root on and the other off, which publish the same
partitioned table
2) A single subscription to two publications with
publish_via_partition_root on, one of which publishes a root partition
and the other of which publishes a descendant/leaf
3) A single subscription to two publications with
publish_via_partition_root on, one of which publishes FOR ALL TABLES and
the other of which publishes a descendant/leaf
Is it expected that DBAs should avoid these cases, or are they worth
pursuing with a bug fix?
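For anyone who wants the arithmetic spelled out, below is a toy C model of the duplication. It is a sketch under stated assumptions, not a reproduction script: the layout (one partitioned root with two leaves), the names ROOT, LEAF_A, LEAF_B and toy_rows_after_sync are all hypothetical, and it assumes the subscriber has the same partition layout, so rows copied into a leaf are also visible through the root.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical layout: one partitioned root with two leaf partitions. */
enum { ROOT = 0, LEAF_A = 1, LEAF_B = 2, NTABLES = 3 };

/*
 * Count the rows visible through the subscriber's root after initial sync,
 * given which publisher tables the subscription decided to COPY. Copying the
 * root reads every leaf's rows; copying a leaf reads only that leaf's rows.
 */
static int
toy_rows_after_sync(const bool copy[NTABLES], int rows_a, int rows_b)
{
    int total = 0;

    assert(rows_a >= 0 && rows_b >= 0);
    if (copy[ROOT])
        total += rows_a + rows_b;
    if (copy[LEAF_A])
        total += rows_a;
    if (copy[LEAF_B])
        total += rows_b;
    return total;
}
```

In case 1 the subscription ends up syncing the root (from the publication with the setting on) and both leaves (from the one with it off), so with 10 and 5 rows in the leaves this model gives 30 rows instead of 15. Case 2 is the same with only one leaf copied twice.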
Thanks,
--Jacob
From 6d31d13f9d7c400cf717426c4f336f7ad19a4391 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jchamp...@timescale.com>
Date: Mon, 26 Sep 2022 13:23:51 -0700
Subject: [PATCH] WIP: introduce pg_set_logical_root for use with pubviaroot
Allows regular inherited tables to be published via their root table,
just like partitions. This works by hijacking pg_inherits' inhseqno
column, and replacing a (single) existing entry for the child with the
value zero, indicating that it should be treated as a logical partition
by the publication machinery.
Initial sync works by asking the publisher for a list of logical
descendants of the published table, then COPYing them one-by-one into
the root. The publisher reuses the existing pubviaroot logic, adding the
new logical roots to code that previously looked only for partition
roots.
Known bugs/TODOs:
- The pg_inherits machinery doesn't prohibit changes to inheritance
after an entry has been marked as a logical root.
- I haven't given any thought to interactions with row filters, or to
column lists, or to multiple publications with conflicting pubviaroot
settings.
- pg_set_logical_root() doesn't check for table ownership yet. Anyone
can muck with pg_inherits through it.
- I'm not sure that I'm taking all the necessary locks yet, and those I
do take may be taken in the wrong order.
---
src/backend/catalog/pg_inherits.c | 202 ++++++++++++++++-
src/backend/catalog/pg_publication.c | 120 +++++++++-
src/backend/commands/publicationcmds.c | 10 +
src/backend/partitioning/partdesc.c | 3 +-
src/backend/replication/logical/tablesync.c | 235 +++++++++++++------
src/backend/replication/pgoutput/pgoutput.c | 54 ++---
src/include/catalog/pg_inherits.h | 7 +-
src/include/catalog/pg_proc.dat | 10 +
src/test/regress/expected/publication.out | 32 +++
src/test/regress/sql/publication.sql | 25 ++
src/test/subscription/t/013_partition.pl | 239 ++++++++++++++++++++
11 files changed, 817 insertions(+), 120 deletions(-)
diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c
index da969bd2f9..659f5125f1 100644
--- a/src/backend/catalog/pg_inherits.c
+++ b/src/backend/catalog/pg_inherits.c
@@ -24,14 +24,20 @@
#include "access/table.h"
#include "catalog/indexing.h"
#include "catalog/pg_inherits.h"
+#include "catalog/partition.h"
#include "parser/parse_type.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
+#include "utils/fmgrprotos.h"
+#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
+static List * find_all_inheritors_internal(Oid parentrelId, LOCKMODE lockmode,
+ List **numparents, bool logical_only);
+
/*
* Entry of a hash table used in find_all_inheritors. See below.
*/
@@ -59,14 +65,14 @@ List *
find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
{
return find_inheritance_children_extended(parentrelId, true, lockmode,
- NULL, NULL);
+ NULL, NULL, false);
}
/*
* find_inheritance_children_extended
*
* As find_inheritance_children, with more options regarding detached
- * partitions.
+ * partitions and logical roots.
*
* If a partition's pg_inherits row is marked "detach pending",
* *detached_exist (if not null) is set true.
@@ -78,16 +84,21 @@ find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
* whether the transaction that marked those partitions as detached appears
* committed to the active snapshot. In addition, *detached_xmin (if not null)
* is set to the xmin of the row of the detached partition.
+ *
+ * If logical_only is true, only tables marked explicitly via
+ * pg_set_logical_root() are included in the output list.
*/
List *
find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
LOCKMODE lockmode, bool *detached_exist,
- TransactionId *detached_xmin)
+ TransactionId *detached_xmin,
+ bool logical_only)
{
List *list = NIL;
Relation relation;
+ Oid index;
SysScanDesc scan;
- ScanKeyData key[1];
+ ScanKeyData key[2];
HeapTuple inheritsTuple;
Oid inhrelid;
Oid *oidarr;
@@ -110,14 +121,24 @@ find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
numoids = 0;
relation = table_open(InheritsRelationId, AccessShareLock);
+ index = InheritsParentIndexId;
ScanKeyInit(&key[0],
Anum_pg_inherits_inhparent,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(parentrelId));
- scan = systable_beginscan(relation, InheritsParentIndexId, true,
- NULL, 1, key);
+ if (logical_only)
+ {
+ index = InheritsParentSeqnoIndexId;
+ ScanKeyInit(&key[1],
+ Anum_pg_inherits_inhseqno,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum(0));
+ }
+
+ scan = systable_beginscan(relation, index, true,
+ NULL, logical_only ? 2 : 1, key);
while ((inheritsTuple = systable_getnext(scan)) != NULL)
{
@@ -254,6 +275,20 @@ find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
*/
List *
find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
+{
+ return find_all_inheritors_internal(parentrelId, lockmode, numparents,
+ false);
+}
+
+List *
+find_all_logical_inheritors(Oid parentrelId)
+{
+ return find_all_inheritors_internal(parentrelId, NoLock, NULL, true);
+}
+
+static List *
+find_all_inheritors_internal(Oid parentrelId, LOCKMODE lockmode,
+ List **numparents, bool logical_only)
{
/* hash table for O(1) rel_oid -> rel_numparents cell lookup */
HTAB *seen_rels;
@@ -290,7 +325,9 @@ find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
ListCell *lc;
/* Get the direct children of this rel */
- currentchildren = find_inheritance_children(currentrel, lockmode);
+ currentchildren =
+ find_inheritance_children_extended(currentrel, true, lockmode,
+ NULL, NULL, logical_only);
/*
* Add to the queue only those children not already seen. This avoids
@@ -655,3 +692,154 @@ PartitionHasPendingDetach(Oid partoid)
elog(ERROR, "relation %u is not a partition", partoid);
return false; /* keep compiler quiet */
}
+
+static Oid
+get_logical_parent_worker(Relation inhRel, Oid relid)
+{
+ SysScanDesc scan;
+ ScanKeyData key[2];
+ Oid result = InvalidOid;
+ HeapTuple tuple;
+
+ ScanKeyInit(&key[0],
+ Anum_pg_inherits_inhrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(relid));
+ ScanKeyInit(&key[1],
+ Anum_pg_inherits_inhseqno,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum(0));
+
+ scan = systable_beginscan(inhRel, InheritsRelidSeqnoIndexId, true,
+ NULL, 2, key);
+ tuple = systable_getnext(scan);
+ if (HeapTupleIsValid(tuple))
+ {
+ Form_pg_inherits form = (Form_pg_inherits) GETSTRUCT(tuple);
+ result = form->inhparent;
+ }
+
+ systable_endscan(scan);
+
+ return result;
+}
+
+static void
+get_logical_ancestors_worker(Relation inhRel, Oid relid, List **ancestors)
+{
+ Oid parentOid;
+
+ /*
+ * Recursion ends at the topmost level, i.e., when there's no parent.
+ */
+ parentOid = get_logical_parent_worker(inhRel, relid);
+ if (parentOid == InvalidOid)
+ return;
+
+ *ancestors = lappend_oid(*ancestors, parentOid);
+ get_logical_ancestors_worker(inhRel, parentOid, ancestors);
+}
+
+List *
+get_logical_ancestors(Oid relid, bool is_partition)
+{
+ List *result = NIL;
+ Relation inhRel;
+
+ /* For partitions, this is identical to get_partition_ancestors(). */
+ if (is_partition)
+ return get_partition_ancestors(relid);
+
+ inhRel = table_open(InheritsRelationId, AccessShareLock);
+ get_logical_ancestors_worker(inhRel, relid, &result);
+ table_close(inhRel, AccessShareLock);
+
+ return result;
+}
+
+bool
+has_logical_parent(Relation inhRel, Oid relid)
+{
+ return (get_logical_parent_worker(inhRel, relid) != InvalidOid);
+}
+
+Datum
+pg_set_logical_root(PG_FUNCTION_ARGS)
+{
+ Oid tableoid = PG_GETARG_OID(0);
+ Oid rootoid = PG_GETARG_OID(1);
+ char *tablename;
+ char *rootname;
+ Relation inhRel;
+ ScanKeyData key;
+ SysScanDesc scan;
+ Oid parent = InvalidOid;
+ HeapTuple tuple, copyTuple;
+ Form_pg_inherits form;
+
+ /*
+ * Check that the tables exist.
+ * TODO: check inheritance
+ * TODO: and identical schemas too? or does replication handle that?
+ * TODO: check ownership
+ */
+ tablename = get_rel_name(tableoid);
+ if (tablename == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("OID %u does not refer to a table", tableoid)));
+ rootname = get_rel_name(rootoid);
+ if (rootname == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("OID %u does not refer to a table", rootoid)));
+
+ /* Open pg_inherits with RowExclusiveLock so that we can update it. */
+ inhRel = table_open(InheritsRelationId, RowExclusiveLock);
+
+ /*
+ * We have to make sure that the inheritance relationship already exists,
+ * and that there is only one existing parent for this table.
+ *
+ * TODO: do we have to lock the tables themselves to avoid races?
+ */
+ ScanKeyInit(&key,
+ Anum_pg_inherits_inhrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(tableoid));
+
+ scan = systable_beginscan(inhRel, InheritsRelidSeqnoIndexId, true,
+ NULL, 1, &key);
+ tuple = systable_getnext(scan);
+ if (HeapTupleIsValid(tuple))
+ {
+ form = (Form_pg_inherits) GETSTRUCT(tuple);
+ parent = form->inhparent;
+ copyTuple = heap_copytuple(tuple);
+
+ if (HeapTupleIsValid(systable_getnext(scan)))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("table \"%s\" inherits from multiple tables",
+ tablename)));
+ }
+
+ if (parent != rootoid)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("table \"%s\" does not inherit from intended root table \"%s\"",
+ tablename, rootname)));
+
+ systable_endscan(scan);
+
+ /* Mark the inheritance as a logical root by setting it to zero. */
+ form = (Form_pg_inherits) GETSTRUCT(copyTuple);
+ form->inhseqno = 0;
+
+ CatalogTupleUpdate(inhRel, &copyTuple->t_self, copyTuple);
+
+ heap_freetuple(copyTuple);
+ table_close(inhRel, RowExclusiveLock);
+
+ PG_RETURN_VOID();
+}
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index a98fcad421..04f7dc8099 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -172,11 +172,11 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
}
/*
- * Filter out the partitions whose parent tables were also specified in
+ * Filter out the tables whose logical parent tables were also specified in
* the publication.
*/
static List *
-filter_partitions(List *relids)
+filter_logical_descendants(List *relids)
{
List *result = NIL;
ListCell *lc;
@@ -188,8 +188,7 @@ filter_partitions(List *relids)
List *ancestors = NIL;
Oid relid = lfirst_oid(lc);
- if (get_rel_relispartition(relid))
- ancestors = get_partition_ancestors(relid);
+ ancestors = get_logical_ancestors(relid, get_rel_relispartition(relid));
foreach(lc2, ancestors)
{
@@ -782,11 +781,15 @@ List *
GetAllTablesPublicationRelations(bool pubviaroot)
{
Relation classRel;
+ Relation inhRel;
ScanKeyData key[1];
TableScanDesc scan;
HeapTuple tuple;
List *result = NIL;
+ /* TODO: is there a required order to acquire these locks? */
+ if (pubviaroot)
+ inhRel = table_open(InheritsRelationId, AccessShareLock);
classRel = table_open(RelationRelationId, AccessShareLock);
ScanKeyInit(&key[0],
@@ -802,7 +805,8 @@ GetAllTablesPublicationRelations(bool pubviaroot)
Oid relid = relForm->oid;
if (is_publishable_class(relid, relForm) &&
- !(relForm->relispartition && pubviaroot))
+ !(relForm->relispartition && pubviaroot) &&
+ !(pubviaroot && has_logical_parent(inhRel, relid)))
result = lappend_oid(result, relid);
}
@@ -831,6 +835,9 @@ GetAllTablesPublicationRelations(bool pubviaroot)
}
table_close(classRel, AccessShareLock);
+ if (pubviaroot)
+ table_close(inhRel, AccessShareLock);
+
return result;
}
@@ -1076,15 +1083,14 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
tables = list_concat_unique_oid(relids, schemarelids);
/*
- * If the publication publishes partition changes via their
- * respective root partitioned tables, we must exclude partitions
- * in favor of including the root partitioned tables. Otherwise,
- * the function could return both the child and parent tables
- * which could cause data of the child table to be
- * double-published on the subscriber side.
+ * If the publication publishes table changes via their respective
+ * logical root tables, we must exclude logical descendants in favor
+ * of including the root tables. Otherwise, the function could
+ * return both the child and parent tables which could cause data of
+ * the child table to be double-published on the subscriber side.
*/
if (publication->pubviaroot)
- tables = filter_partitions(tables);
+ tables = filter_logical_descendants(tables);
}
/* Construct a tuple descriptor for the result rows. */
@@ -1190,3 +1196,93 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+
+/*
+ * Returns a list of tables that should be copied during initial sync for the
+ * given root table and publication.
+ *
+ * The only time this list consists of anything more than the table itself is
+ * when a publication's publish_via_partition_root is set to true and the table
+ * has inherited child tables in the publication that have been marked with
+ * pg_set_logical_root().
+ */
+Datum
+pg_get_publication_rels_to_sync(PG_FUNCTION_ARGS)
+{
+#define NUM_SYNC_TABLES_ELEM 1
+ FuncCallContext *funcctx;
+ Oid rootid = PG_GETARG_OID(0);
+ char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ List *tables;
+
+ /* stuff done only on the first call of the function */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+ Publication *publication;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* switch to memory context appropriate for multiple function calls */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ publication = GetPublicationByName(pubname, false);
+
+ /* TODO: do the tables in this list need to be locked? */
+ if (publication->pubviaroot)
+ tables = find_all_logical_inheritors(rootid);
+ else
+ tables = list_make1_oid(rootid);
+
+ if (!publication->alltables)
+ {
+ List *relids;
+ List *schemarelids;
+ List *published;
+ List *result = NIL;
+ ListCell *cell;
+
+ /*
+ * Filter our list of tables to those that are actually included in
+ * the publication.
+ */
+ relids = GetPublicationRelations(publication->oid,
+ publication->pubviaroot ?
+ PUBLICATION_PART_ROOT :
+ PUBLICATION_PART_ALL);
+ schemarelids = GetAllSchemaPublicationRelations(publication->oid,
+ publication->pubviaroot ?
+ PUBLICATION_PART_ROOT :
+ PUBLICATION_PART_LEAF);
+ published = list_concat_unique_oid(relids, schemarelids);
+
+ foreach(cell, tables)
+ {
+ Oid current = lfirst_oid(cell);
+
+ if (list_member_oid(published, current))
+ result = lappend_oid(result, current);
+ }
+
+ tables = result;
+ }
+
+ funcctx->user_fctx = (void *) tables;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /* stuff done on every call of the function */
+ funcctx = SRF_PERCALL_SETUP();
+ tables = (List *) funcctx->user_fctx;
+
+ if (funcctx->call_cntr < list_length(tables))
+ {
+ Oid relid = list_nth_oid(tables, funcctx->call_cntr);
+
+ SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f4ba572697..7e23bed6c0 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -238,6 +238,8 @@ contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
* parent table, but the bitmap contains the replica identity
* information of the child table. So, get the column number of the
* child table as parent and child column order could be different.
+ *
+ * TODO: is this applicable to pg_set_logical_root()?
*/
if (context->pubviaroot)
{
@@ -286,6 +288,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
*
* Note that even though the row filter used is for an ancestor, the
* REPLICA IDENTITY used will be for the actual child table.
+ *
+ * TODO: is this applicable to pg_set_logical_root()?
*/
if (pubviaroot && relation->rd_rel->relispartition)
{
@@ -336,6 +340,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
* the column list.
*
* Returns true if any replica identity column is not covered by column list.
+ *
+ * TODO: pg_set_logical_root()?
*/
bool
pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
@@ -628,6 +634,8 @@ TransformPubWhereClauses(List *tables, const char *queryString,
* If the publication doesn't publish changes via the root partitioned
* table, the partition's row filter will be used. So disallow using
* WHERE clause on partitioned table in this case.
+ *
+ * TODO: decide how this interacts with pg_set_logical_root
*/
if (!pubviaroot &&
pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
@@ -715,6 +723,8 @@ CheckPubRelationColumnList(char *pubname, List *tables,
* If the publication doesn't publish changes via the root partitioned
* table, the partition's column list will be used. So disallow using
* a column list on the partitioned table in this case.
+ *
+ * TODO: decide if this interacts with pg_set_logical_root()
*/
if (!pubviaroot &&
pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
diff --git a/src/backend/partitioning/partdesc.c b/src/backend/partitioning/partdesc.c
index de06caccd2..2e34983117 100644
--- a/src/backend/partitioning/partdesc.c
+++ b/src/backend/partitioning/partdesc.c
@@ -162,7 +162,8 @@ RelationBuildPartitionDesc(Relation rel, bool omit_detached)
inhoids = find_inheritance_children_extended(RelationGetRelid(rel),
omit_detached, NoLock,
&detached_exist,
- &detached_xmin);
+ &detached_xmin,
+ false);
nparts = list_length(inhoids);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba..14dcf2a825 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -756,11 +756,12 @@ copy_read_data(void *outbuf, int minread, int maxread)
/*
* Get information about remote relation in similar fashion the RELATION
* message provides during replication. This function also returns the relation
- * qualifications to be used in the COPY command.
+ * qualifications to be used in the COPY command, and the list of tables to COPY
+ * (which for most tables will contain only one entry).
*/
static void
fetch_remote_table_info(char *nspname, char *relname,
- LogicalRepRelation *lrel, List **qual)
+ LogicalRepRelation *lrel, List **qual, List **to_copy)
{
WalRcvExecResult *res;
StringInfoData cmd;
@@ -1071,6 +1072,87 @@ fetch_remote_table_info(char *nspname, char *relname,
walrcv_clear_result(res);
}
+ /*
+ * See if there are any other tables to be copied besides the original. This
+ * happens when a descendant in the inheritance relationship is marked with
+ * pg_set_logical_root() and is part of a publication with
+ * publish_via_partition_root = true.
+ *
+ * FIXME: if two publications have conflicting settings for
+ * publish_via_partition_root, this behaves strangely. It looks like that's
+ * true for regular partitions too...
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
+ {
+ Oid descRow[] = {TEXTOID, TEXTOID};
+ StringInfoData pub_names;
+
+ /* Build the pubname list. */
+ initStringInfo(&pub_names);
+ foreach(lc, MySubscription->publications)
+ {
+ char *pubname = strVal(lfirst(lc));
+
+ if (foreach_current_index(lc) > 0)
+ appendStringInfoString(&pub_names, ", ");
+
+ appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
+ }
+
+ /*
+ * Ask the server what it wants us to sync.
+ */
+ resetStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "SELECT DISTINCT n.nspname, c.relname"
+ " FROM pg_catalog.pg_publication p,"
+ " pg_catalog.pg_class c"
+ " JOIN pg_catalog.pg_namespace n"
+ " ON (c.relnamespace = n.oid)"
+ " JOIN LATERAL pg_catalog.pg_get_publication_rels_to_sync(%u::regclass, p.pubname) relid"
+ " ON (c.oid = relid)"
+ " WHERE p.pubname IN ( %s )",
+ lrel->remoteid, pub_names.data);
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(descRow), descRow);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not fetch logical descendants for table \"%s.%s\" from publisher: %s",
+ nspname, relname, res->err)));
+
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *desc_nspname;
+ char *desc_relname;
+ char *quoted;
+
+ desc_nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+ desc_relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ quoted = quote_qualified_identifier(desc_nspname, desc_relname);
+ *to_copy = lappend(*to_copy, quoted);
+
+ ExecClearTuple(slot);
+ }
+
+ ExecDropSingleTupleTableSlot(slot);
+ walrcv_clear_result(res);
+
+ pfree(pub_names.data);
+ }
+ else
+ {
+ /* For older servers, we only COPY the table itself. */
+ char *quoted = quote_qualified_identifier(lrel->nspname,
+ lrel->relname);
+ *to_copy = lappend(*to_copy, quoted);
+ }
+
pfree(cmd.data);
}
@@ -1085,6 +1167,8 @@ copy_table(Relation rel)
LogicalRepRelMapEntry *relmapentry;
LogicalRepRelation lrel;
List *qual = NIL;
+ List *to_copy = NIL;
+ ListCell *cur;
WalRcvExecResult *res;
StringInfoData cmd;
CopyFromState cstate;
@@ -1093,7 +1177,8 @@ copy_table(Relation rel)
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel, &qual);
+ RelationGetRelationName(rel), &lrel, &qual,
+ &to_copy);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
@@ -1102,92 +1187,96 @@ copy_table(Relation rel)
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
Assert(rel == relmapentry->localrel);
- /* Start copy on the publisher. */
- initStringInfo(&cmd);
-
- /* Regular table with no row filter */
- if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+ foreach(cur, to_copy)
{
- appendStringInfo(&cmd, "COPY %s (",
- quote_qualified_identifier(lrel.nspname, lrel.relname));
+ char *quoted_name = lfirst(cur);
- /*
- * XXX Do we need to list the columns in all cases? Maybe we're
- * replicating all columns?
- */
- for (int i = 0; i < lrel.natts; i++)
- {
- if (i > 0)
- appendStringInfoString(&cmd, ", ");
+ /* Start copy on the publisher. */
+ initStringInfo(&cmd);
- appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
- }
-
- appendStringInfoString(&cmd, ") TO STDOUT");
- }
- else
- {
- /*
- * For non-tables and tables with row filters, we need to do COPY
- * (SELECT ...), but we can't just do SELECT * because we need to not
- * copy generated columns. For tables with any row filters, build a
- * SELECT query with OR'ed row filters for COPY.
- */
- appendStringInfoString(&cmd, "COPY (SELECT ");
- for (int i = 0; i < lrel.natts; i++)
+ /* Regular table with no row filter */
+ if (lrel.relkind == RELKIND_RELATION && qual == NIL)
{
- appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
- if (i < lrel.natts - 1)
- appendStringInfoString(&cmd, ", ");
- }
+ appendStringInfo(&cmd, "COPY %s (", quoted_name);
- appendStringInfoString(&cmd, " FROM ");
+ /*
+ * XXX Do we need to list the columns in all cases? Maybe we're
+ * replicating all columns?
+ */
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ if (i > 0)
+ appendStringInfoString(&cmd, ", ");
- /*
- * For regular tables, make sure we don't copy data from a child that
- * inherits the named table as those will be copied separately.
- */
- if (lrel.relkind == RELKIND_RELATION)
- appendStringInfoString(&cmd, "ONLY ");
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ }
- appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
- /* list of OR'ed filters */
- if (qual != NIL)
+ appendStringInfoString(&cmd, ") TO STDOUT");
+ }
+ else
{
- ListCell *lc;
- char *q = strVal(linitial(qual));
+ /*
+ * For non-tables and tables with row filters, we need to do COPY
+ * (SELECT ...), but we can't just do SELECT * because we need to not
+ * copy generated columns. For tables with any row filters, build a
+ * SELECT query with OR'ed row filters for COPY.
+ */
+ appendStringInfoString(&cmd, "COPY (SELECT ");
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ if (i < lrel.natts - 1)
+ appendStringInfoString(&cmd, ", ");
+ }
+
+ appendStringInfoString(&cmd, " FROM ");
- appendStringInfo(&cmd, " WHERE %s", q);
- for_each_from(lc, qual, 1)
+ /*
+ * For regular tables, make sure we don't copy data from a child that
+ * inherits the named table as those will be copied separately.
+ */
+ if (lrel.relkind == RELKIND_RELATION)
+ appendStringInfoString(&cmd, "ONLY ");
+
+ appendStringInfoString(&cmd, quoted_name);
+ /* list of OR'ed filters */
+ if (qual != NIL)
{
- q = strVal(lfirst(lc));
- appendStringInfo(&cmd, " OR %s", q);
+ ListCell *lc;
+ char *q = strVal(linitial(qual));
+
+ appendStringInfo(&cmd, " WHERE %s", q);
+ for_each_from(lc, qual, 1)
+ {
+ q = strVal(lfirst(lc));
+ appendStringInfo(&cmd, " OR %s", q);
+ }
+ /* FIXME: qual is shared by every COPY in this loop; free it after the loop. */
}
- list_free_deep(qual);
- }
- appendStringInfoString(&cmd, ") TO STDOUT");
- }
- res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
- pfree(cmd.data);
- if (res->status != WALRCV_OK_COPY_OUT)
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("could not start initial contents copy for table \"%s.%s\": %s",
- lrel.nspname, lrel.relname, res->err)));
- walrcv_clear_result(res);
+ appendStringInfoString(&cmd, ") TO STDOUT");
+ }
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
+ pfree(cmd.data);
+ if (res->status != WALRCV_OK_COPY_OUT)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not start initial contents copy for table \"%s.%s\" from remote %s: %s",
+ lrel.nspname, lrel.relname, quoted_name, res->err)));
+ walrcv_clear_result(res);
- copybuf = makeStringInfo();
+ copybuf = makeStringInfo();
- pstate = make_parsestate(NULL);
- (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
- NULL, false, false);
+ pstate = make_parsestate(NULL);
+ (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
+ NULL, false, false);
- attnamelist = make_copy_attnamelist(relmapentry);
- cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
+ attnamelist = make_copy_attnamelist(relmapentry);
+ cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
- /* Do the copy */
- (void) CopyFrom(cstate);
+ /* Do the copy */
+ (void) CopyFrom(cstate);
+ }
logicalrep_rel_close(relmapentry, NoLock);
}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 0df1acbb7a..7cfb6130a8 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -14,6 +14,7 @@
#include "access/tupconvert.h"
#include "catalog/partition.h"
+#include "catalog/pg_inherits.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_subscription.h"
@@ -1454,7 +1455,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
- Assert(relation->rd_rel->relispartition);
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
targetrel = ancestor;
/* Convert tuple if needed. */
@@ -2149,55 +2149,57 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
int ancestor_level = 0;
/*
- * If this is a FOR ALL TABLES publication, pick the partition
+ * If this is a FOR ALL TABLES publication, pick the logical
* root and set the ancestor level accordingly.
*/
if (pub->alltables)
{
publish = true;
- if (pub->pubviaroot && am_partition)
+ if (pub->pubviaroot)
{
- List *ancestors = get_partition_ancestors(relid);
+ List *ancestors;
- pub_relid = llast_oid(ancestors);
- ancestor_level = list_length(ancestors);
+ ancestors = get_logical_ancestors(relid, am_partition);
+ if (ancestors != NIL)
+ {
+ pub_relid = llast_oid(ancestors);
+ ancestor_level = list_length(ancestors);
+ }
}
}
if (!publish)
{
bool ancestor_published = false;
+ Oid ancestor;
+ int level;
+ List *ancestors;
/*
- * For a partition, check if any of the ancestors are
- * published. If so, note down the topmost ancestor that is
+ * Check if any of the logical ancestors (that is, partition
+ * parents or tables marked with pg_set_logical_root()) are
+ * published. If so, note down the topmost ancestor that is
* published via this publication, which will be used as the
- * relation via which to publish the partition's changes.
+ * relation via which to publish this table's changes.
*/
- if (am_partition)
- {
- Oid ancestor;
- int level;
- List *ancestors = get_partition_ancestors(relid);
-
- ancestor = GetTopMostAncestorInPublication(pub->oid,
- ancestors,
- &level);
+ ancestors = get_logical_ancestors(relid, am_partition);
+ ancestor = GetTopMostAncestorInPublication(pub->oid,
+ ancestors,
+ &level);
- if (ancestor != InvalidOid)
+ if (ancestor != InvalidOid)
+ {
+ ancestor_published = true;
+ if (pub->pubviaroot)
{
- ancestor_published = true;
- if (pub->pubviaroot)
- {
- pub_relid = ancestor;
- ancestor_level = level;
- }
+ pub_relid = ancestor;
+ ancestor_level = level;
}
}
if (list_member_oid(pubids, pub->oid) ||
list_member_oid(schemaPubids, pub->oid) ||
- ancestor_published)
+ (am_partition && ancestor_published))
publish = true;
}
diff --git a/src/include/catalog/pg_inherits.h b/src/include/catalog/pg_inherits.h
index ce154ab943..cfcacacf76 100644
--- a/src/include/catalog/pg_inherits.h
+++ b/src/include/catalog/pg_inherits.h
@@ -46,14 +46,17 @@ typedef FormData_pg_inherits *Form_pg_inherits;
DECLARE_UNIQUE_INDEX_PKEY(pg_inherits_relid_seqno_index, 2680, InheritsRelidSeqnoIndexId, on pg_inherits using btree(inhrelid oid_ops, inhseqno int4_ops));
DECLARE_INDEX(pg_inherits_parent_index, 2187, InheritsParentIndexId, on pg_inherits using btree(inhparent oid_ops));
+DECLARE_INDEX(pg_inherits_parent_seqno_index, 8138, InheritsParentSeqnoIndexId, on pg_inherits using btree(inhparent oid_ops, inhseqno int4_ops));
extern List *find_inheritance_children(Oid parentrelId, LOCKMODE lockmode);
extern List *find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
- LOCKMODE lockmode, bool *detached_exist, TransactionId *detached_xmin);
+ LOCKMODE lockmode, bool *detached_exist, TransactionId *detached_xmin,
+ bool logical_only);
extern List *find_all_inheritors(Oid parentrelId, LOCKMODE lockmode,
List **numparents);
+extern List *find_all_logical_inheritors(Oid parentrelId);
extern bool has_subclass(Oid relationId);
extern bool has_superclass(Oid relationId);
extern bool typeInheritsFrom(Oid subclassTypeId, Oid superclassTypeId);
@@ -63,5 +66,7 @@ extern bool DeleteInheritsTuple(Oid inhrelid, Oid inhparent,
bool expect_detach_pending,
const char *childname);
extern bool PartitionHasPendingDetach(Oid partoid);
+extern List *get_logical_ancestors(Oid relid, bool is_partition);
+extern bool has_logical_parent(Relation inhRel, Oid relid);
#endif /* PG_INHERITS_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 505595620e..9d7b3c11bf 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11729,6 +11729,11 @@
proname => 'pg_relation_is_publishable', provolatile => 's',
prorettype => 'bool', proargtypes => 'regclass',
prosrc => 'pg_relation_is_publishable' },
+{ oid => '8137', descr => 'get list of tables to copy during initial sync',
+ proname => 'pg_get_publication_rels_to_sync', prorows => '10', proretset => 't',
+ provolatile => 's', prorettype => 'regclass', proargtypes => 'regclass text',
+ proargnames => '{rootid,pubname}',
+ prosrc => 'pg_get_publication_rels_to_sync' },
# rls
{ oid => '3298',
@@ -11887,6 +11892,11 @@
proname => 'pg_partition_root', prorettype => 'regclass',
proargtypes => 'regclass', prosrc => 'pg_partition_root' },
+{ oid => '8136', descr => 'mark a table root for logical replication',
+ proname => 'pg_set_logical_root', provolatile => 'v', proparallel => 'u',
+ prorettype => 'void', proargtypes => 'regclass regclass',
+ prosrc => 'pg_set_logical_root' },
+
{ oid => '4350', descr => 'Unicode normalization',
proname => 'normalize', prorettype => 'text', proargtypes => 'text text',
prosrc => 'unicode_normalize_func' },
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 427f87ea07..0e06103f4c 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -1718,9 +1718,41 @@ SELECT * FROM pg_publication_tables;
pub | sch1 | tbl1 | {a} |
(1 row)
+-- Sanity check cases for pg_set_logical_root().
+CREATE TABLE sch1.iroot (a int);
+CREATE TABLE sch1.ipart1 (a int);
+CREATE TABLE sch1.ipart2 () INHERITS (sch1.iroot);
+-- marking roots between unrelated tables is not allowed
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+ERROR: table "ipart1" does not inherit from intended root table "iroot"
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.tbl1');
+ERROR: table "ipart2" does not inherit from intended root table "tbl1"
+-- establishing an inheritance relationship fixes the problem
+ALTER TABLE sch1.ipart1 INHERIT sch1.iroot;
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+ pg_set_logical_root
+---------------------
+
+(1 row)
+
+-- but multiple inheritance is not allowed
+ALTER TABLE sch1.ipart2 INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+ERROR: table "ipart2" inherits from multiple tables
+ALTER TABLE sch1.ipart2 NO INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+ pg_set_logical_root
+---------------------
+
+(1 row)
+
+-- TODO: make sure existing logical descendant can't be ALTERed [NO] INHERIT
RESET client_min_messages;
DROP PUBLICATION pub;
DROP TABLE sch1.tbl1;
+DROP TABLE sch1.ipart1;
+DROP TABLE sch1.ipart2;
+DROP TABLE sch1.iroot;
DROP SCHEMA sch1 cascade;
DROP SCHEMA sch2 cascade;
RESET SESSION AUTHORIZATION;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index a47c5939d5..52a0d6ba48 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -1087,9 +1087,34 @@ ALTER TABLE sch1.tbl1 ATTACH PARTITION sch1.tbl1_part3 FOR VALUES FROM (20) to (
CREATE PUBLICATION pub FOR TABLES IN SCHEMA sch1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
SELECT * FROM pg_publication_tables;
+-- Sanity check cases for pg_set_logical_root().
+CREATE TABLE sch1.iroot (a int);
+CREATE TABLE sch1.ipart1 (a int);
+CREATE TABLE sch1.ipart2 () INHERITS (sch1.iroot);
+
+-- marking roots between unrelated tables is not allowed
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.tbl1');
+
+-- establishing an inheritance relationship fixes the problem
+ALTER TABLE sch1.ipart1 INHERIT sch1.iroot;
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+
+-- but multiple inheritance is not allowed
+ALTER TABLE sch1.ipart2 INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+
+ALTER TABLE sch1.ipart2 NO INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+
+-- TODO: make sure existing logical descendant can't be ALTERed [NO] INHERIT
+
RESET client_min_messages;
DROP PUBLICATION pub;
DROP TABLE sch1.tbl1;
+DROP TABLE sch1.ipart1;
+DROP TABLE sch1.ipart2;
+DROP TABLE sch1.iroot;
DROP SCHEMA sch1 cascade;
DROP SCHEMA sch2 cascade;
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 11a5c3c03e..a4d771f31f 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -388,6 +388,59 @@ $node_subscriber1->append_conf('postgresql.conf',
"log_min_messages = warning");
$node_subscriber1->reload;
+# Make sure standard inheritance setups aren't broken by the new
+# pg_set_logical_root() handling.
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1_1 (LIKE itab1)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1_2 (LIKE itab1)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1_1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1_2 (CHECK (a = 2)) INHERITS (itab1)");
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab1_1', 'itab1')");
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab1_2', 'itab1')");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (0, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1_1 VALUES (1, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1_2 VALUES (2, 'itab1')");
+
+# Regression: Create a publication for an unrelated table and set
+# publish_via_partition_root. This should have no effect at all, since itab1
+# isn't supposed to be using this publication...
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION dummy_pub FOR TABLE tab1 WITH (publish_via_partition_root = true)");
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 ADD PUBLICATION dummy_pub");
+$node_subscriber2->wait_for_subscription_sync;
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1");
+is($result, qq(0|itab1), 'initial data synced for itab1 on subscriber 2');
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1_1");
+is($result, qq(1|itab1), 'initial data synced for itab1_1 on subscriber 2');
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1_2");
+is($result, qq(2|itab1), 'initial data synced for itab1_2 on subscriber 2');
+
+$node_publisher->safe_psql('postgres',
+ "DROP TABLE itab1 CASCADE;");
+$node_subscriber2->safe_psql('postgres',
+ "DROP TABLE itab1 CASCADE;");
+
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 DROP PUBLICATION dummy_pub");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION dummy_pub");
+
# Tests for replication using root table identity and schema
# publisher
@@ -877,4 +930,190 @@ $result = $node_subscriber2->safe_psql('postgres',
"SELECT a, b, c FROM tab5_1 ORDER BY 1");
is($result, qq(4||1), 'updates of tab5 replicated correctly');
+# Test that replication works for older inheritance/trigger setups as well.
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1_1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1_2 (CHECK (a = 2)) INHERITS (itab1)");
+
+$node_publisher->safe_psql('postgres', "
+ CREATE OR REPLACE FUNCTION itab1_trigger()
+ RETURNS TRIGGER AS \$\$
+ BEGIN
+ IF ( NEW.a = 1 ) THEN INSERT INTO itab1_1 VALUES (NEW.*);
+ ELSIF ( NEW.a = 2 ) THEN INSERT INTO itab1_2 VALUES (NEW.*);
+ ELSE RETURN NEW;
+ END IF;
+ RETURN NULL;
+ END;
+ \$\$
+ LANGUAGE plpgsql;");
+$node_publisher->safe_psql('postgres', "
+ CREATE TRIGGER itab1_trigger
+ BEFORE INSERT ON itab1
+ FOR EACH ROW EXECUTE FUNCTION itab1_trigger();");
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab1_1', 'itab1')");
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab1_2', 'itab1')");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab2 (a int, b text)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab2_1 (CHECK (a = 1)) INHERITS (itab2)");
+
+$node_publisher->safe_psql('postgres', "
+ CREATE OR REPLACE FUNCTION itab2_trigger()
+ RETURNS TRIGGER AS \$\$
+ BEGIN
+ IF ( NEW.a = 1 ) THEN INSERT INTO itab2_1 VALUES (NEW.*);
+ ELSE RETURN NEW;
+ END IF;
+ RETURN NULL;
+ END;
+ \$\$
+ LANGUAGE plpgsql;");
+$node_publisher->safe_psql('postgres', "
+ CREATE TRIGGER itab2_trigger
+ BEFORE INSERT ON itab2
+ FOR EACH ROW EXECUTE FUNCTION itab2_trigger();");
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab2_1', 'itab2')");
+
+# itab2_1 should be published using its own identity here, since its parent is
+# not included. itab1_1 should be published via its parent, itab1, without
+# duplicating the rows.
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub_viaroot ADD TABLE itab1, itab1_1, itab2_1");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (0, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (1, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (2, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (0, 'itab2')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (1, 'itab2')");
+
+# Subscriber 1 only subscribes to some of the partitions, and does not set up
+# partition triggers, to check for the correct routing.
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE itab1_1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE itab2_1 (a int, b text)");
+
+# Subscriber 2 has different partition names for itab1, and it doesn't partition
+# itab2 at all.
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1_part1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1_part2 (CHECK (a = 2)) INHERITS (itab1)");
+
+$node_subscriber2->safe_psql('postgres', "
+ CREATE OR REPLACE FUNCTION itab_trigger()
+ RETURNS TRIGGER AS \$\$
+ BEGIN
+ IF ( NEW.a = 1 ) THEN INSERT INTO public.itab1_part1 VALUES (NEW.*);
+ ELSIF ( NEW.a = 2 ) THEN INSERT INTO public.itab1_part2 VALUES (NEW.*);
+ ELSE RETURN NEW;
+ END IF;
+ RETURN NULL;
+ END;
+ \$\$
+ LANGUAGE plpgsql;");
+$node_subscriber2->safe_psql('postgres', "
+ CREATE TRIGGER itab_trigger
+ BEFORE INSERT ON itab1
+ FOR EACH ROW EXECUTE FUNCTION itab_trigger();");
+$node_subscriber2->safe_psql('postgres', "
+ ALTER TABLE itab1 ENABLE ALWAYS TRIGGER itab_trigger;");
+
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab2 (a int, b text)");
+
+$node_subscriber1->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub_viaroot REFRESH PUBLICATION");
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+$node_subscriber1->wait_for_subscription_sync;
+$node_subscriber2->wait_for_subscription_sync;
+
+# check that data is synced correctly
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data synced for itab1 on subscriber 1');
+
+# all of the data should have been routed to itab1 directly (there are no
+# triggers on subscriber 1 to move it elsewhere)
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data correctly routed for itab1 on subscriber 1');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM itab2_1 ORDER BY 1, 2");
+is($result, qq(1|itab2), 'initial data synced for itab2_1 on subscriber 1');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data synced for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1), 'initial data correctly routed for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab2 ORDER BY 1, 2");
+is($result, qq(0|itab2
+1|itab2), 'initial data synced for itab2 on subscriber 2');
+
+# make sure new data is also correctly routed to the roots
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (1, 'itab1-new')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (1, 'itab2-new')");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+1|itab1-new
+2|itab1), 'new data routed for itab1 on subscriber 1');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM itab2_1 ORDER BY 1, 2");
+is($result, qq(1|itab2
+1|itab2-new), 'new data routed for itab2_1 on subscriber 1');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+1|itab1-new
+2|itab1), 'new data routed for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1_part1 ORDER BY 1, 2");
+is($result, qq(1|itab1
+1|itab1-new), 'new data moved to itab1_part1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab2 ORDER BY 1, 2");
+is($result, qq(0|itab2
+1|itab2
+1|itab2-new), 'new data routed for itab2 on subscriber 2');
+
done_testing();
--
2.25.1