On 4/4/23 08:14, Jacob Champion wrote:
> Yes, sorry -- after 062a84442, the architecture needs to change in a
> way that I'm still working through. I've moved the patch to Waiting on
> Author while I figure out the rebase.
Okay -- that took longer than I wanted, but here's a rebased patchset
that I'll call v2.
Commit 062a84442 necessitated some rework of the new
pg_get_publication_rels_to_sync() helper. It now takes a list of
publications so that we can handle conflicts in the pubviaroot settings.
This is more complicated than before -- unlike partitions, standard
inheritance trees can selectively publish tables that aren't leaves. But
I think I've finally settled on some semantics for it that are
unsurprising.
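To make the "selectively publish non-leaf tables" point concrete: in 0002, filter_published_descendants() keeps a table in the sync list only when the table is published under its own OID. Below is a minimal standalone sketch of just that final filtering step, not code from the patch. `Oid` is modeled as `unsigned int`, `SyncCandidate` and `keep_self_published()` are hypothetical names, and it assumes each table's publish_as_relid has already been resolved across the subscribed publications.

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned int Oid;   /* stand-in for PostgreSQL's Oid */

typedef struct SyncCandidate
{
    Oid relid;             /* table listed by some subscribed publication */
    Oid publish_as_relid;  /* relid its changes are published under */
} SyncCandidate;

/*
 * Keep only tables that are published under their own OID; the rest reach
 * the subscriber through an ancestor. Compacts the array in place, preserves
 * order, and returns the new length.
 */
static size_t
keep_self_published(SyncCandidate *tables, size_t ntables)
{
    size_t kept = 0;

    assert(tables != NULL || ntables == 0);
    for (size_t i = 0; i < ntables; i++)
    {
        if (tables[i].publish_as_relid == tables[i].relid)
            tables[kept++] = tables[i];
    }
    return kept;
}
```

Comparing against the resolved relid, rather than asking whether some ancestor is also listed, is what lets a logical root sit in a pubviaroot=false publication without hiding its children.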
As part of that, I've pulled out a patch in 0001 which I hope is
independently useful. Today, there appears to be no way to check which
relid a table will be published through, short of creating a
subscription just to see what happens. 0001 introduces
pg_get_relation_publishing_info() to surface this information, which
makes testing it easier and also makes it possible to inspect what's
happening with more complicated publication setups.
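The pubinsert/pubupdate/pubdelete/pubtruncate columns that pg_get_relation_publishing_info() returns are the union of the actions of all matching publications: the helper ORs each publication's flags into the caller's PublicationActions, so the caller has to start from all-false. A minimal standalone sketch of that merge, with hypothetical names (`PubActions`, `merge_pub_actions()`) and plain C `bool` standing in for the catalog types:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for PostgreSQL's PublicationActions. */
typedef struct PubActions
{
    bool pubinsert;
    bool pubupdate;
    bool pubdelete;
    bool pubtruncate;
} PubActions;

/*
 * OR together the actions of every matching publication. The accumulator
 * starts cleared; OR-ing into an uninitialized struct would yield garbage.
 */
static PubActions
merge_pub_actions(const PubActions *per_pub, size_t npubs)
{
    PubActions result = {false, false, false, false};

    assert(per_pub != NULL || npubs == 0);
    for (size_t i = 0; i < npubs; i++)
    {
        result.pubinsert |= per_pub[i].pubinsert;
        result.pubupdate |= per_pub[i].pubupdate;
        result.pubdelete |= per_pub[i].pubdelete;
        result.pubtruncate |= per_pub[i].pubtruncate;
    }
    return result;
}
```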
0001 also moves the determination of publish_as_relid out of the
pgoutput plugin and into a pg_publication helper function, because
unless I've missed something crucial, it doesn't seem like an output
plugin is really free to make that decision independently of the
publication settings. The subscriber is not going to ask a plugin for
the right tables to COPY during initial sync, so the plugin had better
be using the same logic as the core.
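The rule that both callers now share for publish_as_relid boils down to "use the top-most ancestor that any publication asks for, and remember which publications publish through it." Below is a minimal standalone sketch of only that selection step. It assumes each publication's candidate relid and ancestor level have already been computed and that non-publishing publications are already excluded; `PubChoice` and `choose_publish_as_relid()` are hypothetical, whereas the real helper derives these values from get_partition_ancestors() and GetTopMostAncestorInPublication() and works on List/Publication types.

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned int Oid;   /* stand-in for PostgreSQL's Oid */

/* What one publication asks for: publish under pub_relid, N levels up. */
typedef struct PubChoice
{
    Oid pub_oid;          /* the publication */
    Oid pub_relid;        /* relid this publication would publish under */
    int ancestor_level;   /* 0 = the relation itself */
} PubChoice;

/*
 * Choose the top-most publishing relid across all publications. Writes the
 * publications that publish through it to out_pubs (room for nchoices
 * entries) and returns how many were written.
 */
static size_t
choose_publish_as_relid(Oid relid, const PubChoice *choices, size_t nchoices,
                        Oid *publish_as_relid, Oid *out_pubs)
{
    int best_level = 0;
    size_t nout = 0;

    *publish_as_relid = relid;

    for (size_t i = 0; i < nchoices; i++)
    {
        const PubChoice *c = &choices[i];

        /* A higher ancestor was already chosen; this one sits below it. */
        if (best_level > c->ancestor_level)
            continue;

        if (best_level < c->ancestor_level)
        {
            /* Found a higher ancestor: switch to it and restart the list. */
            *publish_as_relid = c->pub_relid;
            best_level = c->ancestor_level;
            nout = 0;
        }
        else
        {
            /* The same level must name the same relid. */
            assert(*publish_as_relid == c->pub_relid);
        }

        out_pubs[nout++] = c->pub_oid;
    }
    return nout;
}
```

One nuance the sketch leaves out: in the patch, a publication's actions are OR-ed into pubactions before the level comparison, so a publication whose ancestor choice loses still contributes its insert/update/delete/truncate flags.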
Many TODOs and upthread points of feedback are still pending, and I
think that several of them are actually symptoms of one architectural
problem with my patch:
- The volatility classifications of pg_set_logical_root() and
pg_get_publication_rels_to_sync() appear to conflict
- A dump/restore cycle loses the new marker
- Inheritance can be tampered with after the logical root has been set
- There's currently no way to clear a logical root after setting it
I wonder if pg_set_logical_root() might be better implemented as part of
ALTER TABLE. Maybe with a relation option? If it all went through ALTER
TABLE ONLY ... SET, then we wouldn't have to worry about a user
modifying roots while reading pg_get_publication_rels_to_sync() in the
same query. The permissions checks should be more consistent with less
effort, and there's an existing way to set/clear the option that already
plays well with pg_dump and pg_upgrade. The downsides I can see are the
need to handle simultaneous changes to INHERIT and SET (since we'd be
manipulating pg_inherits in both), as well as the fact that ALTER TABLE
... SET defaults to altering the entire table hierarchy, which may be
bad UX for this case.
--Jacob
From 761b91b3f173adffac9f73132c2f1c9fc8bcbec3 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jchamp...@timescale.com>
Date: Fri, 7 Apr 2023 09:55:27 -0700
Subject: [PATCH v2 1/2] pgoutput: refactor publication cache construction
Breaking this logic into a standalone helper should help expose the
behavior for testing. Additionally, move the implementation under the
pg_publication catalog helpers, since it seems like it's inherent to the
publication settings (and it must match the behavior of the initial
sync, which doesn't seem to be controlled by the output plugin at all).
---
src/backend/catalog/pg_publication.c | 215 ++++++++++++++++++++
src/backend/replication/pgoutput/pgoutput.c | 137 +------------
src/include/catalog/pg_proc.dat | 8 +
src/include/catalog/pg_publication.h | 4 +
src/test/regress/expected/publication.out | 26 +++
src/test/regress/sql/publication.sql | 15 ++
6 files changed, 272 insertions(+), 133 deletions(-)
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index c488b6370b..df9abf09c3 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1259,3 +1259,218 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+
+List *
+process_relation_publications(Oid relid, const List *publications,
+ PublicationActions *pubactions,
+ Oid *publish_as_relid)
+{
+ Oid schemaId = get_rel_namespace(relid);
+ List *pubids = GetRelationPublications(relid);
+
+ /*
+ * We don't acquire a lock on the namespace system table as we build
+ * the cache entry using a historic snapshot and all the later changes
+ * are absorbed while decoding WAL.
+ */
+ List *schemaPubids = GetSchemaPublications(schemaId);
+ ListCell *lc;
+ int publish_ancestor_level = 0;
+ bool am_partition = get_rel_relispartition(relid);
+ char relkind = get_rel_relkind(relid);
+ List *rel_publications = NIL;
+
+ *publish_as_relid = relid;
+
+ foreach(lc, publications)
+ {
+ Publication *pub = lfirst(lc);
+ bool publish = false;
+
+ /*
+ * Under what relid should we publish changes in this publication?
+ * We'll use the top-most relid across all publications. Also
+ * track the ancestor level for this publication.
+ */
+ Oid pub_relid = relid;
+ int ancestor_level = 0;
+
+ /*
+ * If this is a FOR ALL TABLES publication, pick the partition
+ * root and set the ancestor level accordingly.
+ */
+ if (pub->alltables)
+ {
+ publish = true;
+ if (pub->pubviaroot && am_partition)
+ {
+ List *ancestors = get_partition_ancestors(relid);
+
+ pub_relid = llast_oid(ancestors);
+ ancestor_level = list_length(ancestors);
+ }
+ }
+
+ if (!publish)
+ {
+ bool ancestor_published = false;
+
+ /*
+ * For a partition, check if any of the ancestors are
+ * published. If so, note down the topmost ancestor that is
+ * published via this publication, which will be used as the
+ * relation via which to publish the partition's changes.
+ */
+ if (am_partition)
+ {
+ Oid ancestor;
+ int level;
+ List *ancestors = get_partition_ancestors(relid);
+
+ ancestor = GetTopMostAncestorInPublication(pub->oid,
+ ancestors,
+ &level);
+
+ if (ancestor != InvalidOid)
+ {
+ ancestor_published = true;
+ if (pub->pubviaroot)
+ {
+ pub_relid = ancestor;
+ ancestor_level = level;
+ }
+ }
+ }
+
+ if (list_member_oid(pubids, pub->oid) ||
+ list_member_oid(schemaPubids, pub->oid) ||
+ ancestor_published)
+ publish = true;
+ }
+
+ /*
+ * If the relation is to be published, determine actions to
+ * publish, and list of columns, if appropriate.
+ *
+ * Don't publish changes for partitioned tables, because
+ * publishing those of its partitions suffices, unless partition
+ * changes won't be published due to pubviaroot being set.
+ */
+ if (publish &&
+ (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
+ {
+ pubactions->pubinsert |= pub->pubactions.pubinsert;
+ pubactions->pubupdate |= pub->pubactions.pubupdate;
+ pubactions->pubdelete |= pub->pubactions.pubdelete;
+ pubactions->pubtruncate |= pub->pubactions.pubtruncate;
+
+ /*
+ * We want to publish the changes as the top-most ancestor
+ * across all publications. So we need to check if the already
+ * calculated level is higher than the new one. If yes, we can
+ * ignore the new value (as it's a child). Otherwise the new
+ * value is an ancestor, so we keep it.
+ */
+ if (publish_ancestor_level > ancestor_level)
+ continue;
+
+ /*
+ * If we found an ancestor higher up in the tree, discard the
+ * list of publications through which we replicate it, and use
+ * the new ancestor.
+ */
+ if (publish_ancestor_level < ancestor_level)
+ {
+ *publish_as_relid = pub_relid;
+ publish_ancestor_level = ancestor_level;
+
+ /* reset the publication list for this relation */
+ rel_publications = NIL;
+ }
+ else
+ {
+ /* Same ancestor level, has to be the same OID. */
+ Assert(*publish_as_relid == pub_relid);
+ }
+
+ /* Track publications for this ancestor. */
+ rel_publications = lappend(rel_publications, pub);
+ }
+ }
+
+ list_free(pubids);
+ list_free(schemaPubids);
+
+ return rel_publications;
+}
+
+Datum
+pg_get_relation_publishing_info(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ List *publications = NIL;
+ ArrayType *arr;
+ Datum *elems;
+ int nelems,
+ i;
+ TupleDesc tupdesc;
+ HeapTuple htup;
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ arr = PG_GETARG_ARRAYTYPE_P(1);
+ deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT, &elems, NULL,
+ &nelems);
+
+ /* Get Oids of tables from each publication. */
+ for (i = 0; i < nelems; i++)
+ {
+ char *pubname = TextDatumGetCString(elems[i]);
+ Publication *pub = GetPublicationByName(pubname, false);
+
+ publications = lappend(publications, pub);
+ }
+
+ {
+ List *rel_publications;
+ PublicationActions pubactions = {0};
+ Oid publish_as_relid;
+ ListCell *lc;
+ Datum *puboids;
+ ArrayType *puboidarray;
+ Datum values[6];
+ bool nulls[6];
+
+ rel_publications =
+ process_relation_publications(relid, publications, &pubactions,
+ &publish_as_relid);
+
+ /* Translate the rel_publications List into an OID array. */
+ puboids = palloc(sizeof(Datum) * list_length(rel_publications));
+
+ foreach(lc, rel_publications)
+ {
+ Publication *pub = lfirst(lc);
+
+ i = foreach_current_index(lc);
+ puboids[i] = ObjectIdGetDatum(pub->oid);
+ }
+
+ puboidarray = construct_array(puboids, list_length(rel_publications),
+ OIDOID, sizeof(Oid), true, TYPALIGN_INT);
+
+ values[0] = PointerGetDatum(puboidarray);
+ values[1] = ObjectIdGetDatum(publish_as_relid);
+ values[2] = BoolGetDatum(pubactions.pubinsert);
+ values[3] = BoolGetDatum(pubactions.pubupdate);
+ values[4] = BoolGetDatum(pubactions.pubdelete);
+ values[5] = BoolGetDatum(pubactions.pubtruncate);
+
+ memset(nulls, false, sizeof(nulls));
+
+ htup = heap_form_tuple(tupdesc, values, nulls);
+ }
+
+ PG_RETURN_DATUM(HeapTupleGetDatum(htup));
+}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b08ca55041..640676e7dc 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1986,20 +1986,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
/* Validate the entry */
if (!entry->replicate_valid)
{
- Oid schemaId = get_rel_namespace(relid);
- List *pubids = GetRelationPublications(relid);
-
- /*
- * We don't acquire a lock on the namespace system table as we build
- * the cache entry using a historic snapshot and all the later changes
- * are absorbed while decoding WAL.
- */
- List *schemaPubids = GetSchemaPublications(schemaId);
- ListCell *lc;
- Oid publish_as_relid = relid;
- int publish_ancestor_level = 0;
- bool am_partition = get_rel_relispartition(relid);
- char relkind = get_rel_relkind(relid);
List *rel_publications = NIL;
/* Reload publications if needed before use. */
@@ -2063,123 +2049,10 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
* but here we only need to consider ones that the subscriber
* requested.
*/
- foreach(lc, data->publications)
- {
- Publication *pub = lfirst(lc);
- bool publish = false;
-
- /*
- * Under what relid should we publish changes in this publication?
- * We'll use the top-most relid across all publications. Also
- * track the ancestor level for this publication.
- */
- Oid pub_relid = relid;
- int ancestor_level = 0;
-
- /*
- * If this is a FOR ALL TABLES publication, pick the partition
- * root and set the ancestor level accordingly.
- */
- if (pub->alltables)
- {
- publish = true;
- if (pub->pubviaroot && am_partition)
- {
- List *ancestors = get_partition_ancestors(relid);
-
- pub_relid = llast_oid(ancestors);
- ancestor_level = list_length(ancestors);
- }
- }
-
- if (!publish)
- {
- bool ancestor_published = false;
-
- /*
- * For a partition, check if any of the ancestors are
- * published. If so, note down the topmost ancestor that is
- * published via this publication, which will be used as the
- * relation via which to publish the partition's changes.
- */
- if (am_partition)
- {
- Oid ancestor;
- int level;
- List *ancestors = get_partition_ancestors(relid);
-
- ancestor = GetTopMostAncestorInPublication(pub->oid,
- ancestors,
- &level);
-
- if (ancestor != InvalidOid)
- {
- ancestor_published = true;
- if (pub->pubviaroot)
- {
- pub_relid = ancestor;
- ancestor_level = level;
- }
- }
- }
-
- if (list_member_oid(pubids, pub->oid) ||
- list_member_oid(schemaPubids, pub->oid) ||
- ancestor_published)
- publish = true;
- }
-
- /*
- * If the relation is to be published, determine actions to
- * publish, and list of columns, if appropriate.
- *
- * Don't publish changes for partitioned tables, because
- * publishing those of its partitions suffices, unless partition
- * changes won't be published due to pubviaroot being set.
- */
- if (publish &&
- (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
- {
- entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
- entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
- entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
- entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
-
- /*
- * We want to publish the changes as the top-most ancestor
- * across all publications. So we need to check if the already
- * calculated level is higher than the new one. If yes, we can
- * ignore the new value (as it's a child). Otherwise the new
- * value is an ancestor, so we keep it.
- */
- if (publish_ancestor_level > ancestor_level)
- continue;
-
- /*
- * If we found an ancestor higher up in the tree, discard the
- * list of publications through which we replicate it, and use
- * the new ancestor.
- */
- if (publish_ancestor_level < ancestor_level)
- {
- publish_as_relid = pub_relid;
- publish_ancestor_level = ancestor_level;
-
- /* reset the publication list for this relation */
- rel_publications = NIL;
- }
- else
- {
- /* Same ancestor level, has to be the same OID. */
- Assert(publish_as_relid == pub_relid);
- }
-
- /* Track publications for this ancestor. */
- rel_publications = lappend(rel_publications, pub);
- }
- }
-
- entry->publish_as_relid = publish_as_relid;
+ rel_publications =
+ process_relation_publications(relid, data->publications,
+ &entry->pubactions,
+ &entry->publish_as_relid);
/*
* Initialize the tuple slot, map, and row filter. These are only used
@@ -2198,8 +2071,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
pgoutput_column_list_init(data, rel_publications, entry);
}
- list_free(pubids);
- list_free(schemaPubids);
list_free(rel_publications);
entry->replicate_valid = true;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6996073989..ed67168374 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11833,6 +11833,14 @@
proname => 'pg_relation_is_publishable', provolatile => 's',
prorettype => 'bool', proargtypes => 'regclass',
prosrc => 'pg_relation_is_publishable' },
+{ oid => '8139',
+ descr => 'get information on how a relation will be published via a list of publications',
+ proname => 'pg_get_relation_publishing_info', provariadic => 'text',
+ provolatile => 's', prorettype => 'record', proargtypes => 'regclass _text',
+ proallargtypes => '{regclass,_text,_oid,oid,bool,bool,bool,bool}',
+ proargmodes => '{i,v,o,o,o,o,o,o}',
+ proargnames => '{relid,pubnames,pubids,pubasrelid,pubinsert,pubupdate,pubdelete,pubtruncate}',
+ prosrc => 'pg_get_relation_publishing_info' },
# rls
{ oid => '3298',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 6ecaa2a01e..d4e3488917 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -155,4 +155,8 @@ extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
extern Bitmapset *pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols,
MemoryContext mcxt);
+extern List *process_relation_publications(Oid relid, const List *publications,
+ PublicationActions *pubactions,
+ Oid *publish_as_relid);
+
#endif /* PG_PUBLICATION_H */
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 69dc6cfd85..dd8fc99111 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -5,6 +5,17 @@ CREATE ROLE regress_publication_user LOGIN SUPERUSER;
CREATE ROLE regress_publication_user2;
CREATE ROLE regress_publication_user_dummy LOGIN NOSUPERUSER;
SET SESSION AUTHORIZATION 'regress_publication_user';
+CREATE FUNCTION published_stream (VARIADIC pubnames text[])
+ RETURNS TABLE (pubname text, published regclass, synced regclass) AS $$
+ -- For each publication, show each published root alongside the tables which
+ -- are published via its OID.
+ SELECT p.pubname, rpi.pubasrelid::regclass, pr.prrelid::regclass
+ FROM pg_publication_rel pr
+ JOIN pg_publication p ON (p.oid = pr.prpubid),
+ pg_get_relation_publishing_info(pr.prrelid, VARIADIC pubnames) rpi
+ WHERE p.pubname = ANY (pubnames)
+ ORDER BY p.oid, 2, 3;
+$$ LANGUAGE sql;
-- suppress warning that depends on wal_level
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_default;
@@ -1686,6 +1697,13 @@ SELECT * FROM pg_publication_tables;
pub | sch1 | tbl1 | {a} |
(1 row)
+SELECT * FROM published_stream('pub');
+ pubname | published | synced
+---------+-----------+-----------------
+ pub | sch1.tbl1 | sch1.tbl1
+ pub | sch1.tbl1 | sch2.tbl1_part1
+(2 rows)
+
DROP PUBLICATION pub;
-- Schema publication that does not include the schema that has the parent table
CREATE PUBLICATION pub FOR TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=0);
@@ -1712,6 +1730,13 @@ SELECT * FROM pg_publication_tables;
pub | sch2 | tbl1_part1 | {a} |
(1 row)
+SELECT * FROM published_stream('pub');
+ pubname | published | synced
+---------+-----------------+-----------------
+ pub | sch1.tbl1 | sch1.tbl1
+ pub | sch2.tbl1_part1 | sch2.tbl1_part1
+(2 rows)
+
DROP PUBLICATION pub;
DROP TABLE sch2.tbl1_part1;
DROP TABLE sch1.tbl1;
@@ -1732,6 +1757,7 @@ DROP PUBLICATION pub;
DROP TABLE sch1.tbl1;
DROP SCHEMA sch1 cascade;
DROP SCHEMA sch2 cascade;
+DROP FUNCTION published_stream;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_publication_user, regress_publication_user2;
DROP ROLE regress_publication_user_dummy;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index d5051a5e74..6bf9554d48 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -6,6 +6,18 @@ CREATE ROLE regress_publication_user2;
CREATE ROLE regress_publication_user_dummy LOGIN NOSUPERUSER;
SET SESSION AUTHORIZATION 'regress_publication_user';
+CREATE FUNCTION published_stream (VARIADIC pubnames text[])
+ RETURNS TABLE (pubname text, published regclass, synced regclass) AS $$
+ -- For each publication, show each published root alongside the tables which
+ -- are published via its OID.
+ SELECT p.pubname, rpi.pubasrelid::regclass, pr.prrelid::regclass
+ FROM pg_publication_rel pr
+ JOIN pg_publication p ON (p.oid = pr.prpubid),
+ pg_get_relation_publishing_info(pr.prrelid, VARIADIC pubnames) rpi
+ WHERE p.pubname = ANY (pubnames)
+ ORDER BY p.oid, 2, 3;
+$$ LANGUAGE sql;
+
-- suppress warning that depends on wal_level
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_default;
@@ -1064,6 +1076,7 @@ SELECT * FROM pg_publication_tables;
-- Table publication that includes both the parent table and the child table
ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
SELECT * FROM pg_publication_tables;
+SELECT * FROM published_stream('pub');
DROP PUBLICATION pub;
-- Schema publication that does not include the schema that has the parent table
@@ -1078,6 +1091,7 @@ SELECT * FROM pg_publication_tables;
-- Table publication that includes both the parent table and the child table
ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
SELECT * FROM pg_publication_tables;
+SELECT * FROM published_stream('pub');
DROP PUBLICATION pub;
DROP TABLE sch2.tbl1_part1;
@@ -1096,6 +1110,7 @@ DROP PUBLICATION pub;
DROP TABLE sch1.tbl1;
DROP SCHEMA sch1 cascade;
DROP SCHEMA sch2 cascade;
+DROP FUNCTION published_stream;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_publication_user, regress_publication_user2;
--
2.25.1
From 0397a525839e1dff3063c69872fac1161ff47c72 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jchamp...@timescale.com>
Date: Mon, 26 Sep 2022 13:23:51 -0700
Subject: [PATCH v2 2/2] WIP: introduce pg_set_logical_root for use with
pubviaroot
Allows regular inherited tables to be published via their root table,
just like partitions. This works by hijacking pg_inherits' inhseqno
column: the child's (single) existing pg_inherits entry has its
inhseqno set to zero, indicating that the child should be treated as a
logical partition by the publication machinery.
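The inhseqno trick can be modeled outside the catalog. This is a minimal standalone sketch of the checks pg_set_logical_root() performs and of how a logical parent and its ancestors are found again afterwards. The in-memory `InheritsRow` array stands in for pg_inherits, error codes replace ereport(), and all names (`mark_logical_root()`, `logical_parent_of()`, `logical_ancestors_of()`) are hypothetical. In the real catalog inhseqno counts from 1 for ordinary inheritance, which is why 0 is free to act as a marker.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for catalog types; not PostgreSQL headers. */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

/* One row of a toy pg_inherits. */
typedef struct InheritsRow
{
    Oid inhrelid;   /* child table */
    Oid inhparent;  /* parent table */
    int inhseqno;   /* >= 1 for ordinary inheritance; 0 marks a logical root link */
} InheritsRow;

typedef enum SetRootResult
{
    SET_ROOT_OK,
    SET_ROOT_NOT_CHILD,         /* child does not inherit from root */
    SET_ROOT_MULTIPLE_PARENTS   /* child inherits from more than one table */
} SetRootResult;

/* Mark the child's only inheritance link as a logical-root link. */
static SetRootResult
mark_logical_root(InheritsRow *rows, size_t nrows, Oid child, Oid root)
{
    InheritsRow *match = NULL;

    for (size_t i = 0; i < nrows; i++)
    {
        if (rows[i].inhrelid != child)
            continue;
        if (match != NULL)
            return SET_ROOT_MULTIPLE_PARENTS;
        match = &rows[i];
    }

    if (match == NULL || match->inhparent != root)
        return SET_ROOT_NOT_CHILD;

    match->inhseqno = 0;
    return SET_ROOT_OK;
}

/* Return the logical parent of relid, or InvalidOid if it has none. */
static Oid
logical_parent_of(const InheritsRow *rows, size_t nrows, Oid relid)
{
    for (size_t i = 0; i < nrows; i++)
    {
        if (rows[i].inhrelid == relid && rows[i].inhseqno == 0)
            return rows[i].inhparent;
    }
    return InvalidOid;
}

/* Collect logical ancestors nearest-first, so the topmost root ends up last. */
static size_t
logical_ancestors_of(const InheritsRow *rows, size_t nrows, Oid relid,
                     Oid *ancestors, size_t max)
{
    size_t n = 0;
    Oid parent;

    assert(ancestors != NULL || max == 0);
    while (n < max &&
           (parent = logical_parent_of(rows, nrows, relid)) != InvalidOid)
    {
        ancestors[n++] = parent;
        relid = parent;
    }
    return n;
}
```

The nearest-first ordering matches get_partition_ancestors(), whose last element is the root, so callers can treat both kinds of ancestor list the same way.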
Initial sync works by asking the publisher for a list of logical
descendants of the published table, then COPYing them one-by-one into
the root. The publisher reuses the existing pubviaroot logic, adding the
new logical roots to code that previously looked only for partition
roots.
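The "list of logical descendants" step is a breadth-first walk that only follows pg_inherits links marked with inhseqno = 0, the same shape as find_all_inheritors() restricted by the new logical_only flag. Below is a minimal standalone sketch under a hypothetical in-memory model (no locking; `InheritsRow`, `oid_in_list()` and `list_logical_descendants()` are assumed names). Like find_all_inheritors(), it returns the root first; during initial sync each listed table's rows would then be COPYed into the root.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;   /* stand-in for PostgreSQL's Oid */

/* One row of a toy pg_inherits. */
typedef struct InheritsRow
{
    Oid inhrelid;   /* child table */
    Oid inhparent;  /* parent table */
    int inhseqno;   /* 0 marks a logical-root link */
} InheritsRow;

static bool
oid_in_list(const Oid *list, size_t n, Oid relid)
{
    for (size_t i = 0; i < n; i++)
    {
        if (list[i] == relid)
            return true;
    }
    return false;
}

/*
 * Breadth-first list of root plus every table reachable from it through
 * logical-root links. Returns the number of OIDs written to out (<= max).
 */
static size_t
list_logical_descendants(const InheritsRow *rows, size_t nrows, Oid root,
                         Oid *out, size_t max)
{
    size_t n = 0;

    if (max == 0)
        return 0;
    assert(out != NULL);
    out[n++] = root;

    /* out doubles as the BFS queue: entries before head are expanded. */
    for (size_t head = 0; head < n; head++)
    {
        for (size_t i = 0; i < nrows && n < max; i++)
        {
            if (rows[i].inhparent == out[head] &&
                rows[i].inhseqno == 0 &&
                !oid_in_list(out, n, rows[i].inhrelid))
                out[n++] = rows[i].inhrelid;
        }
    }
    return n;
}
```

A child attached through an ordinary link (inhseqno >= 1) is not followed, and neither is anything below it, even if its own children carry the logical-root marker.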
Known bugs/TODOs:
- The pg_inherits machinery doesn't prohibit changes to inheritance
after an entry has been marked as a logical root.
- I haven't given any thought to interactions with row filters, or to
column lists.
- I'm not sure that I'm taking all the necessary locks yet, and those I
do take may be taken in the wrong order.
- Dump and upgrade aren't supported yet.
---
src/backend/catalog/pg_inherits.c | 212 ++++++++++++-
src/backend/catalog/pg_publication.c | 282 +++++++++++++-----
src/backend/commands/publicationcmds.c | 10 +
src/backend/partitioning/partdesc.c | 3 +-
src/backend/replication/logical/tablesync.c | 257 ++++++++++------
src/backend/replication/pgoutput/pgoutput.c | 1 -
src/include/catalog/pg_inherits.h | 7 +-
src/include/catalog/pg_proc.dat | 12 +
src/test/regress/expected/publication.out | 266 +++++++++++++++++
src/test/regress/sql/publication.sql | 121 ++++++++
src/test/subscription/t/013_partition.pl | 312 ++++++++++++++++++++
11 files changed, 1309 insertions(+), 174 deletions(-)
diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c
index da969bd2f9..b59f93c833 100644
--- a/src/backend/catalog/pg_inherits.c
+++ b/src/backend/catalog/pg_inherits.c
@@ -24,14 +24,22 @@
#include "access/table.h"
#include "catalog/indexing.h"
#include "catalog/pg_inherits.h"
+#include "catalog/partition.h"
+#include "miscadmin.h"
#include "parser/parse_type.h"
#include "storage/lmgr.h"
+#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
+#include "utils/fmgrprotos.h"
+#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
+static List * find_all_inheritors_internal(Oid parentrelId, LOCKMODE lockmode,
+ List **numparents, bool logical_only);
+
/*
* Entry of a hash table used in find_all_inheritors. See below.
*/
@@ -59,14 +67,14 @@ List *
find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
{
return find_inheritance_children_extended(parentrelId, true, lockmode,
- NULL, NULL);
+ NULL, NULL, false);
}
/*
* find_inheritance_children_extended
*
* As find_inheritance_children, with more options regarding detached
- * partitions.
+ * partitions and logical roots.
*
* If a partition's pg_inherits row is marked "detach pending",
* *detached_exist (if not null) is set true.
@@ -78,16 +86,21 @@ find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
* whether the transaction that marked those partitions as detached appears
* committed to the active snapshot. In addition, *detached_xmin (if not null)
* is set to the xmin of the row of the detached partition.
+ *
+ * If logical_only is true, only tables marked explicitly via
+ * pg_set_logical_root() are included in the output list.
*/
List *
find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
LOCKMODE lockmode, bool *detached_exist,
- TransactionId *detached_xmin)
+ TransactionId *detached_xmin,
+ bool logical_only)
{
List *list = NIL;
Relation relation;
+ Oid index;
SysScanDesc scan;
- ScanKeyData key[1];
+ ScanKeyData key[2];
HeapTuple inheritsTuple;
Oid inhrelid;
Oid *oidarr;
@@ -110,14 +123,24 @@ find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
numoids = 0;
relation = table_open(InheritsRelationId, AccessShareLock);
+ index = InheritsParentIndexId;
ScanKeyInit(&key[0],
Anum_pg_inherits_inhparent,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(parentrelId));
- scan = systable_beginscan(relation, InheritsParentIndexId, true,
- NULL, 1, key);
+ if (logical_only)
+ {
+ index = InheritsParentSeqnoIndexId;
+ ScanKeyInit(&key[1],
+ Anum_pg_inherits_inhseqno,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum(0));
+ }
+
+ scan = systable_beginscan(relation, index, true,
+ NULL, logical_only ? 2 : 1, key);
while ((inheritsTuple = systable_getnext(scan)) != NULL)
{
@@ -254,6 +277,20 @@ find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
*/
List *
find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
+{
+ return find_all_inheritors_internal(parentrelId, lockmode, numparents,
+ false);
+}
+
+List *
+find_all_logical_inheritors(Oid parentrelId)
+{
+ return find_all_inheritors_internal(parentrelId, NoLock, NULL, true);
+}
+
+static List *
+find_all_inheritors_internal(Oid parentrelId, LOCKMODE lockmode,
+ List **numparents, bool logical_only)
{
/* hash table for O(1) rel_oid -> rel_numparents cell lookup */
HTAB *seen_rels;
@@ -290,7 +327,9 @@ find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
ListCell *lc;
/* Get the direct children of this rel */
- currentchildren = find_inheritance_children(currentrel, lockmode);
+ currentchildren =
+ find_inheritance_children_extended(currentrel, true, lockmode,
+ NULL, NULL, logical_only);
/*
* Add to the queue only those children not already seen. This avoids
@@ -655,3 +694,162 @@ PartitionHasPendingDetach(Oid partoid)
elog(ERROR, "relation %u is not a partition", partoid);
return false; /* keep compiler quiet */
}
+
+static Oid
+get_logical_parent_worker(Relation inhRel, Oid relid)
+{
+ SysScanDesc scan;
+ ScanKeyData key[2];
+ Oid result = InvalidOid;
+ HeapTuple tuple;
+
+ ScanKeyInit(&key[0],
+ Anum_pg_inherits_inhrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(relid));
+ ScanKeyInit(&key[1],
+ Anum_pg_inherits_inhseqno,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum(0));
+
+ scan = systable_beginscan(inhRel, InheritsRelidSeqnoIndexId, true,
+ NULL, 2, key);
+ tuple = systable_getnext(scan);
+ if (HeapTupleIsValid(tuple))
+ {
+ Form_pg_inherits form = (Form_pg_inherits) GETSTRUCT(tuple);
+ result = form->inhparent;
+ }
+
+ systable_endscan(scan);
+
+ return result;
+}
+
+static void
+get_logical_ancestors_worker(Relation inhRel, Oid relid, List **ancestors)
+{
+ Oid parentOid;
+
+ /*
+ * Recursion ends at the topmost level, ie., when there's no parent.
+ */
+ parentOid = get_logical_parent_worker(inhRel, relid);
+ if (parentOid == InvalidOid)
+ return;
+
+ *ancestors = lappend_oid(*ancestors, parentOid);
+ get_logical_ancestors_worker(inhRel, parentOid, ancestors);
+}
+
+List *
+get_logical_ancestors(Oid relid, bool is_partition)
+{
+ List *result = NIL;
+ Relation inhRel;
+
+ /* For partitions, this is identical to get_partition_ancestors(). */
+ if (is_partition)
+ return get_partition_ancestors(relid);
+
+ inhRel = table_open(InheritsRelationId, AccessShareLock);
+ get_logical_ancestors_worker(inhRel, relid, &result);
+ table_close(inhRel, AccessShareLock);
+
+ return result;
+}
+
+bool
+has_logical_parent(Relation inhRel, Oid relid)
+{
+ return (get_logical_parent_worker(inhRel, relid) != InvalidOid);
+}
+
+Datum
+pg_set_logical_root(PG_FUNCTION_ARGS)
+{
+ Oid tableoid = PG_GETARG_OID(0);
+ Oid rootoid = PG_GETARG_OID(1);
+ char *tablename;
+ char *rootname;
+ Relation inhRel;
+ ScanKeyData key;
+ SysScanDesc scan;
+ Oid parent = InvalidOid;
+ HeapTuple tuple, copyTuple;
+ Form_pg_inherits form;
+
+ /*
+ * Check that the tables exist.
+ * TODO: check identical schemas too? or does replication handle that?
+ */
+ tablename = get_rel_name(tableoid);
+ if (tablename == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("OID %u does not refer to a table", tableoid)));
+ rootname = get_rel_name(rootoid);
+ if (rootname == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("OID %u does not refer to a table", rootoid)));
+
+ /* Check ownership. */
+ if (!object_ownercheck(RelationRelationId, tableoid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER,
+ get_relkind_objtype(get_rel_relkind(tableoid)),
+ tablename);
+ if (!object_ownercheck(RelationRelationId, rootoid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER,
+ get_relkind_objtype(get_rel_relkind(rootoid)),
+ rootname);
+
+ /* Open pg_inherits with RowExclusiveLock so that we can update it. */
+ inhRel = table_open(InheritsRelationId, RowExclusiveLock);
+
+ /*
+ * We have to make sure that the inheritance relationship already exists,
+ * and that there is only one existing parent for this table.
+ *
+ * TODO: do we have to lock the tables themselves to avoid races?
+ */
+ ScanKeyInit(&key,
+ Anum_pg_inherits_inhrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(tableoid));
+
+ scan = systable_beginscan(inhRel, InheritsRelidSeqnoIndexId, true,
+ NULL, 1, &key);
+ tuple = systable_getnext(scan);
+ if (HeapTupleIsValid(tuple))
+ {
+ form = (Form_pg_inherits) GETSTRUCT(tuple);
+ parent = form->inhparent;
+ copyTuple = heap_copytuple(tuple);
+
+ if (HeapTupleIsValid(systable_getnext(scan)))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("table \"%s\" inherits from multiple tables",
+ tablename)));
+ }
+
+ if (parent != rootoid)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("table \"%s\" does not inherit from intended root table \"%s\"",
+ tablename, rootname)));
+
+ systable_endscan(scan);
+
+ /* Mark the inheritance as a logical root by setting it to zero. */
+ form = (Form_pg_inherits) GETSTRUCT(copyTuple);
+ form->inhseqno = 0;
+
+ CatalogTupleUpdate(inhRel, &copyTuple->t_self, copyTuple);
+
+ heap_freetuple(copyTuple);
+ table_close(inhRel, RowExclusiveLock);
+
+ PG_RETURN_VOID();
+}
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index df9abf09c3..f23c75d058 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -51,6 +51,7 @@ typedef struct
Oid relid; /* OID of published table */
Oid pubid; /* OID of publication that publishes this
* table. */
+ bool viaroot; /* does pubid use publish_via_partition_root? */
} published_rel;
static void publication_translate_columns(Relation targetrel, List *columns,
@@ -180,56 +181,46 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
}
/*
- * Returns true if the ancestor is in the list of published relations.
- * Otherwise, returns false.
- */
-static bool
-is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
-{
- ListCell *lc;
-
- foreach(lc, table_infos)
- {
- Oid relid = ((published_rel *) lfirst(lc))->relid;
-
- if (relid == ancestor)
- return true;
- }
-
- return false;
-}
-
-/*
- * Filter out the partitions whose parent tables are also present in the list.
+ * Filter out the tables that are already being published via a logical root.
+ *
+ * If we only had partitioned tables to check, this would be much easier: it's
+ * impossible for a partition root to be in this list unless pubviaroot=true for
+ * that table, so we would just have to check for the existence of an ancestor,
+ * and if any were found then we'd filter the table.
+ *
+ * Logical roots add another wrinkle, because it's acceptable for a
+ * pubviaroot=false publication to contain a logical root. (Logical roots can
+ * contain data of their own, unlike partition roots.) It's also possible for a
+ * logical root to be included in a pubviaroot publication without some or all of
+ * its children. This is not possible for a partition root, where all of the
+ * leaves are implicitly included.
+ *
+ * So, to keep things relatively consistent, this now uses the same code that
+ * the output plugin uses to decide which relation OID to publish data under. If
+ * the publishing OID is not the same as the relation OID, we remove it from
+ * this list.
+ *
+ * XXX And that's expensive.
*/
static void
-filter_partitions(List *table_infos)
+filter_published_descendants(List *table_infos, List *publications)
{
ListCell *lc;
foreach(lc, table_infos)
{
- bool skip = false;
- List *ancestors = NIL;
- ListCell *lc2;
published_rel *table_info = (published_rel *) lfirst(lc);
+ Oid publish_as_relid;
+ List *rel_publications;
- if (get_rel_relispartition(table_info->relid))
- ancestors = get_partition_ancestors(table_info->relid);
-
- foreach(lc2, ancestors)
- {
- Oid ancestor = lfirst_oid(lc2);
-
- if (is_ancestor_member_tableinfos(ancestor, table_infos))
- {
- skip = true;
- break;
- }
- }
+ rel_publications =
+ process_relation_publications(table_info->relid, publications, NULL,
+ &publish_as_relid);
- if (skip)
+ if (publish_as_relid != table_info->relid)
table_infos = foreach_delete_current(table_infos, lc);
+
+ list_free(rel_publications);
}
}
@@ -805,11 +796,15 @@ List *
GetAllTablesPublicationRelations(bool pubviaroot)
{
Relation classRel;
+ Relation inhRel = NULL;
ScanKeyData key[1];
TableScanDesc scan;
HeapTuple tuple;
List *result = NIL;
+ /* TODO: is there a required order to acquire these locks? */
+ if (pubviaroot)
+ inhRel = table_open(InheritsRelationId, AccessShareLock);
classRel = table_open(RelationRelationId, AccessShareLock);
ScanKeyInit(&key[0],
@@ -825,7 +820,8 @@ GetAllTablesPublicationRelations(bool pubviaroot)
Oid relid = relForm->oid;
if (is_publishable_class(relid, relForm) &&
- !(relForm->relispartition && pubviaroot))
+ !(relForm->relispartition && pubviaroot) &&
+ !(pubviaroot && has_logical_parent(inhRel, relid)))
result = lappend_oid(result, relid);
}
@@ -854,6 +850,9 @@ GetAllTablesPublicationRelations(bool pubviaroot)
}
table_close(classRel, AccessShareLock);
+ if (pubviaroot)
+ table_close(inhRel, AccessShareLock);
+
return result;
}
@@ -1070,6 +1069,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
int nelems,
i;
bool viaroot = false;
+ List *publications = NIL;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
@@ -1093,6 +1093,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
ListCell *lc;
pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
+ publications = lappend(publications, pub_elem);
/*
* Publications support partitioned tables. If
@@ -1132,6 +1133,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
table_info->relid = lfirst_oid(lc);
table_info->pubid = pub_elem->oid;
+ table_info->viaroot = pub_elem->pubviaroot;
table_infos = lappend(table_infos, table_info);
}
@@ -1141,15 +1143,14 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
}
/*
- * If the publication publishes partition changes via their respective
- * root partitioned tables, we must exclude partitions in favor of
- * including the root partitioned tables. Otherwise, the function
- * could return both the child and parent tables which could cause
- * data of the child table to be double-published on the subscriber
- * side.
+ * If any of the publications publish changes via their respective
+ * logical root tables, we must exclude descendants in favor of
+ * including the root tables. Otherwise, the function could return
+ * both the child and parent tables, which could cause data of the
+ * child table to be double-published on the subscriber side.
*/
if (viaroot)
- filter_partitions(table_infos);
+ filter_published_descendants(table_infos, publications);
/* Construct a tuple descriptor for the result rows. */
tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
@@ -1165,6 +1166,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
funcctx->user_fctx = (void *) table_infos;
+ list_free(publications);
+
MemoryContextSwitchTo(oldcontext);
}
@@ -1260,6 +1263,118 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+/*
+ * Returns a list of tables that should be copied during initial sync for the
+ * given root table and publications.
+ *
+ * The list contains anything other than the table itself only when a
+ * publication has publish_via_partition_root set to true and the table has
+ * inheritance descendants, marked with pg_set_logical_root(), that are also
+ * published by that publication.
+ */
+Datum
+pg_get_publication_rels_to_sync(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ List *tables = NIL;
+
+ /* stuff done only on the first call of the function */
+ if (SRF_IS_FIRSTCALL())
+ {
+ Oid rootid = PG_GETARG_OID(0);
+ MemoryContext oldcontext;
+ ArrayType *arr;
+ Datum *elems;
+ int nelems,
+ i;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* switch to memory context appropriate for multiple function calls */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /*
+ * Deconstruct the parameter into elements where each element is a
+ * publication name.
+ */
+ arr = PG_GETARG_ARRAYTYPE_P(1);
+ deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
+ &elems, NULL, &nelems);
+
+ /* Get Oids of tables from each publication. */
+ for (i = 0; i < nelems; i++)
+ {
+ char *pubname = TextDatumGetCString(elems[i]);
+ Publication *publication = GetPublicationByName(pubname, false);
+ List *pub_tables = NIL;
+
+ /* TODO: do the tables in this list need to be locked? */
+ if (publication->pubviaroot)
+ pub_tables = find_all_logical_inheritors(rootid);
+ else
+ pub_tables = list_make1_oid(rootid);
+
+ if (!publication->alltables)
+ {
+ List *relids;
+ List *schemarelids;
+ List *published;
+ ListCell *cell;
+
+ relids = GetPublicationRelations(publication->oid,
+ publication->pubviaroot ?
+ PUBLICATION_PART_ROOT :
+ PUBLICATION_PART_ALL);
+ schemarelids = GetAllSchemaPublicationRelations(publication->oid,
+ publication->pubviaroot ?
+ PUBLICATION_PART_ROOT :
+ PUBLICATION_PART_LEAF);
+ published = list_concat_unique_oid(relids, schemarelids);
+
+ /*
+ * First we have to check to make sure the root table is
+ * actually part of the publication; if not, none of its
+ * descendants' contents belong to it.
+ */
+ if (!list_member_oid(published, rootid))
+ continue;
+
+ /*
+ * Now filter out any descendants that aren't part of this
+ * particular publication.
+ */
+ foreach(cell, pub_tables)
+ {
+ Oid current = lfirst_oid(cell);
+
+ if (!list_member_oid(published, current))
+ pub_tables = foreach_delete_current(pub_tables, cell);
+ }
+ }
+
+ tables = list_concat_unique_oid(tables, pub_tables);
+ }
+
+ funcctx->user_fctx = (void *) tables;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /* stuff done on every call of the function */
+ funcctx = SRF_PERCALL_SETUP();
+ tables = (List *) funcctx->user_fctx;
+
+ if (funcctx->call_cntr < list_length(tables))
+ {
+ Oid relid = list_nth_oid(tables, funcctx->call_cntr);
+
+ SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
+
List *
process_relation_publications(Oid relid, const List *publications,
PublicationActions *pubactions,
@@ -1279,8 +1394,7 @@ process_relation_publications(Oid relid, const List *publications,
bool am_partition = get_rel_relispartition(relid);
char relkind = get_rel_relkind(relid);
List *rel_publications = NIL;
-
- *publish_as_relid = relid;
+ Oid publish_candidate = relid;
foreach(lc, publications)
{
@@ -1296,55 +1410,57 @@ process_relation_publications(Oid relid, const List *publications,
int ancestor_level = 0;
/*
- * If this is a FOR ALL TABLES publication, pick the partition
+ * If this is a FOR ALL TABLES publication, pick the logical
* root and set the ancestor level accordingly.
*/
if (pub->alltables)
{
publish = true;
- if (pub->pubviaroot && am_partition)
+ if (pub->pubviaroot)
{
- List *ancestors = get_partition_ancestors(relid);
+ List *ancestors;
- pub_relid = llast_oid(ancestors);
- ancestor_level = list_length(ancestors);
+ ancestors = get_logical_ancestors(relid, am_partition);
+ if (ancestors != NIL)
+ {
+ pub_relid = llast_oid(ancestors);
+ ancestor_level = list_length(ancestors);
+ }
}
}
if (!publish)
{
bool ancestor_published = false;
+ Oid ancestor;
+ int level;
+ List *ancestors;
/*
- * For a partition, check if any of the ancestors are
- * published. If so, note down the topmost ancestor that is
+ * Check if any of the logical ancestors (that is, partition
+ * parents or tables marked with pg_set_logical_root()) are
+ * published. If so, note down the topmost ancestor that is
* published via this publication, which will be used as the
- * relation via which to publish the partition's changes.
+ * relation via which to publish this table's changes.
*/
- if (am_partition)
- {
- Oid ancestor;
- int level;
- List *ancestors = get_partition_ancestors(relid);
-
- ancestor = GetTopMostAncestorInPublication(pub->oid,
- ancestors,
- &level);
+ ancestors = get_logical_ancestors(relid, am_partition);
+ ancestor = GetTopMostAncestorInPublication(pub->oid,
+ ancestors,
+ &level);
- if (ancestor != InvalidOid)
+ if (ancestor != InvalidOid)
+ {
+ ancestor_published = true;
+ if (pub->pubviaroot)
{
- ancestor_published = true;
- if (pub->pubviaroot)
- {
- pub_relid = ancestor;
- ancestor_level = level;
- }
+ pub_relid = ancestor;
+ ancestor_level = level;
}
}
if (list_member_oid(pubids, pub->oid) ||
list_member_oid(schemaPubids, pub->oid) ||
- ancestor_published)
+ (am_partition && ancestor_published))
publish = true;
}
@@ -1359,10 +1475,13 @@ process_relation_publications(Oid relid, const List *publications,
if (publish &&
(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
{
- pubactions->pubinsert |= pub->pubactions.pubinsert;
- pubactions->pubupdate |= pub->pubactions.pubupdate;
- pubactions->pubdelete |= pub->pubactions.pubdelete;
- pubactions->pubtruncate |= pub->pubactions.pubtruncate;
+ if (pubactions)
+ {
+ pubactions->pubinsert |= pub->pubactions.pubinsert;
+ pubactions->pubupdate |= pub->pubactions.pubupdate;
+ pubactions->pubdelete |= pub->pubactions.pubdelete;
+ pubactions->pubtruncate |= pub->pubactions.pubtruncate;
+ }
/*
* We want to publish the changes as the top-most ancestor
@@ -1381,7 +1500,7 @@ process_relation_publications(Oid relid, const List *publications,
*/
if (publish_ancestor_level < ancestor_level)
{
- *publish_as_relid = pub_relid;
+ publish_candidate = pub_relid;
publish_ancestor_level = ancestor_level;
/* reset the publication list for this relation */
@@ -1390,7 +1509,7 @@ process_relation_publications(Oid relid, const List *publications,
else
{
/* Same ancestor level, has to be the same OID. */
- Assert(*publish_as_relid == pub_relid);
+ Assert(publish_candidate == pub_relid);
}
/* Track publications for this ancestor. */
@@ -1401,6 +1520,9 @@ process_relation_publications(Oid relid, const List *publications,
list_free(pubids);
list_free(schemaPubids);
+ if (publish_as_relid)
+ *publish_as_relid = publish_candidate;
+
return rel_publications;
}
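The publish-as rule that process_relation_publications() now applies to logical roots, where a table's changes go out under the top-most ancestor listed in any pubviaroot publication, can be sketched outside the backend. This is a simplified model, not the patch's code: `Pub`, `pub_contains()` and `choose_publish_as_relid()` are hypothetical, `Oid` is a local typedef, each table's logical ancestors are assumed to be passed nearest-first (so the last entry is the top-most one, as with llast_oid()), and only the non-partition case is modeled, so a table must be listed in a publication itself to be published through it. FOR ALL TABLES and schema publications, publication actions and partitioned tables are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;		/* local stand-in for the backend's Oid */

/* Hypothetical model of one publication. */
typedef struct Pub
{
	bool		viaroot;		/* publish_via_partition_root */
	const Oid  *members;		/* tables listed in the publication */
	size_t		nmembers;
} Pub;

static bool
pub_contains(const Pub *pub, Oid relid)
{
	for (size_t i = 0; i < pub->nmembers; i++)
	{
		if (pub->members[i] == relid)
			return true;
	}
	return false;
}

/*
 * ancestors[] holds the logical ancestors of relid, nearest first, so
 * ancestors[nancestors - 1] is the top-most one. Returns the OID whose
 * identity the table's changes would be published under.
 */
static Oid
choose_publish_as_relid(Oid relid, const Oid *ancestors, size_t nancestors,
						const Pub *pubs, size_t npubs)
{
	Oid			candidate = relid;
	size_t		best_level = 0;

	assert(ancestors != NULL || nancestors == 0);

	for (size_t p = 0; p < npubs; p++)
	{
		const Pub  *pub = &pubs[p];
		Oid			pub_relid = relid;
		size_t		level = 0;

		/* Non-partition model: the table itself must be listed. */
		if (!pub_contains(pub, relid))
			continue;

		/* With pubviaroot, redirect to the top-most listed ancestor. */
		if (pub->viaroot)
		{
			for (size_t lvl = nancestors; lvl > 0; lvl--)
			{
				if (pub_contains(pub, ancestors[lvl - 1]))
				{
					pub_relid = ancestors[lvl - 1];
					level = lvl;
					break;
				}
			}
		}

		/* The highest ancestor found across all publications wins. */
		if (level > best_level)
		{
			best_level = level;
			candidate = pub_relid;
		}
	}
	return candidate;
}
```

In this model, filter_published_descendants() corresponds to keeping a pg_get_publication_tables() row only when the chosen OID equals the row's own relid. With ipub_part1 alone, sch1.ipart1_a1 resolves to sch1.ipart1 and would be dropped, which agrees with the expected output for pg_get_publication_tables('ipub_part1') in the regression test.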
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f4ba572697..7e23bed6c0 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -238,6 +238,8 @@ contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
* parent table, but the bitmap contains the replica identity
* information of the child table. So, get the column number of the
* child table as parent and child column order could be different.
+ *
+ * TODO: is this applicable to pg_set_logical_root()?
*/
if (context->pubviaroot)
{
@@ -286,6 +288,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
*
* Note that even though the row filter used is for an ancestor, the
* REPLICA IDENTITY used will be for the actual child table.
+ *
+ * TODO: is this applicable to pg_set_logical_root()?
*/
if (pubviaroot && relation->rd_rel->relispartition)
{
@@ -336,6 +340,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
* the column list.
*
* Returns true if any replica identity column is not covered by column list.
+ *
+ * TODO: is this applicable to pg_set_logical_root()?
*/
bool
pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
@@ -628,6 +634,8 @@ TransformPubWhereClauses(List *tables, const char *queryString,
* If the publication doesn't publish changes via the root partitioned
* table, the partition's row filter will be used. So disallow using
* WHERE clause on partitioned table in this case.
+ *
+ * TODO: decide how this interacts with pg_set_logical_root()
*/
if (!pubviaroot &&
pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
@@ -715,6 +723,8 @@ CheckPubRelationColumnList(char *pubname, List *tables,
* If the publication doesn't publish changes via the root partitioned
* table, the partition's column list will be used. So disallow using
* a column list on the partitioned table in this case.
+ *
+ * TODO: decide if this interacts with pg_set_logical_root()
*/
if (!pubviaroot &&
pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
diff --git a/src/backend/partitioning/partdesc.c b/src/backend/partitioning/partdesc.c
index 7a2b5e57ff..dfb27990f9 100644
--- a/src/backend/partitioning/partdesc.c
+++ b/src/backend/partitioning/partdesc.c
@@ -162,7 +162,8 @@ RelationBuildPartitionDesc(Relation rel, bool omit_detached)
inhoids = find_inheritance_children_extended(RelationGetRelid(rel),
omit_detached, NoLock,
&detached_exist,
- &detached_xmin);
+ &detached_xmin,
+ false);
nparts = list_length(inhoids);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c56d42dcd2..db0f56b05e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -757,11 +757,12 @@ copy_read_data(void *outbuf, int minread, int maxread)
/*
* Get information about remote relation in similar fashion the RELATION
* message provides during replication. This function also returns the relation
- * qualifications to be used in the COPY command.
+ * qualifications to be used in the COPY command, and the list of tables to COPY
+ * (which for most tables will contain only one entry).
*/
static void
fetch_remote_table_info(char *nspname, char *relname,
- LogicalRepRelation *lrel, List **qual)
+ LogicalRepRelation *lrel, List **qual, List **to_copy)
{
WalRcvExecResult *res;
StringInfoData cmd;
@@ -1072,6 +1073,87 @@ fetch_remote_table_info(char *nspname, char *relname,
walrcv_clear_result(res);
}
+ /*
+ * See if there are any other tables to be copied besides the original. This
+ * happens when a descendant in the inheritance relationship is marked with
+ * pg_set_logical_root() and is part of a publication with
+ * publish_via_partition_root = true.
+ *
+ * FIXME: if two publications have conflicting settings for
+ * publish_via_partition_root, this behaves strangely. It looks like that's
+ * true for regular partitions too...
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
+ {
+ Oid descRow[] = {TEXTOID, TEXTOID};
+ StringInfoData pub_names;
+
+ /* Build the pubname list. */
+ initStringInfo(&pub_names);
+ foreach(lc, MySubscription->publications)
+ {
+ char *pubname = strVal(lfirst(lc));
+
+ if (foreach_current_index(lc) > 0)
+ appendStringInfoString(&pub_names, ", ");
+
+ appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
+ }
+
+ /*
+ * Ask the server what it wants us to sync.
+ */
+ resetStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "SELECT DISTINCT n.nspname, c.relname"
+ " FROM pg_catalog.pg_publication p,"
+ " pg_catalog.pg_class c"
+ " JOIN pg_catalog.pg_namespace n"
+ " ON (c.relnamespace = n.oid)"
+ " JOIN LATERAL pg_catalog.pg_get_publication_rels_to_sync(%u::regclass, p.pubname) relid"
+ " ON (c.oid = relid)"
+ " WHERE p.pubname IN ( %s )",
+ lrel->remoteid, pub_names.data);
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(descRow), descRow);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not fetch logical descendants for table \"%s.%s\" from publisher: %s",
+ nspname, relname, res->err)));
+
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *desc_nspname;
+ char *desc_relname;
+ char *quoted;
+
+ desc_nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+ desc_relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ quoted = quote_qualified_identifier(desc_nspname, desc_relname);
+ *to_copy = lappend(*to_copy, quoted);
+
+ ExecClearTuple(slot);
+ }
+
+ ExecDropSingleTupleTableSlot(slot);
+ walrcv_clear_result(res);
+
+ pfree(pub_names.data);
+ }
+ else
+ {
+ /* For older servers, we only COPY the table itself. */
+ char *quoted = quote_qualified_identifier(lrel->nspname,
+ lrel->relname);
+ *to_copy = lappend(*to_copy, quoted);
+ }
+
pfree(cmd.data);
}
@@ -1086,6 +1168,8 @@ copy_table(Relation rel)
LogicalRepRelMapEntry *relmapentry;
LogicalRepRelation lrel;
List *qual = NIL;
+ List *to_copy = NIL;
+ ListCell *cur;
WalRcvExecResult *res;
StringInfoData cmd;
CopyFromState cstate;
@@ -1095,7 +1179,8 @@ copy_table(Relation rel)
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel, &qual);
+ RelationGetRelationName(rel), &lrel, &qual,
+ &to_copy);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
@@ -1104,105 +1189,109 @@ copy_table(Relation rel)
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
Assert(rel == relmapentry->localrel);
- /* Start copy on the publisher. */
- initStringInfo(&cmd);
-
- /* Regular table with no row filter */
- if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+ foreach(cur, to_copy)
{
- appendStringInfo(&cmd, "COPY %s (",
- quote_qualified_identifier(lrel.nspname, lrel.relname));
-
- /*
- * XXX Do we need to list the columns in all cases? Maybe we're
- * replicating all columns?
- */
- for (int i = 0; i < lrel.natts; i++)
- {
- if (i > 0)
- appendStringInfoString(&cmd, ", ");
+ char *quoted_name = lfirst(cur);
- appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
- }
+ /* Start copy on the publisher. */
+ initStringInfo(&cmd);
- appendStringInfoString(&cmd, ") TO STDOUT");
- }
- else
- {
- /*
- * For non-tables and tables with row filters, we need to do COPY
- * (SELECT ...), but we can't just do SELECT * because we need to not
- * copy generated columns. For tables with any row filters, build a
- * SELECT query with OR'ed row filters for COPY.
- */
- appendStringInfoString(&cmd, "COPY (SELECT ");
- for (int i = 0; i < lrel.natts; i++)
+ /* Regular table with no row filter */
+ if (lrel.relkind == RELKIND_RELATION && qual == NIL)
{
- appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
- if (i < lrel.natts - 1)
- appendStringInfoString(&cmd, ", ");
- }
+ appendStringInfo(&cmd, "COPY %s (", quoted_name);
- appendStringInfoString(&cmd, " FROM ");
+ /*
+ * XXX Do we need to list the columns in all cases? Maybe we're
+ * replicating all columns?
+ */
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ if (i > 0)
+ appendStringInfoString(&cmd, ", ");
- /*
- * For regular tables, make sure we don't copy data from a child that
- * inherits the named table as those will be copied separately.
- */
- if (lrel.relkind == RELKIND_RELATION)
- appendStringInfoString(&cmd, "ONLY ");
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ }
- appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
- /* list of OR'ed filters */
- if (qual != NIL)
+ appendStringInfoString(&cmd, ") TO STDOUT");
+ }
+ else
{
- ListCell *lc;
- char *q = strVal(linitial(qual));
+ /*
+ * For non-tables and tables with row filters, we need to do COPY
+ * (SELECT ...), but we can't just do SELECT * because we need to not
+ * copy generated columns. For tables with any row filters, build a
+ * SELECT query with OR'ed row filters for COPY.
+ */
+ appendStringInfoString(&cmd, "COPY (SELECT ");
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ if (i < lrel.natts - 1)
+ appendStringInfoString(&cmd, ", ");
+ }
+
+ appendStringInfoString(&cmd, " FROM ");
+
+ /*
+ * For regular tables, make sure we don't copy data from a child that
+ * inherits the named table as those will be copied separately.
+ */
+ if (lrel.relkind == RELKIND_RELATION)
+ appendStringInfoString(&cmd, "ONLY ");
- appendStringInfo(&cmd, " WHERE %s", q);
- for_each_from(lc, qual, 1)
+ appendStringInfoString(&cmd, quoted_name);
+ /* list of OR'ed filters */
+ if (qual != NIL)
{
- q = strVal(lfirst(lc));
- appendStringInfo(&cmd, " OR %s", q);
+ ListCell *lc;
+ char *q = strVal(linitial(qual));
+
+ appendStringInfo(&cmd, " WHERE %s", q);
+ for_each_from(lc, qual, 1)
+ {
+ q = strVal(lfirst(lc));
+ appendStringInfo(&cmd, " OR %s", q);
+ }
}
- list_free_deep(qual);
- }
- appendStringInfoString(&cmd, ") TO STDOUT");
- }
+ appendStringInfoString(&cmd, ") TO STDOUT");
+ }
- /*
- * Prior to v16, initial table synchronization will use text format even
- * if the binary option is enabled for a subscription.
- */
- if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
- MySubscription->binary)
- {
- appendStringInfoString(&cmd, " WITH (FORMAT binary)");
- options = list_make1(makeDefElem("format",
- (Node *) makeString("binary"), -1));
- }
+ /*
+ * Prior to v16, initial table synchronization will use text format even
+ * if the binary option is enabled for a subscription.
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
+ MySubscription->binary)
+ {
+ appendStringInfoString(&cmd, " WITH (FORMAT binary)");
+ options = list_make1(makeDefElem("format",
+ (Node *) makeString("binary"), -1));
+ }
- res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
- pfree(cmd.data);
- if (res->status != WALRCV_OK_COPY_OUT)
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("could not start initial contents copy for table \"%s.%s\": %s",
- lrel.nspname, lrel.relname, res->err)));
- walrcv_clear_result(res);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
+ pfree(cmd.data);
+ if (res->status != WALRCV_OK_COPY_OUT)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not start initial contents copy for table \"%s.%s\" from remote %s: %s",
+ lrel.nspname, lrel.relname, quoted_name, res->err)));
+ walrcv_clear_result(res);
- copybuf = makeStringInfo();
+ copybuf = makeStringInfo();
- pstate = make_parsestate(NULL);
- (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
- NULL, false, false);
+ pstate = make_parsestate(NULL);
+ (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
+ NULL, false, false);
- attnamelist = make_copy_attnamelist(relmapentry);
- cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
+ attnamelist = make_copy_attnamelist(relmapentry);
+ cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
- /* Do the copy */
- (void) CopyFrom(cstate);
+ /* Do the copy */
+ (void) CopyFrom(cstate);
+ }
+
+ /* The same row filters were used for every table in to_copy. */
+ list_free_deep(qual);
logicalrep_rel_close(relmapentry, NoLock);
}
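copy_table() now issues one COPY per entry in to_copy, reusing the root's column list (lrel.attnames) and the same OR'ed row filters for every table, which is why `qual` has to stay valid until the loop finishes. The command construction can be sketched as a standalone function. Everything here is hypothetical: `Buf` is a fixed-size stand-in for StringInfo, `build_copy_command()` is not part of the patch, the table and column names are assumed to be quoted already (as quote_qualified_identifier() and quote_identifier() would do), and the `binary` flag stands in for the server-version and MySubscription->binary check.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical fixed-size string builder standing in for StringInfo. */
typedef struct Buf
{
	char	   *data;
	size_t		size;
	size_t		len;
	bool		overflow;
} Buf;

static void
buf_append(Buf *b, const char *s)
{
	size_t		n = strlen(s);

	if (b->overflow || b->len + n >= b->size)
	{
		b->overflow = true;
		return;
	}
	memcpy(b->data + b->len, s, n + 1);	/* includes the terminator */
	b->len += n;
}

static void
append_columns(Buf *b, const char *const *cols, int ncols)
{
	for (int i = 0; i < ncols; i++)
	{
		if (i > 0)
			buf_append(b, ", ");
		buf_append(b, cols[i]);
	}
}

/*
 * Build the command for one entry of to_copy. Returns false if out[] is too
 * small. quals may be NULL when nquals is 0.
 */
static bool
build_copy_command(char *out, size_t outsize, const char *quoted_name,
				   bool is_plain_table, const char *const *cols, int ncols,
				   const char *const *quals, int nquals, bool binary)
{
	Buf			b = {out, outsize, 0, false};

	assert(out != NULL && outsize > 0);
	out[0] = '\0';

	if (is_plain_table && nquals == 0)
	{
		/* Plain table without row filters: COPY tbl (cols) TO STDOUT */
		buf_append(&b, "COPY ");
		buf_append(&b, quoted_name);
		buf_append(&b, " (");
		append_columns(&b, cols, ncols);
		buf_append(&b, ") TO STDOUT");
	}
	else
	{
		/* Otherwise: COPY (SELECT cols FROM [ONLY] tbl [WHERE ...]) TO STDOUT */
		buf_append(&b, "COPY (SELECT ");
		append_columns(&b, cols, ncols);
		buf_append(&b, " FROM ");
		if (is_plain_table)
			buf_append(&b, "ONLY ");	/* children get their own COPY */
		buf_append(&b, quoted_name);
		for (int i = 0; i < nquals; i++)
		{
			buf_append(&b, i == 0 ? " WHERE " : " OR ");
			buf_append(&b, quals[i]);
		}
		buf_append(&b, ") TO STDOUT");
	}

	if (binary)
		buf_append(&b, " WITH (FORMAT binary)");

	return !b.overflow;
}
```

Calling this once per to_copy entry with the same `cols` and `quals` arrays mirrors the loop above: only the quoted table name changes between iterations.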
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 640676e7dc..7c54b46ac7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1462,7 +1462,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
- Assert(relation->rd_rel->relispartition);
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
targetrel = ancestor;
}
diff --git a/src/include/catalog/pg_inherits.h b/src/include/catalog/pg_inherits.h
index ce154ab943..cfcacacf76 100644
--- a/src/include/catalog/pg_inherits.h
+++ b/src/include/catalog/pg_inherits.h
@@ -46,14 +46,17 @@ typedef FormData_pg_inherits *Form_pg_inherits;
DECLARE_UNIQUE_INDEX_PKEY(pg_inherits_relid_seqno_index, 2680, InheritsRelidSeqnoIndexId, on pg_inherits using btree(inhrelid oid_ops, inhseqno int4_ops));
DECLARE_INDEX(pg_inherits_parent_index, 2187, InheritsParentIndexId, on pg_inherits using btree(inhparent oid_ops));
+DECLARE_INDEX(pg_inherits_parent_seqno_index, 8138, InheritsParentSeqnoIndexId, on pg_inherits using btree(inhparent oid_ops, inhseqno int4_ops));
extern List *find_inheritance_children(Oid parentrelId, LOCKMODE lockmode);
extern List *find_inheritance_children_extended(Oid parentrelId, bool omit_detached,
- LOCKMODE lockmode, bool *detached_exist, TransactionId *detached_xmin);
+ LOCKMODE lockmode, bool *detached_exist, TransactionId *detached_xmin,
+ bool logical_only);
extern List *find_all_inheritors(Oid parentrelId, LOCKMODE lockmode,
List **numparents);
+extern List *find_all_logical_inheritors(Oid parentrelId);
extern bool has_subclass(Oid relationId);
extern bool has_superclass(Oid relationId);
extern bool typeInheritsFrom(Oid subclassTypeId, Oid superclassTypeId);
@@ -63,5 +66,7 @@ extern bool DeleteInheritsTuple(Oid inhrelid, Oid inhparent,
bool expect_detach_pending,
const char *childname);
extern bool PartitionHasPendingDetach(Oid partoid);
+extern List *get_logical_ancestors(Oid relid, bool is_partition);
+extern bool has_logical_parent(Relation inhRel, Oid relid);
#endif /* PG_INHERITS_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index ed67168374..1dfc8fdd1e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11833,6 +11833,13 @@
proname => 'pg_relation_is_publishable', provolatile => 's',
prorettype => 'bool', proargtypes => 'regclass',
prosrc => 'pg_relation_is_publishable' },
+{ oid => '8137',
+ descr => 'get the relations to copy from all specified publications during a table\'s initial sync',
+ proname => 'pg_get_publication_rels_to_sync', prorows => '10',
+ provariadic => 'text', proretset => 't', provolatile => 's',
+ prorettype => 'regclass', proargtypes => 'regclass _text',
+ proargmodes => '{i,v}', proargnames => '{rootid,pubnames}',
+ prosrc => 'pg_get_publication_rels_to_sync' },
{ oid => '8139',
descr => 'get information on how a relation will be published via a list of publications',
proname => 'pg_get_relation_publishing_info', provariadic => 'text',
@@ -11999,6 +12006,11 @@
proname => 'pg_partition_root', prorettype => 'regclass',
proargtypes => 'regclass', prosrc => 'pg_partition_root' },
+{ oid => '8136', descr => 'mark an inheritance parent as a table\'s logical replication root',
+ proname => 'pg_set_logical_root', provolatile => 'v', proparallel => 'u',
+ prorettype => 'void', proargtypes => 'regclass regclass',
+ prosrc => 'pg_set_logical_root' },
+
{ oid => '4350', descr => 'Unicode normalization',
proname => 'normalize', prorettype => 'text', proargtypes => 'text text',
prosrc => 'unicode_normalize_func' },
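The selection rules in pg_get_publication_rels_to_sync() can be modeled without catalog access. In this sketch, which is not the patch's code, `SyncPub`, `oid_in()` and `rels_to_sync()` are hypothetical, `Oid` is a local typedef, and the `inheritors` array is assumed to hold what find_all_logical_inheritors() returns: the root itself followed by its logical descendants. Per publication, a pubviaroot publication contributes every logical inheritor and any other publication contributes only the root; a non-FOR ALL TABLES publication contributes nothing unless it includes the root, and its other candidates are kept only if it lists them. The results are unioned without duplicates.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;		/* local stand-in for the backend's Oid */

/* Hypothetical model of one publication. */
typedef struct SyncPub
{
	bool		alltables;		/* FOR ALL TABLES */
	bool		viaroot;		/* publish_via_partition_root */
	const Oid  *members;		/* published relids, if !alltables */
	size_t		nmembers;
} SyncPub;

static bool
oid_in(const Oid *oids, size_t n, Oid oid)
{
	for (size_t i = 0; i < n; i++)
	{
		if (oids[i] == oid)
			return true;
	}
	return false;
}

/*
 * inheritors[] models find_all_logical_inheritors(rootid). Writes the union
 * of tables to sync into out[] (capacity outcap) and returns the count.
 */
static size_t
rels_to_sync(Oid rootid, const Oid *inheritors, size_t ninheritors,
			 const SyncPub *pubs, size_t npubs, Oid *out, size_t outcap)
{
	size_t		nout = 0;

	for (size_t p = 0; p < npubs; p++)
	{
		const SyncPub *pub = &pubs[p];

		/* Candidates: every logical inheritor with pubviaroot, else the root. */
		const Oid  *cand = pub->viaroot ? inheritors : &rootid;
		size_t		ncand = pub->viaroot ? ninheritors : 1;

		/* A publication that doesn't include the root contributes nothing. */
		if (!pub->alltables && !oid_in(pub->members, pub->nmembers, rootid))
			continue;

		for (size_t i = 0; i < ncand; i++)
		{
			/* Drop descendants this publication doesn't include. */
			if (!pub->alltables &&
				!oid_in(pub->members, pub->nmembers, cand[i]))
				continue;

			/* Union without duplicates, like list_concat_unique_oid(). */
			if (!oid_in(out, nout, cand[i]))
			{
				assert(nout < outcap);
				out[nout++] = cand[i];
			}
		}
	}
	return nout;
}
```

With the OIDs 2 = sch1.ipart1, 3 = sch1.ipart1_a and 4 = sch1.ipart1_a1, a pubviaroot publication listing only 2 and 4 yields {2, 4}, the same pair that published_sync('ipub_part1') reports for sch1.ipart1 in the regression test.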
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index dd8fc99111..9587674edc 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -5,6 +5,17 @@ CREATE ROLE regress_publication_user LOGIN SUPERUSER;
CREATE ROLE regress_publication_user2;
CREATE ROLE regress_publication_user_dummy LOGIN NOSUPERUSER;
SET SESSION AUTHORIZATION 'regress_publication_user';
+CREATE FUNCTION published_sync (VARIADIC pubnames text[])
+ RETURNS TABLE (published regclass, synced regclass) AS $$
+ -- Show each published table alongside the tables to be copied into it during
+ -- initial sync.
+ SELECT t.published, synced
+ FROM (SELECT DISTINCT relid::regclass AS published
+ FROM pg_get_publication_tables(VARIADIC pubnames)) t
+ JOIN LATERAL pg_get_publication_rels_to_sync(t.published, VARIADIC pubnames) synced
+ ON true
+ ORDER BY 1, 2;
+$$ LANGUAGE sql;
CREATE FUNCTION published_stream (VARIADIC pubnames text[])
RETURNS TABLE (pubname text, published regclass, synced regclass) AS $$
-- For each publication, show each published root alongside the tables which
@@ -1697,6 +1708,12 @@ SELECT * FROM pg_publication_tables;
pub | sch1 | tbl1 | {a} |
(1 row)
+SELECT * FROM published_sync('pub');
+ published | synced
+-----------+-----------
+ sch1.tbl1 | sch1.tbl1
+(1 row)
+
SELECT * FROM published_stream('pub');
pubname | published | synced
---------+-----------+-----------------
@@ -1730,6 +1747,12 @@ SELECT * FROM pg_publication_tables;
pub | sch2 | tbl1_part1 | {a} |
(1 row)
+SELECT * FROM published_sync('pub');
+ published | synced
+-----------------+-----------------
+ sch2.tbl1_part1 | sch2.tbl1_part1
+(1 row)
+
SELECT * FROM published_stream('pub');
pubname | published | synced
---------+-----------------+-----------------
@@ -1752,12 +1775,255 @@ SELECT * FROM pg_publication_tables;
pub | sch1 | tbl1 | {a} |
(1 row)
+-- Sanity check cases for pg_set_logical_root().
+CREATE TABLE sch1.iroot (a int);
+CREATE TABLE sch1.ipart1 (a int);
+CREATE TABLE sch1.ipart2 () INHERITS (sch1.iroot);
+-- marking roots between unrelated tables is not allowed
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+ERROR: table "ipart1" does not inherit from intended root table "iroot"
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.tbl1');
+ERROR: table "ipart2" does not inherit from intended root table "tbl1"
+-- establishing an inheritance relationship fixes the problem
+ALTER TABLE sch1.ipart1 INHERIT sch1.iroot;
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+ pg_set_logical_root
+---------------------
+
+(1 row)
+
+-- but multiple inheritance is not allowed
+ALTER TABLE sch1.ipart2 INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+ERROR: table "ipart2" inherits from multiple tables
+ALTER TABLE sch1.ipart2 NO INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+ pg_set_logical_root
+---------------------
+
+(1 row)
+
+-- table ownership must match, like ATTACH PARTITION
+CREATE ROLE regress_test_me;
+CREATE ROLE regress_test_not_me;
+CREATE TABLE root (a int);
+CREATE TABLE part () INHERITS (root);
+ALTER TABLE root OWNER TO regress_test_me;
+ALTER TABLE part OWNER TO regress_test_not_me;
+SET SESSION AUTHORIZATION regress_test_me;
+SELECT pg_set_logical_root('part', 'root'); -- should fail
+ERROR: must be owner of table part
+RESET SESSION AUTHORIZATION;
+ALTER TABLE root OWNER TO regress_test_not_me;
+ALTER TABLE part OWNER TO regress_test_me;
+SET SESSION AUTHORIZATION regress_test_me;
+SELECT pg_set_logical_root('part', 'root'); -- should also fail
+ERROR: must be owner of table root
+RESET SESSION AUTHORIZATION;
+DROP TABLE root, part;
+DROP ROLE regress_test_not_me;
+DROP ROLE regress_test_me;
+-- TODO: make sure existing logical descendant can't be ALTERed [NO] INHERIT
+-- Mixed publication settings for publish_via_partition_root, at different
+-- levels of the inheritance tree, to pin correct behavior in the worst cases.
+CREATE TABLE sch1.ipart1_a () INHERITS (sch1.ipart1);
+CREATE TABLE sch1.ipart1_a1 () INHERITS (sch1.ipart1_a);
+SELECT pg_set_logical_root('sch1.ipart1_a', 'sch1.ipart1');
+ pg_set_logical_root
+---------------------
+
+(1 row)
+
+SELECT pg_set_logical_root('sch1.ipart1_a1', 'sch1.ipart1_a');
+ pg_set_logical_root
+---------------------
+
+(1 row)
+
+CREATE PUBLICATION ipub_root FOR TABLE sch1.iroot;
+CREATE PUBLICATION ipub_part1 FOR TABLE ONLY sch1.ipart1, ONLY sch1.ipart1_a1
+ WITH (publish_via_partition_root);
+CREATE PUBLICATION ipub_other FOR TABLE ONLY sch1.iroot, ONLY sch1.ipart1_a,
+ ONLY sch1.ipart2
+ WITH (publish_via_partition_root);
+-- At this point, the published trees look like this:
+--
+-- ipub_root           ipub_part1 (pubviaroot)  ipub_other (pubviaroot)
+-- ------------------  -----------------------  -----------------------
+-- iroot                                        iroot
+-- +- ipart1           ipart1                   |
+-- |  +- ipart1_a      |                        +--- ipart1_a
+-- |     +- ipart1_a1  +--- ipart1_a1           |
+-- +- ipart2                                    +- ipart2
+-- What a subscription to only ipub_root should see
+SELECT relid::regclass FROM pg_get_publication_tables('ipub_root');
+ relid
+----------------
+ sch1.iroot
+ sch1.ipart1
+ sch1.ipart2
+ sch1.ipart1_a
+ sch1.ipart1_a1
+(5 rows)
+
+SELECT * FROM published_sync('ipub_root');
+ published | synced
+----------------+----------------
+ sch1.iroot | sch1.iroot
+ sch1.ipart1 | sch1.ipart1
+ sch1.ipart2 | sch1.ipart2
+ sch1.ipart1_a | sch1.ipart1_a
+ sch1.ipart1_a1 | sch1.ipart1_a1
+(5 rows)
+
+SELECT * FROM published_stream('ipub_root');
+ pubname | published | synced
+-----------+----------------+----------------
+ ipub_root | sch1.iroot | sch1.iroot
+ ipub_root | sch1.ipart1 | sch1.ipart1
+ ipub_root | sch1.ipart2 | sch1.ipart2
+ ipub_root | sch1.ipart1_a | sch1.ipart1_a
+ ipub_root | sch1.ipart1_a1 | sch1.ipart1_a1
+(5 rows)
+
+-- What a subscription to only ipub_part1 should see
+SELECT relid::regclass FROM pg_get_publication_tables('ipub_part1');
+ relid
+-------------
+ sch1.ipart1
+(1 row)
+
+SELECT * FROM published_sync('ipub_part1');
+ published | synced
+-------------+----------------
+ sch1.ipart1 | sch1.ipart1
+ sch1.ipart1 | sch1.ipart1_a1
+(2 rows)
+
+SELECT * FROM published_stream('ipub_part1');
+ pubname | published | synced
+------------+-------------+----------------
+ ipub_part1 | sch1.ipart1 | sch1.ipart1
+ ipub_part1 | sch1.ipart1 | sch1.ipart1_a1
+(2 rows)
+
+-- What a subscription to both ipub_root and ipub_part1 should see
+SELECT pubname, relid::regclass
+ FROM pg_get_publication_tables('ipub_root', 'ipub_part1') t
+ JOIN pg_publication p ON (p.oid = t.pubid);
+ pubname | relid
+------------+---------------
+ ipub_root | sch1.iroot
+ ipub_root | sch1.ipart1
+ ipub_root | sch1.ipart2
+ ipub_root | sch1.ipart1_a
+ ipub_part1 | sch1.ipart1
+(5 rows)
+
+SELECT * FROM published_sync('ipub_root', 'ipub_part1');
+ published | synced
+---------------+----------------
+ sch1.iroot | sch1.iroot
+ sch1.ipart1 | sch1.ipart1
+ sch1.ipart1 | sch1.ipart1_a1
+ sch1.ipart2 | sch1.ipart2
+ sch1.ipart1_a | sch1.ipart1_a
+(5 rows)
+
+SELECT * FROM published_stream('ipub_root', 'ipub_part1');
+ pubname | published | synced
+------------+---------------+----------------
+ ipub_root | sch1.iroot | sch1.iroot
+ ipub_root | sch1.ipart1 | sch1.ipart1
+ ipub_root | sch1.ipart1 | sch1.ipart1_a1
+ ipub_root | sch1.ipart2 | sch1.ipart2
+ ipub_root | sch1.ipart1_a | sch1.ipart1_a
+ ipub_part1 | sch1.ipart1 | sch1.ipart1
+ ipub_part1 | sch1.ipart1 | sch1.ipart1_a1
+(7 rows)
+
+-- What a subscription to both ipub_part1 and ipub_other should see
+SELECT pubname, relid::regclass
+ FROM pg_get_publication_tables('ipub_part1', 'ipub_other') t
+ JOIN pg_publication p ON (p.oid = t.pubid);
+ pubname | relid
+------------+-------------
+ ipub_part1 | sch1.ipart1
+ ipub_other | sch1.iroot
+(2 rows)
+
+SELECT * FROM published_sync('ipub_part1', 'ipub_other');
+ published | synced
+-------------+----------------
+ sch1.iroot | sch1.iroot
+ sch1.iroot | sch1.ipart2
+ sch1.iroot | sch1.ipart1_a
+ sch1.ipart1 | sch1.ipart1
+ sch1.ipart1 | sch1.ipart1_a1
+(5 rows)
+
+SELECT * FROM published_stream('ipub_part1', 'ipub_other');
+ pubname | published | synced
+------------+-------------+----------------
+ ipub_part1 | sch1.ipart1 | sch1.ipart1
+ ipub_part1 | sch1.ipart1 | sch1.ipart1_a1
+ ipub_other | sch1.iroot | sch1.iroot
+ ipub_other | sch1.iroot | sch1.ipart2
+ ipub_other | sch1.iroot | sch1.ipart1_a
+(5 rows)
+
+-- What a subscription to all three should see
+SELECT pubname, relid::regclass
+ FROM pg_get_publication_tables('ipub_root', 'ipub_part1', 'ipub_other') t
+ JOIN pg_publication p ON (p.oid = t.pubid);
+ pubname | relid
+------------+-------------
+ ipub_root | sch1.iroot
+ ipub_root | sch1.ipart1
+ ipub_part1 | sch1.ipart1
+ ipub_other | sch1.iroot
+(4 rows)
+
+SELECT * FROM published_sync('ipub_root', 'ipub_part1', 'ipub_other');
+ published | synced
+-------------+----------------
+ sch1.iroot | sch1.iroot
+ sch1.iroot | sch1.ipart2
+ sch1.iroot | sch1.ipart1_a
+ sch1.ipart1 | sch1.ipart1
+ sch1.ipart1 | sch1.ipart1_a1
+(5 rows)
+
+SELECT * FROM published_stream('ipub_root', 'ipub_part1', 'ipub_other');
+ pubname | published | synced
+------------+-------------+----------------
+ ipub_root | sch1.iroot | sch1.iroot
+ ipub_root | sch1.iroot | sch1.ipart2
+ ipub_root | sch1.iroot | sch1.ipart1_a
+ ipub_root | sch1.ipart1 | sch1.ipart1
+ ipub_root | sch1.ipart1 | sch1.ipart1_a1
+ ipub_part1 | sch1.ipart1 | sch1.ipart1
+ ipub_part1 | sch1.ipart1 | sch1.ipart1_a1
+ ipub_other | sch1.iroot | sch1.iroot
+ ipub_other | sch1.iroot | sch1.ipart2
+ ipub_other | sch1.iroot | sch1.ipart1_a
+(10 rows)
+
+DROP PUBLICATION ipub_other;
+DROP PUBLICATION ipub_part1;
+DROP PUBLICATION ipub_root;
RESET client_min_messages;
DROP PUBLICATION pub;
DROP TABLE sch1.tbl1;
+DROP TABLE sch1.ipart1_a1;
+DROP TABLE sch1.ipart1_a;
+DROP TABLE sch1.ipart1;
+DROP TABLE sch1.ipart2;
+DROP TABLE sch1.iroot;
DROP SCHEMA sch1 cascade;
DROP SCHEMA sch2 cascade;
DROP FUNCTION published_stream;
+DROP FUNCTION published_sync;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_publication_user, regress_publication_user2;
DROP ROLE regress_publication_user_dummy;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 6bf9554d48..a498c53862 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -6,6 +6,18 @@ CREATE ROLE regress_publication_user2;
CREATE ROLE regress_publication_user_dummy LOGIN NOSUPERUSER;
SET SESSION AUTHORIZATION 'regress_publication_user';
+CREATE FUNCTION published_sync (VARIADIC pubnames text[])
+ RETURNS TABLE (published regclass, synced regclass) AS $$
+ -- Show each published table alongside the tables to be copied into it during
+ -- initial sync.
+ SELECT t.published, synced
+ FROM (SELECT DISTINCT relid::regclass AS published
+ FROM pg_get_publication_tables(VARIADIC pubnames)) t
+ JOIN LATERAL pg_get_publication_rels_to_sync(t.published, VARIADIC pubnames) synced
+ ON true
+ ORDER BY 1, 2;
+$$ LANGUAGE sql;
+
CREATE FUNCTION published_stream (VARIADIC pubnames text[])
RETURNS TABLE (pubname text, published regclass, synced regclass) AS $$
-- For each publication, show each published root alongside the tables which
@@ -1076,6 +1088,7 @@ SELECT * FROM pg_publication_tables;
-- Table publication that includes both the parent table and the child table
ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
SELECT * FROM pg_publication_tables;
+SELECT * FROM published_sync('pub');
SELECT * FROM published_stream('pub');
DROP PUBLICATION pub;
@@ -1091,6 +1104,7 @@ SELECT * FROM pg_publication_tables;
-- Table publication that includes both the parent table and the child table
ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
SELECT * FROM pg_publication_tables;
+SELECT * FROM published_sync('pub');
SELECT * FROM published_stream('pub');
DROP PUBLICATION pub;
@@ -1105,12 +1119,119 @@ ALTER TABLE sch1.tbl1 ATTACH PARTITION sch1.tbl1_part3 FOR VALUES FROM (20) to (
CREATE PUBLICATION pub FOR TABLES IN SCHEMA sch1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
SELECT * FROM pg_publication_tables;
+-- Sanity check cases for pg_set_logical_root().
+CREATE TABLE sch1.iroot (a int);
+CREATE TABLE sch1.ipart1 (a int);
+CREATE TABLE sch1.ipart2 () INHERITS (sch1.iroot);
+
+-- marking roots between unrelated tables is not allowed
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.tbl1');
+
+-- establishing an inheritance relationship fixes the problem
+ALTER TABLE sch1.ipart1 INHERIT sch1.iroot;
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+
+-- but multiple inheritance is not allowed
+ALTER TABLE sch1.ipart2 INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+
+ALTER TABLE sch1.ipart2 NO INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+
+-- table ownership must match, like ATTACH PARTITION
+CREATE ROLE regress_test_me;
+CREATE ROLE regress_test_not_me;
+CREATE TABLE root (a int);
+CREATE TABLE part () INHERITS (root);
+ALTER TABLE root OWNER TO regress_test_me;
+ALTER TABLE part OWNER TO regress_test_not_me;
+SET SESSION AUTHORIZATION regress_test_me;
+SELECT pg_set_logical_root('part', 'root'); -- should fail
+RESET SESSION AUTHORIZATION;
+ALTER TABLE root OWNER TO regress_test_not_me;
+ALTER TABLE part OWNER TO regress_test_me;
+SET SESSION AUTHORIZATION regress_test_me;
+SELECT pg_set_logical_root('part', 'root'); -- should also fail
+RESET SESSION AUTHORIZATION;
+DROP TABLE root, part;
+DROP ROLE regress_test_not_me;
+DROP ROLE regress_test_me;
+
+-- TODO: make sure existing logical descendant can't be ALTERed [NO] INHERIT
+
+-- Mixed publication settings for publish_via_partition_root, at different
+-- levels of the inheritance tree, to pin correct behavior in the worst cases.
+CREATE TABLE sch1.ipart1_a () INHERITS (sch1.ipart1);
+CREATE TABLE sch1.ipart1_a1 () INHERITS (sch1.ipart1_a);
+
+SELECT pg_set_logical_root('sch1.ipart1_a', 'sch1.ipart1');
+SELECT pg_set_logical_root('sch1.ipart1_a1', 'sch1.ipart1_a');
+
+CREATE PUBLICATION ipub_root FOR TABLE sch1.iroot;
+CREATE PUBLICATION ipub_part1 FOR TABLE ONLY sch1.ipart1, ONLY sch1.ipart1_a1
+ WITH (publish_via_partition_root);
+CREATE PUBLICATION ipub_other FOR TABLE ONLY sch1.iroot, ONLY sch1.ipart1_a,
+ ONLY sch1.ipart2
+ WITH (publish_via_partition_root);
+
+-- At this point, the published trees look like this:
+--
+-- ipub_root           ipub_part1 (pubviaroot)  ipub_other (pubviaroot)
+-- ------------------  -----------------------  -----------------------
+-- iroot                                        iroot
+-- +- ipart1           ipart1                   |
+-- |  +- ipart1_a      |                        +--- ipart1_a
+-- |     +- ipart1_a1  +--- ipart1_a1           |
+-- +- ipart2                                    +- ipart2
+
+-- What a subscription to only ipub_root should see
+SELECT relid::regclass FROM pg_get_publication_tables('ipub_root');
+SELECT * FROM published_sync('ipub_root');
+SELECT * FROM published_stream('ipub_root');
+
+-- What a subscription to only ipub_part1 should see
+SELECT relid::regclass FROM pg_get_publication_tables('ipub_part1');
+SELECT * FROM published_sync('ipub_part1');
+SELECT * FROM published_stream('ipub_part1');
+
+-- What a subscription to both ipub_root and ipub_part1 should see
+SELECT pubname, relid::regclass
+ FROM pg_get_publication_tables('ipub_root', 'ipub_part1') t
+ JOIN pg_publication p ON (p.oid = t.pubid);
+SELECT * FROM published_sync('ipub_root', 'ipub_part1');
+SELECT * FROM published_stream('ipub_root', 'ipub_part1');
+
+-- What a subscription to both ipub_part1 and ipub_other should see
+SELECT pubname, relid::regclass
+ FROM pg_get_publication_tables('ipub_part1', 'ipub_other') t
+ JOIN pg_publication p ON (p.oid = t.pubid);
+SELECT * FROM published_sync('ipub_part1', 'ipub_other');
+SELECT * FROM published_stream('ipub_part1', 'ipub_other');
+
+-- What a subscription to all three should see
+SELECT pubname, relid::regclass
+ FROM pg_get_publication_tables('ipub_root', 'ipub_part1', 'ipub_other') t
+ JOIN pg_publication p ON (p.oid = t.pubid);
+SELECT * FROM published_sync('ipub_root', 'ipub_part1', 'ipub_other');
+SELECT * FROM published_stream('ipub_root', 'ipub_part1', 'ipub_other');
+
+DROP PUBLICATION ipub_other;
+DROP PUBLICATION ipub_part1;
+DROP PUBLICATION ipub_root;
+
RESET client_min_messages;
DROP PUBLICATION pub;
DROP TABLE sch1.tbl1;
+DROP TABLE sch1.ipart1_a1;
+DROP TABLE sch1.ipart1_a;
+DROP TABLE sch1.ipart1;
+DROP TABLE sch1.ipart2;
+DROP TABLE sch1.iroot;
DROP SCHEMA sch1 cascade;
DROP SCHEMA sch2 cascade;
DROP FUNCTION published_stream;
+DROP FUNCTION published_sync;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_publication_user, regress_publication_user2;
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 275fb3b525..bc40c68030 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -388,6 +388,59 @@ $node_subscriber1->append_conf('postgresql.conf',
"log_min_messages = warning");
$node_subscriber1->reload;
+# Make sure standard inheritance setups aren't broken by the new
+# pg_set_logical_root() handling.
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1_1 (LIKE itab1)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1_2 (LIKE itab1)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1_1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1_2 (CHECK (a = 2)) INHERITS (itab1)");
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab1_1', 'itab1')");
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab1_2', 'itab1')");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (0, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1_1 VALUES (1, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1_2 VALUES (2, 'itab1')");
+
+# Regression test: subscribing to a publication for an unrelated table (tab1)
+# with publish_via_partition_root set must have no effect on itab1, which is
+# not part of that publication.
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION dummy_pub FOR TABLE tab1 WITH (publish_via_partition_root = true)");
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 ADD PUBLICATION dummy_pub");
+$node_subscriber2->wait_for_subscription_sync;
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1");
+is($result, qq(0|itab1), 'initial data synced for itab1 on subscriber 2');
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1_1");
+is($result, qq(1|itab1), 'initial data synced for itab1_1 on subscriber 2');
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1_2");
+is($result, qq(2|itab1), 'initial data synced for itab1_2 on subscriber 2');
+
+$node_publisher->safe_psql('postgres',
+ "DROP TABLE itab1 CASCADE;");
+$node_subscriber2->safe_psql('postgres',
+ "DROP TABLE itab1, itab1_1, itab1_2;");
+
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 DROP PUBLICATION dummy_pub");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION dummy_pub");
+
# Tests for replication using root table identity and schema
# publisher
@@ -886,4 +939,263 @@ $result = $node_subscriber2->safe_psql('postgres',
"SELECT a, b, c FROM tab5_1 ORDER BY 1");
is($result, qq(4||1), 'updates of tab5 replicated correctly');
+# Test that replication works for older inheritance/trigger setups as well.
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1_1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab1_2 (CHECK (a = 2)) INHERITS (itab1)");
+
+$node_publisher->safe_psql('postgres', "
+ CREATE OR REPLACE FUNCTION itab1_trigger()
+ RETURNS TRIGGER AS \$\$
+ BEGIN
+ IF ( NEW.a = 1 ) THEN INSERT INTO itab1_1 VALUES (NEW.*);
+ ELSIF ( NEW.a = 2 ) THEN INSERT INTO itab1_2 VALUES (NEW.*);
+ ELSE RETURN NEW;
+ END IF;
+ RETURN NULL;
+ END;
+ \$\$
+ LANGUAGE plpgsql;");
+$node_publisher->safe_psql('postgres', "
+ CREATE TRIGGER itab1_trigger
+ BEFORE INSERT ON itab1
+ FOR EACH ROW EXECUTE FUNCTION itab1_trigger();");
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab1_1', 'itab1')");
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab1_2', 'itab1')");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab2 (a int, b text)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab2_1 (CHECK (a = 1)) INHERITS (itab2)");
+
+$node_publisher->safe_psql('postgres', "
+ CREATE OR REPLACE FUNCTION itab2_trigger()
+ RETURNS TRIGGER AS \$\$
+ BEGIN
+ IF ( NEW.a = 1 ) THEN INSERT INTO itab2_1 VALUES (NEW.*);
+ ELSE RETURN NEW;
+ END IF;
+ RETURN NULL;
+ END;
+ \$\$
+ LANGUAGE plpgsql;");
+$node_publisher->safe_psql('postgres', "
+ CREATE TRIGGER itab2_trigger
+ BEFORE INSERT ON itab2
+ FOR EACH ROW EXECUTE FUNCTION itab2_trigger();");
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab2_1', 'itab2')");
+
+# itab2_1 should be published using its own identity here, since its parent is
+# not included. itab1_1 should be published via its parent, itab1, without
+# duplicating the rows.
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub_viaroot ADD TABLE itab1, itab1_1, itab2_1");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (0, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (1, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (2, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (0, 'itab2')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (1, 'itab2')");
+
+# Subscriber 1 has only some of the partitions and no routing triggers, so we
+# can check that each row lands in the table it was published through.
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE itab1_1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE itab2_1 (a int, b text)");
+
+# Subscriber 2 has different partition names for itab1, and it doesn't partition
+# itab2 at all.
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1 (a int, b text)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1_part1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab1_part2 (CHECK (a = 2)) INHERITS (itab1)");
+
+$node_subscriber2->safe_psql('postgres', "
+ CREATE OR REPLACE FUNCTION itab_trigger()
+ RETURNS TRIGGER AS \$\$
+ BEGIN
+ IF ( NEW.a = 1 ) THEN INSERT INTO public.itab1_part1 VALUES (NEW.*);
+ ELSIF ( NEW.a = 2 ) THEN INSERT INTO public.itab1_part2 VALUES (NEW.*);
+ ELSE RETURN NEW;
+ END IF;
+ RETURN NULL;
+ END;
+ \$\$
+ LANGUAGE plpgsql;");
+$node_subscriber2->safe_psql('postgres', "
+ CREATE TRIGGER itab_trigger
+ BEFORE INSERT ON itab1
+ FOR EACH ROW EXECUTE FUNCTION itab_trigger();");
+$node_subscriber2->safe_psql('postgres', "
+ ALTER TABLE itab1 ENABLE ALWAYS TRIGGER itab_trigger;");
+
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab2 (a int, b text)");
+
+$node_subscriber1->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub_viaroot REFRESH PUBLICATION");
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+$node_subscriber1->wait_for_subscription_sync;
+$node_subscriber2->wait_for_subscription_sync;
+
+# check that data is synced correctly
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data synced for itab1 on subscriber 1');
+
+# all of the data should have been routed to itab1 directly (there are no
+# triggers on subscriber 1 to move it elsewhere)
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data correctly routed for itab1 on subscriber 1');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM itab2_1 ORDER BY 1, 2");
+is($result, qq(1|itab2), 'initial data synced for itab2_1 on subscriber 1');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data synced for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1), 'initial data correctly routed for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab2 ORDER BY 1, 2");
+is($result, qq(0|itab2
+1|itab2), 'initial data synced for itab2 on subscriber 2');
+
+# make sure new data is also correctly routed to the roots
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (1, 'itab1-new')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (1, 'itab2-new')");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+1|itab1-new
+2|itab1), 'new data routed for itab1 on subscriber 1');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a, b FROM itab2_1 ORDER BY 1, 2");
+is($result, qq(1|itab2
+1|itab2-new), 'new data routed for itab2_1 on subscriber 1');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+1|itab1-new
+2|itab1), 'new data routed for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab1_part1 ORDER BY 1, 2");
+is($result, qq(1|itab1
+1|itab1-new), 'new data moved to itab1_part1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM itab2 ORDER BY 1, 2");
+is($result, qq(0|itab2
+1|itab2
+1|itab2-new), 'new data routed for itab2 on subscriber 2');
+
+# Finally, check the effect of mixed publish_via_partition_root settings on
+# logical roots.
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab3 (a int, b text)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab3_1 () INHERITS (itab3)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE itab3_1_1 () INHERITS (itab3_1)");
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab3_1', 'itab3')");
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_set_logical_root('itab3_1_1', 'itab3_1')");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO itab3 VALUES (1, 'itab3')");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO itab3_1 VALUES (2, 'itab3_1')");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO itab3_1_1 VALUES (3, 'itab3_1_1')");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION ipub3 FOR TABLE itab3 *");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION ipub3_1 FOR TABLE itab3_1 * WITH (publish_via_partition_root = true)");
+
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab3 (a int, b text)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab3_1 () INHERITS (itab3)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE itab3_1_1 () INHERITS (itab3_1)");
+
+$node_subscriber2->safe_psql('postgres',
+ "CREATE SUBSCRIPTION mixed CONNECTION '$publisher_connstr' PUBLICATION ipub3, ipub3_1");
+$node_subscriber2->wait_for_subscription_sync;
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab3");
+is($result, qq(1|itab3), 'initial data routed for itab3 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab3_1 ORDER BY 1, 2");
+is($result, qq(2|itab3_1
+3|itab3_1_1), 'initial data routed for itab3_1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab3_1_1");
+is($result, qq(), 'initial data routed for itab3_1_1 on subscriber 2');
+
+# make sure new data is also correctly routed to the roots
+$node_publisher->safe_psql('postgres', "INSERT INTO itab3 VALUES (4, 'itab3-new')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab3_1 VALUES (4, 'itab3_1-new')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab3_1_1 VALUES (4, 'itab3_1_1-new')");
+
+$node_publisher->wait_for_catchup('mixed');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab3 ORDER BY 1, 2");
+is($result, qq(1|itab3
+4|itab3-new), 'new data routed for itab3 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab3_1 ORDER BY 1, 2");
+is($result, qq(2|itab3_1
+3|itab3_1_1
+4|itab3_1-new
+4|itab3_1_1-new), 'new data routed for itab3_1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM ONLY itab3_1_1");
+is($result, qq(), 'new data routed for itab3_1_1 on subscriber 2');
+
done_testing();
--
2.25.1
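For readers working through the mixed publish_via_partition_root expectations, here is a small Python model that reproduces the published_sync() results for the sch1 tree in the regression test. It is a hypothetical sketch inferred from those expected outputs, not the logic inside pg_get_publication_rels_to_sync(). The names PARENT, PUBS, publish_as() and rels_to_sync() are illustrative only, and the tie-breaking rule for disagreeing pubviaroot publications is an assumption that the tests don't exercise.

```python
# Hypothetical model of the publish-through rules pinned down by the
# published_sync() expectations. Illustrative names only; this is not the
# patch's C implementation.

# Logical roots as set by pg_set_logical_root(): child -> parent.
PARENT = {
    "ipart1": "iroot",
    "ipart2": "iroot",
    "ipart1_a": "ipart1",
    "ipart1_a1": "ipart1_a",
}

# Publication -> (tables it includes, publish_via_partition_root).
# ipub_root uses FOR TABLE without ONLY, so it covers the whole tree.
PUBS = {
    "ipub_root": ({"iroot", "ipart1", "ipart2", "ipart1_a", "ipart1_a1"}, False),
    "ipub_part1": ({"ipart1", "ipart1_a1"}, True),
    "ipub_other": ({"iroot", "ipart1_a", "ipart2"}, True),
}


def ancestors(rel):
    """Yield the logical ancestors of rel, nearest first."""
    while rel in PARENT:
        rel = PARENT[rel]
        yield rel


def depth(rel):
    """Number of logical ancestors above rel (0 for a root)."""
    return sum(1 for _ in ancestors(rel))


def publish_as(rel, pubnames):
    """Return the relation whose identity is used to publish rel."""
    candidates = []
    for name in pubnames:
        tables, via_root = PUBS[name]
        if not via_root or rel not in tables:
            continue
        # Highest ancestor that the same publication also includes;
        # unpublished intermediate tables are skipped over.
        top = rel
        for anc in ancestors(rel):
            if anc in tables:
                top = anc
        candidates.append(top)
    if not candidates:
        # Only non-pubviaroot publications include rel: publish it as itself.
        return rel
    # Assumption: if pubviaroot publications disagree, prefer the candidate
    # closest to the root. The regression tests never exercise this case.
    return min(candidates, key=depth)


def rels_to_sync(pubnames):
    """Map each published relation to the sorted tables copied into it."""
    included = set().union(*(PUBS[name][0] for name in pubnames))
    result = {}
    for rel in included:
        result.setdefault(publish_as(rel, pubnames), set()).add(rel)
    return {target: sorted(rels) for target, rels in result.items()}
```

For example, rels_to_sync(["ipub_part1", "ipub_other"]) maps iroot to ipart1_a, ipart2 and iroot, and maps ipart1 to ipart1 and ipart1_a1. That matches the published_sync('ipub_part1', 'ipub_other') rows, apart from ordering (the SQL helper sorts by OID, the model by name).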