On Thu, Oct 28, 2021 at 8:12 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, Oct 25, 2021 at 3:09 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > On Mon, Oct 25, 2021 at 1:11 PM vignesh C <vignes...@gmail.com> wrote:
> > >
> > > I have fixed this in the v47 version attached.
> > >
> >
> > Thanks, the first patch in the series "Allow publishing the tables of
> > schema." looks good to me. Unless there are more
> > comments/bugs/objections, I am planning to commit it in a day or so.
> >
>
> Yesterday, I have pushed the first patch. Feel free to submit the
> remaining patches.

Thanks for committing the patch. Please find the remaining patches attached.
Thanks to Hou Zhijie and Greg Nancarrow for sharing a few comments
offline; I have addressed them in the attached patches.

Regards,
Vignesh
From fa0fe25d1143575bdf31489bc8d1c88aa1ea362b Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Thu, 21 Oct 2021 14:25:24 +0530
Subject: [PATCH v48 1/2] Add tap tests for the schema publication feature of
 logical replication

Add TAP tests that cover logical replication of publications created with FOR ALL TABLES IN SCHEMA.

Author: Vignesh C, Tang Haiying
Reviewed-by: Amit Kapila, Hou Zhijie, Greg Nancarrow, Masahiko Sawada, Peter Eisentraut, Tom Lane, Peter Smith, Ajin Cherian, Rahila Syed, Bharath Rupireddy, Mark Dilger
Tested-by: Tang Haiying
Discussion: https://www.postgresql.org/message-id/CALDaNm0OANxuJ6RXqwZsM1MSY4s19nuH3734j4a72etDwvBETQ%40mail.gmail.com
---
 .../t/025_rep_changes_for_schema.pl           | 205 ++++++++++++++++++
 1 file changed, 205 insertions(+)
 create mode 100644 src/test/subscription/t/025_rep_changes_for_schema.pl

