Further review (based on 20220310 patch):
doc/src/sgml/ref/create_publication.sgml | 3 +
For the clauses added to the synopsis, descriptions should be added
below. See attached patch for a start.
src/backend/commands/sequence.c | 79 ++
There is quite a bit of overlap between ResetSequence() and
ResetSequence2(), but I couldn't see a good way to combine them that
genuinely saves code and complexity. So maybe it's ok.
Actually, ResetSequence2() is not really "reset", it's just "set".
Maybe pick a different function name.
src/backend/commands/subscriptioncmds.c | 272 +++++++
The code added in AlterSubscription_refresh() seems to be entirely
copy-and-paste from the tables case. I think this could be combined
by concatenating the lists from fetch_table_list() and
fetch_sequence_list() and looping over it once. The same also applies
to CreateSubscription(), although the code duplication is smaller
there.
This in turn means that fetch_table_list() and fetch_sequence_list()
can be combined, so that you don't actually need any extensive new
code in CreateSubscription() and AlterSubscription_refresh() for
sequences. This could go on: you can combine more of the underlying
code, like pg_publication_tables and pg_publication_sequences and so
on.
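
To illustrate the "concatenate and loop once" idea, here is a rough
standalone sketch. It is not PostgreSQL code: RemoteRelSketch,
append_rels() and process_rels() are made-up names, and plain arrays
stand in for the List API. The only point is that once each entry
carries its kind, a single loop can handle both tables and sequences.

```c
#include <stddef.h>

/* Same character values as PostgreSQL's RELKIND_RELATION / RELKIND_SEQUENCE. */
#define SKETCH_RELKIND_RELATION 'r'
#define SKETCH_RELKIND_SEQUENCE 'S'

/* Hypothetical stand-in for one entry returned by the publisher. */
typedef struct RemoteRelSketch
{
    const char *nspname;
    const char *relname;
    char        relkind;
} RemoteRelSketch;

/*
 * Append src[0..nsrc) to dst, tracking the fill level in *ndst.
 * Returns 0 on success, -1 if dst does not have enough room.
 */
static int
append_rels(RemoteRelSketch *dst, size_t cap, size_t *ndst,
            const RemoteRelSketch *src, size_t nsrc)
{
    if (nsrc > cap - *ndst)
        return -1;
    for (size_t i = 0; i < nsrc; i++)
        dst[(*ndst)++] = src[i];
    return 0;
}

/*
 * One pass over the combined list.  Counting per kind is a placeholder
 * for the real per-relation work (adding or removing subscription rels).
 */
static void
process_rels(const RemoteRelSketch *rels, size_t n,
             size_t *ntables, size_t *nseqs)
{
    *ntables = 0;
    *nseqs = 0;
    for (size_t i = 0; i < n; i++)
    {
        if (rels[i].relkind == SKETCH_RELKIND_SEQUENCE)
            (*nseqs)++;
        else
            (*ntables)++;
    }
}
```

In the real code the two fetch functions would presumably return one
list directly, so the append step would disappear.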
src/backend/replication/logical/proto.c | 52 ++
The added protocol message needs to be documented. See attached patch
for a start.
The sequence message does not contain the sequence Oid, unlike the
relation message. Would that be good to add?
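
For reference, this is how I read the message layout documented in the
attached patch, written as a small standalone decoder. It is a sketch
only: SeqWireSketch and parse_sequence_msg() are made-up names, it
handles only the non-streamed form (no xid field), and it assumes
integers are sent in network byte order and strings are NUL-terminated,
as with the other logical replication messages. Note there is no Oid
field to read.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical decoded form of the Sequence message. */
typedef struct SeqWireSketch
{
    uint8_t     flags;
    uint64_t    lsn;
    const char *nspname;        /* points into the input buffer */
    const char *relname;        /* points into the input buffer */
    bool        transactional;
    int64_t     last_value;
    int64_t     log_cnt;
    bool        is_called;
} SeqWireSketch;

/* Read a big-endian 64-bit integer. */
static uint64_t
read_u64(const uint8_t *p)
{
    uint64_t    v = 0;

    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

/* Return the string at *pos and advance past its NUL; NULL if unterminated. */
static const char *
read_cstring(const uint8_t *buf, size_t len, size_t *pos)
{
    const uint8_t *start = buf + *pos;
    const uint8_t *nul = memchr(start, '\0', len - *pos);

    if (nul == NULL)
        return NULL;
    *pos = (size_t) (nul - buf) + 1;
    return (const char *) start;
}

/* Returns true if buf holds a complete, well-formed message. */
static bool
parse_sequence_msg(const uint8_t *buf, size_t len, SeqWireSketch *out)
{
    size_t      pos = 0;

    /* Byte1('X'), Int8 flags, Int64 LSN */
    if (len < 1 + 1 + 8 || buf[pos++] != 'X')
        return false;
    out->flags = buf[pos++];
    out->lsn = read_u64(buf + pos);
    pos += 8;

    /* String namespace, String relation name */
    if ((out->nspname = read_cstring(buf, len, &pos)) == NULL ||
        (out->relname = read_cstring(buf, len, &pos)) == NULL)
        return false;

    /* Int8 transactional, Int64 last_value, Int64 log_cnt, Int8 is_called */
    if (len - pos < 1 + 8 + 8 + 1)
        return false;
    out->transactional = buf[pos++] != 0;
    out->last_value = (int64_t) read_u64(buf + pos);
    pos += 8;
    out->log_cnt = (int64_t) read_u64(buf + pos);
    pos += 8;
    out->is_called = buf[pos] != 0;
    return true;
}
```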
src/backend/replication/logical/worker.c | 56 ++
Maybe the Asserts in apply_handle_sequence() should be elogs. These
are checking what is sent over the network, so we don't want a
bad/evil peer able to trigger asserts. And in non-assert builds these
conditions would be unchecked.
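
Roughly what I have in mind, as a standalone sketch rather than actual
worker.c code. SeqMsgSketch and seq_msg_check() are made-up names, and
the two conditions are placeholders for whatever the Asserts currently
check. The point is that the check runs in every build and yields an
error the caller can report (in the backend, via elog/ereport) instead
of aborting only in assert-enabled builds.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical decoded form of a sequence message; not the patch's struct. */
typedef struct SeqMsgSketch
{
    bool        transactional;
    int64_t     last_value;
    int64_t     log_cnt;
    bool        is_called;
} SeqMsgSketch;

/*
 * Validate data that arrived over the network.
 *
 * Returns NULL if the message is acceptable, otherwise a static string
 * describing the problem.  The conditions are illustrative assumptions.
 */
static const char *
seq_msg_check(const SeqMsgSketch *msg, bool in_remote_transaction)
{
    if (msg == NULL)
        return "missing sequence message";
    if (msg->transactional && !in_remote_transaction)
        return "transactional sequence change outside a transaction";
    if (msg->log_cnt < 0)
        return "negative log_cnt in sequence message";
    return NULL;
}
```

The caller would then raise an ERROR with the returned text whenever the
result is not NULL.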
src/backend/replication/pgoutput/pgoutput.c | 82 +-
I find the code in get_rel_sync_entry() confusing. You add a section for
if (!publish && is_sequence)
but then shouldn't the code below that be something like
if (!publish && !is_sequence)
src/bin/pg_dump/t/002_pg_dump.pl | 38 +-
This adds a new publication "pub4", but the tests already contain a
"pub4". I'm not sure why this even works, but perhaps the new one
should be "pub5", unless there is a deeper meaning.
src/include/catalog/pg_publication_namespace.h | 3 +-
I don't like how the distinction between table and sequence is done
using a bool field. That also affects the APIs in pg_publication.c
and publicationcmds.c especially. There is a lot of unadorned "true"
and "false" being passed around that isn't very clear, and it all
appears to originate at this catalog. I think we could use a char
field here that uses the relkind constants. That would also make the
code in pg_publication.c etc. slightly clearer.
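
A small standalone sketch of the difference at the call site. The
struct and function names are made up and this is not the catalog
definition. The two constants use the same character values as
RELKIND_RELATION and RELKIND_SEQUENCE in pg_class.h.

```c
#include <stddef.h>

/* Same character values as PostgreSQL's RELKIND_RELATION / RELKIND_SEQUENCE. */
#define SKETCH_RELKIND_RELATION 'r'
#define SKETCH_RELKIND_SEQUENCE 'S'

/* Hypothetical row of pg_publication_namespace with a char type field. */
typedef struct PubSchemaEntrySketch
{
    unsigned    pubid;
    unsigned    nspid;
    char        objtype;    /* one of the SKETCH_RELKIND_* values */
} PubSchemaEntrySketch;

/* Count the schema entries of one publication for the given object type. */
static size_t
count_schema_entries(const PubSchemaEntrySketch *entries, size_t n,
                     unsigned pubid, char objtype)
{
    size_t      count = 0;

    for (size_t i = 0; i < n; i++)
    {
        if (entries[i].pubid == pubid && entries[i].objtype == objtype)
            count++;
    }
    return count;
}
```

A call then reads count_schema_entries(rows, n, pubid,
SKETCH_RELKIND_SEQUENCE) instead of passing a bare "true", which says
what is being asked for without looking up the parameter name.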
See attached patch for more small tweaks.
Your patch still contains a number of XXX and FIXME comments, which in
my assessment are all more or less correct, so I didn't comment on those
separately.
Other than that, this seems pretty good.
Earlier in the thread I commented on some aspects of the new grammar
(e.g., do we need FOR ALL SEQUENCES?). I think this would be useful to
review again after all the new logical replication patches are in. I
don't want to hold up this patch for that at this point.

From bdded82050841d3b71308ce82110efd21d99ea53 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Sun, 13 Mar 2022 07:38:46 +0100
Subject: [PATCH] fixup! Add support for decoding sequences to built-in
replication
---
doc/src/sgml/protocol.sgml | 119 ++++++++++++++++++++++
doc/src/sgml/ref/alter_publication.sgml | 2 +-
doc/src/sgml/ref/create_publication.sgml | 42 +++++---
src/backend/catalog/pg_publication.c | 8 +-
src/backend/commands/subscriptioncmds.c | 2 +-
src/backend/parser/gram.y | 14 ---
src/backend/replication/logical/worker.c | 2 +-
src/bin/pg_dump/pg_dump.c | 6 +-
src/test/regress/expected/publication.out | 2 +-
src/test/regress/sql/object_address.sql | 1 +
src/test/regress/sql/publication.sql | 2 +-
src/test/subscription/t/029_sequences.pl | 14 +--
12 files changed, 165 insertions(+), 49 deletions(-)
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 9178c779ba..49c05e1866 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -7055,6 +7055,125 @@ <title>Logical Replication Message Formats</title>
</listitem>
</varlistentry>
+<varlistentry id="protocol-logicalrep-message-formats-Sequence">
+<term>
+Sequence
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+ Byte1('X')
+</term>
+<listitem>
+<para>
+ Identifies the message as a sequence message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int32 (TransactionId)
+</term>
+<listitem>
+<para>
+ Xid of the transaction (only present for streamed transactions).
+ This field is available since protocol version 2.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int8(0)
+</term>
+<listitem>
+<para>
+ Flags; currently unused.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int64 (XLogRecPtr)
+</term>
+<listitem>
+<para>
+ The LSN of FIXME.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ String
+</term>
+<listitem>
+<para>
+ Namespace (empty string for <literal>pg_catalog</literal>).
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ String
+</term>
+<listitem>
+<para>
+ Relation name.
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+ Int8
+</term>
+<listitem>
+<para>
+ 1 if the sequence update is transactional, 0 otherwise.
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+ Int64
+</term>
+<listitem>
+<para>
+ <structfield>last_value</structfield> value of the sequence.
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+ Int64
+</term>
+<listitem>
+<para>
+ <structfield>log_cnt</structfield> value of the sequence.
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+ Int8
+</term>
+<listitem>
+<para>
+ <structfield>is_called</structfield> value of the sequence.
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+
<varlistentry id="protocol-logicalrep-message-formats-Type">
<term>
Type
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index 36c9a5f438..5dacb732b6 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -31,7 +31,7 @@
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
- SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ... ]
+ SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
ALL SEQUENCES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
</synopsis>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index f72318e97d..286529e749 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -66,6 +66,20 @@ <title>Parameters</title>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>FOR SEQUENCE</literal></term>
+ <listitem>
+ <para>
+ Specifies a list of sequences to add to the publication.
+ </para>
+
+ <para>
+ Specifying a sequence that is part of a schema specified by <literal>FOR
+ ALL SEQUENCES IN SCHEMA</literal> is not supported.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>FOR TABLE</literal></term>
<listitem>
@@ -111,26 +125,28 @@ <title>Parameters</title>
</varlistentry>
<varlistentry>
+ <term><literal>FOR ALL SEQUENCES</literal></term>
<term><literal>FOR ALL TABLES</literal></term>
<listitem>
<para>
- Marks the publication as one that replicates changes for all tables in
- the database, including tables created in the future.
+ Marks the publication as one that replicates changes for all sequences/tables in
+ the database, including sequences/tables created in the future.
</para>
</listitem>
</varlistentry>
<varlistentry>
+ <term><literal>FOR ALL SEQUENCES IN SCHEMA</literal></term>
<term><literal>FOR ALL TABLES IN SCHEMA</literal></term>
<listitem>
<para>
- Marks the publication as one that replicates changes for all tables in
- the specified list of schemas, including tables created in the future.
+ Marks the publication as one that replicates changes for all sequences/tables in
+ the specified list of schemas, including sequences/tables created in the future.
</para>
<para>
- Specifying a schema along with a table which belongs to the specified
- schema using <literal>FOR TABLE</literal> is not supported.
+ Specifying a schema along with a sequence/table which belongs to the specified
+ schema using <literal>FOR SEQUENCE</literal>/<literal>FOR TABLE</literal> is not supported.
</para>
<para>
@@ -205,10 +221,9 @@ <title>Parameters</title>
<title>Notes</title>
<para>
- If <literal>FOR TABLE</literal>, <literal>FOR ALL TABLES</literal> or
- <literal>FOR ALL TABLES IN SCHEMA</literal> are not specified, then the
- publication starts out with an empty set of tables. That is useful if
- tables or schemas are to be added later.
+ If <literal>FOR TABLE</literal>, <literal>FOR SEQUENCE</literal>, etc. is
+ not specified, then the publication starts out with an empty set of tables
+ and sequences. That is useful if objects are to be added later.
</para>
<para>
@@ -223,10 +238,9 @@ <title>Notes</title>
</para>
<para>
- To add a table to a publication, the invoking user must have ownership
- rights on the table. The <command>FOR ALL TABLES</command> and
- <command>FOR ALL TABLES IN SCHEMA</command> clauses require the invoking
- user to be a superuser.
+ To add a table or sequence to a publication, the invoking user must have
+ ownership rights on the table or sequence. The <command>FOR ALL
+ ...</command> clauses require the invoking user to be a superuser.
</para>
<para>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index d866e8a9b2..8e26e0cee2 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -636,7 +636,7 @@ GetAllSequencesPublications(void)
SysScanDesc scan;
HeapTuple tup;
- /* Find all publications that are marked as for all tables. */
+ /* Find all publications that are marked as for all sequences. */
rel = table_open(PublicationRelationId, AccessShareLock);
ScanKeyInit(&scankey,
@@ -892,11 +892,7 @@ GetAllSchemaPublicationRelations(Oid pubid, bool sequences,
}
/*
- * Gets list of all relation published by FOR ALL TABLES publication(s).
- *
- * If the publication publishes partition changes via their respective root
- * partitioned tables, we must exclude partitions in favor of including the
- * root partitioned tables.
+ * Gets list of all relation published by FOR ALL SEQUENCES publication(s).
*/
List *
GetAllSequencesPublicationRelations(void)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5beb67e765..1c70c4369a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1859,7 +1859,7 @@ fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
- (errmsg("could not receive list of replicated tables from the publisher: %s",
+ (errmsg("could not receive list of replicated sequences from the publisher: %s",
res->err)));
/* Process tables. */
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 9097ac3fab..6ff0ddd62b 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9705,13 +9705,6 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
*
* CREATE PUBLICATION FOR pub_obj [, ...] [WITH options]
*
- * pub_obj is one of:
- *
- * TABLE table [, ...]
- * SEQUENCE table [, ...]
- * ALL TABLES IN SCHEMA schema [, ...]
- * ALL SEQUENCES IN SCHEMA schema [, ...]
- *
*****************************************************************************/
CreatePublicationStmt:
@@ -9868,13 +9861,6 @@ pub_obj_list: PublicationObjSpec
*
* ALTER PUBLICATION name SET pub_obj [, ...]
*
- * pub_obj is one of:
- *
- * TABLE table_name [, ...]
- * SEQUENCE table_name [, ...]
- * ALL TABLES IN SCHEMA schema_name [, ...]
- * ALL SEQUENCES IN SCHEMA schema_name [, ...]
- *
*****************************************************************************/
AlterPublicationStmt:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 860c31fa05..1282c15f92 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -142,9 +142,9 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
+#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
-#include "commands/sequence.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index ef8c6e43c6..35a8fc7631 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3819,19 +3819,19 @@ getPublications(Archive *fout, int *numPublications)
appendPQExpBuffer(query,
"SELECT p.tableoid, p.oid, p.pubname, "
"p.pubowner, "
- "p.puballtables, false AS p.puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS p.pubsequence, p.pubviaroot "
+ "p.puballtables, false AS puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubsequence, p.pubviaroot "
"FROM pg_publication p");
else if (fout->remoteVersion >= 110000)
appendPQExpBuffer(query,
"SELECT p.tableoid, p.oid, p.pubname, "
"p.pubowner, "
- "p.puballtables, false AS p.puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS p.pubsequence, false AS pubviaroot "
+ "p.puballtables, false AS puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubsequence, false AS pubviaroot "
"FROM pg_publication p");
else
appendPQExpBuffer(query,
"SELECT p.tableoid, p.oid, p.pubname, "
"p.pubowner, "
- "p.puballtables, false AS p.puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS p.pubsequence, false AS pubviaroot "
+ "p.puballtables, false AS puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubsequence, false AS pubviaroot "
"FROM pg_publication p");
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 620fab87e6..92c50b13ec 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -262,7 +262,7 @@ Sequences from schemas:
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_forschema FOR ALL SEQUENCES IN SCHEMA pub_test;
RESET client_min_messages;
--- fail - can't create publication with schema and table of the same schema
+-- fail - can't create publication with schema and sequence of the same schema
CREATE PUBLICATION testpub_for_seq_schema FOR ALL SEQUENCES IN SCHEMA pub_test, SEQUENCE pub_test.testpub_seq1;
ERROR: cannot add relation "pub_test.testpub_seq1" to publication
DETAIL: Sequence's schema "pub_test" is already part of the publication or part of the specified schema list.
diff --git a/src/test/regress/sql/object_address.sql b/src/test/regress/sql/object_address.sql
index f90afad804..2f40156eb4 100644
--- a/src/test/regress/sql/object_address.sql
+++ b/src/test/regress/sql/object_address.sql
@@ -143,6 +143,7 @@ CREATE STATISTICS addr_nsp.gentable_stat ON a, b FROM addr_nsp.gentable;
SELECT pg_get_object_address('publication', '{one,two}', '{}');
SELECT pg_get_object_address('subscription', '{one}', '{}');
SELECT pg_get_object_address('subscription', '{one,two}', '{}');
+
-- test successful cases
WITH objects (type, name, args) AS (VALUES
('table', '{addr_nsp, gentable}'::text[], '{}'::text[]),
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index af665395e1..5043c4bbba 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -144,7 +144,7 @@ CREATE PUBLICATION testpub_forsequence FOR SEQUENCE testpub_seq0;
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_forschema FOR ALL SEQUENCES IN SCHEMA pub_test;
RESET client_min_messages;
--- fail - can't create publication with schema and table of the same schema
+-- fail - can't create publication with schema and sequence of the same schema
CREATE PUBLICATION testpub_for_seq_schema FOR ALL SEQUENCES IN SCHEMA pub_test, SEQUENCE pub_test.testpub_seq1;
-- fail - can't add a sequence of the same schema to the schema publication
ALTER PUBLICATION testpub_forschema ADD SEQUENCE pub_test.testpub_seq1;
diff --git a/src/test/subscription/t/029_sequences.pl b/src/test/subscription/t/029_sequences.pl
index cdd7f7f344..9ae3c03d7d 100644
--- a/src/test/subscription/t/029_sequences.pl
+++ b/src/test/subscription/t/029_sequences.pl
@@ -46,7 +46,7 @@
"ALTER PUBLICATION seq_pub ADD SEQUENCE s");
$node_subscriber->safe_psql('postgres',
- "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub WITH (slot_name = seq_sub_slot)"
+ "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub"
);
$node_publisher->wait_for_catchup('seq_sub');
@@ -73,7 +73,7 @@
));
is( $result, '132|0|t',
- 'check replicated sequence values on subscriber');
+ 'initial test data replicated');
# advance the sequence in a rolled-back transaction - the rollback
@@ -96,7 +96,7 @@
));
is( $result, '231|0|t',
- 'check replicated sequence values on subscriber');
+ 'advance sequence in rolled-back transaction');
# create a new sequence and roll it back - should not be replicated, due to
@@ -119,7 +119,7 @@
));
is( $result, '1|0|f',
- 'check replicated sequence values on subscriber');
+ 'create new sequence and roll it back');
# create a new sequence, advance it in a rolled-back transaction, but commit
@@ -150,7 +150,7 @@
));
is( $result, '132|0|t',
- 'check replicated sequence values on subscriber');
+ 'create sequence, advance it in rolled-back transaction, but commit the create');
# advance the new sequence in a transaction, and roll it back - the rollback
@@ -173,7 +173,7 @@
));
is( $result, '231|0|t',
- 'check replicated sequence values on subscriber');
+ 'advance the new sequence in a transaction and roll it back');
# advance the sequence in a subtransaction - the subtransaction gets rolled
@@ -196,7 +196,7 @@
));
is( $result, '330|0|t',
- 'check replicated sequence values on subscriber');
+ 'advance sequence in a subtransaction');
done_testing();
--
2.35.1