diff --git a/src/test/subscription/t/025_rep_changes_for_schema.pl b/src/test/subscription/t/025_rep_changes_for_schema.pl
new file mode 100644
index 0000000000..8f14da06a7
--- /dev/null
+++ b/src/test/subscription/t/025_rep_changes_for_schema.pl
@@ -0,0 +1,205 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Logical replication tests for schema publications
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 13;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Test replication with publications created using FOR ALL TABLES IN SCHEMA.
+# Create schemas and tables on publisher; the partitioned table is in sch1
+# while its partitions are in public.
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE sch1.tab1 AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE sch1.tab2 AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE sch1.tab1_parent (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE public.tab1_child1 PARTITION OF sch1.tab1_parent FOR VALUES IN (1, 2, 3)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE public.tab1_child2 PARTITION OF sch1.tab1_parent FOR VALUES IN (4, 5, 6)");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO sch1.tab1_parent values (1),(4)");
+
+# Create schemas and tables on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.tab1 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.tab2 (a int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE sch1.tab1_parent (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE public.tab1_child1 PARTITION OF sch1.tab1_parent FOR VALUES IN (1, 2, 3)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE public.tab1_child2 PARTITION OF sch1.tab1_parent FOR VALUES IN (4, 5, 6)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_schema FOR ALL TABLES IN SCHEMA sch1");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema"
+);
+
+$node_publisher->wait_for_catchup('tap_sub_schema');
+
+# Also wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the schema table data is synced up
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.tab1");
+is($result, qq(10|1|10), 'check initial data sync of sch1.tab1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.tab2");
+is($result, qq(10|1|10), 'check initial data sync of sch1.tab2');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM sch1.tab1_parent order by 1");
+is($result, qq(1|
+4|), 'check initial data sync of partitioned table sch1.tab1_parent');
+
+# Insert some data into few tables and verify that inserted data is replicated
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO sch1.tab1 VALUES(generate_series(11,20))");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO sch1.tab1_parent values (2),(5)");
+
+$node_publisher->wait_for_catchup('tap_sub_schema');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.tab1");
+is($result, qq(20|1|20), 'check replicated inserts into sch1.tab1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM sch1.tab1_parent order by 1");
+is($result, qq(1|
+2|
+4|
+5|), 'check replicated inserts into partitioned table sch1.tab1_parent');
+
+# Create new table in the publication schema, verify that subscriber does not get
+# the new table data before refresh.
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE sch1.tab3 AS SELECT generate_series(1,10) AS a");
+
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.tab3(a int)");
+
+$node_publisher->wait_for_catchup('tap_sub_schema');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM sch1.tab3");
+is($result, qq(0), 'check new table is not synced before refresh');
+
+# Table data should be reflected after refreshing the publication in
+# subscriber.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
+
+# Wait for sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher->safe_psql('postgres', "INSERT INTO sch1.tab3 VALUES(11)");
+
+$node_publisher->wait_for_catchup('tap_sub_schema');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.tab3");
+is($result, qq(11|1|11), 'check new table is synced and replicated after refresh');
+
+# Set the schema of a publication schema table to a non publication schema and
+# verify that inserted data is not reflected by the subscriber.
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE sch1.tab3 SET SCHEMA public");
+$node_publisher->safe_psql('postgres', "INSERT INTO public.tab3 VALUES(12)");
+
+$node_publisher->wait_for_catchup('tap_sub_schema');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.tab3");
+is($result, qq(11|1|11), 'check inserts are not replicated after table leaves published schema');
+
+# The moved table remains in pg_subscription_rel until the next refresh
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
+);
+is($result, qq(5),
+	'check moved table is still in pg_subscription_rel before refresh');
+
+# Refresh the subscription so that the moved table is removed from it
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
+
+# Wait for sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
+);
+is($result, qq(4),
+	'check moved table is removed from pg_subscription_rel after refresh');
+
+# Drop table from the publication schema, verify that subscriber removes the
+# table entry after refresh.
+$node_publisher->safe_psql('postgres', "DROP TABLE sch1.tab2");
+$node_publisher->wait_for_catchup('tap_sub_schema');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
+);
+is($result, qq(4),
+	'check dropped table is still in pg_subscription_rel before refresh');
+
+# Table should be removed from pg_subscription_rel after refreshing the
+# publication in subscriber.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
+
+# Wait for sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
+);
+is($result, qq(3),
+	'check dropped table is removed from pg_subscription_rel after refresh');
+
+# Drop schema from publication, verify that the inserts are not published after
+# dropping the schema from publication. Here 2nd insert should not be
+# published.
+$node_publisher->safe_psql('postgres', "
+	INSERT INTO sch1.tab1 VALUES(21);
+	ALTER PUBLICATION tap_pub_schema DROP ALL TABLES IN SCHEMA sch1;
+	INSERT INTO sch1.tab1 values(22);"
+);
+
+$node_publisher->wait_for_catchup('tap_sub_schema');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.tab1");
+is($result, qq(21|1|21), 'check inserts are not replicated after schema is dropped from publication');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.30.2

From 9d07432ddff9f2259f9a2b945fcdf5e3aaa03f71 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Tue, 31 Aug 2021 18:25:11 +0530
Subject: [PATCH v48 2/2] Add new "pg_publication_objects" view to display
 "TABLE"/"SCHEMA" publication objects

A new "pg_publication_objects" view is added, to display table/schema object
information associated with publications.

Author: Vignesh C
Reviewed-by: Amit Kapila, Hou Zhijie, Greg Nancarrow, Masahiko Sawada, Peter Eisentraut, Tom Lane, Peter Smith, Ajin Cherian, Rahila Syed, Bharath Rupireddy, Mark Dilger
Tested-by: Tang Haiying
Discussion: https://www.postgresql.org/message-id/CALDaNm0OANxuJ6RXqwZsM1MSY4s19nuH3734j4a72etDwvBETQ%40mail.gmail.com
---
 doc/src/sgml/catalogs.sgml           | 70 ++++++++++++++++++++++++++++
 src/backend/catalog/system_views.sql | 19 ++++++++
 src/test/regress/expected/rules.out  | 15 ++++++
 3 files changed, 104 insertions(+)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1d11be73f..45fb28c14a 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9503,6 +9503,11 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       <entry>publications and their associated tables</entry>
      </row>
 
+     <row>
+      <entry><link linkend="view-pg-publication-objects"><structname>pg_publication_objects</structname></link></entry>
+      <entry>publications and their associated objects</entry>
+     </row>
+
      <row>
       <entry><link linkend="view-pg-replication-origin-status"><structname>pg_replication_origin_status</structname></link></entry>
       <entry>information about replication origins, including replication progress</entry>
@@ -11333,6 +11338,71 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
  </sect1>
 
+ <sect1 id="view-pg-publication-objects">
+  <title><structname>pg_publication_objects</structname></title>
+
+  <indexterm zone="view-pg-publication-objects">
+   <primary>pg_publication_objects</primary>
+  </indexterm>
+
+  <para>
+   The view <structname>pg_publication_objects</structname> provides
+   information about the mapping between publications and the objects they
+   contain.  It combines the catalogs
+   <link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>
+   and <structname>pg_publication_namespace</structname>: there is one row
+   for each table added with <literal>FOR TABLE</literal> and one row for
+   each schema added with <literal>FOR ALL TABLES IN SCHEMA</literal>.
+  </para>
+
+  <table>
+   <title><structname>pg_publication_objects</structname> Columns</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>pubname</structfield> <type>name</type>
+       (references <link linkend="catalog-pg-publication"><structname>pg_publication</structname></link>.<structfield>pubname</structfield>)
+      </para>
+      <para>
+       Name of publication
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>objname</structfield> <type>text</type>
+       (references <link linkend="catalog-pg-namespace"><structname>pg_namespace</structname></link>.<structfield>nspname</structfield> or <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>relname</structfield>)
+      </para>
+      <para>
+       Name of the schema, or schema-qualified name of the table
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>objtype</structfield> <type>text</type>
+      </para>
+      <para>
+       The object type: <literal>schema</literal> or <literal>table</literal>
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+ </sect1>
+
  <sect1 id="view-pg-publication-tables">
   <title><structname>pg_publication_tables</structname></title>
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index eb560955cd..9be1ea8487 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -362,6 +362,25 @@ CREATE VIEW pg_stats_ext_exprs WITH (security_barrier) AS
 -- unprivileged users may read pg_statistic_ext but not pg_statistic_ext_data
 REVOKE ALL ON pg_statistic_ext_data FROM public;
 
+CREATE VIEW pg_publication_objects AS
+    SELECT  -- schemas attached to a publication (pg_publication_namespace)
+        P.pubname,
+        N.nspname AS objname,
+        'schema'::text AS objtype
+    FROM pg_catalog.pg_publication P
+         JOIN pg_catalog.pg_publication_namespace S ON P.oid = S.pnpubid
+         JOIN pg_catalog.pg_namespace N ON N.oid = S.pnnspid
+    UNION
+    SELECT  -- tables attached to a publication (pg_publication_rel)
+        P.pubname,
+        quote_ident(N.nspname) || '.' || quote_ident(C.relname) AS objname,
+        'table'::text AS objtype
+    FROM pg_catalog.pg_publication P
+         JOIN pg_catalog.pg_publication_rel R ON P.oid = R.prpubid
+         JOIN pg_catalog.pg_class C ON C.oid = R.prrelid
+         JOIN pg_catalog.pg_namespace N ON N.oid = C.relnamespace
+    ORDER BY pubname;
+
 CREATE VIEW pg_publication_tables AS
     SELECT
         P.pubname AS pubname,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 2fa00a3c29..8796f71de2 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,6 +1451,21 @@ pg_prepared_xacts| SELECT p.transaction,
    FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
      LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
      LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_publication_objects| SELECT p.pubname,
+    n.nspname AS objname,
+    'schema'::text AS objtype
+   FROM ((pg_publication p
+     JOIN pg_publication_namespace s ON ((p.oid = s.pnpubid)))
+     JOIN pg_namespace n ON ((n.oid = s.pnnspid)))
+UNION
+ SELECT p.pubname,
+    ((quote_ident((n.nspname)::text) || '.'::text) || quote_ident((c.relname)::text)) AS objname,
+    'table'::text AS objtype
+   FROM (((pg_publication p
+     JOIN pg_publication_rel r ON ((p.oid = r.prpubid)))
+     JOIN pg_class c ON ((c.oid = r.prrelid)))
+     JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
+  ORDER BY 1;
 pg_publication_tables| SELECT p.pubname,
     n.nspname AS schemaname,
     c.relname AS tablename
-- 
2.30.2

Reply via email to