Hi,
Here's an updated version of this patch - rebased to current master,
and fixing some of the issues raised in Peter's review.
Mainly, this adds a TAP test to src/test/subscription, focusing on
testing the various situations with transactional and non-transactional
behavior (with subtransactions and various flavors of ROLLBACK).
This new TAP test, however, uncovered an issue with wait_for_catchup(),
which uses pg_current_wal_lsn() to wait for replication of all the
changes. That does not work when the sequence gets advanced in a
transaction that is then aborted, e.g. like this:
BEGIN;
SELECT nextval('s') FROM generate_series(1,100);
ROLLBACK;
The root cause is that pg_current_wal_lsn() uses LogwrtResult.Write,
which is updated by XLogFlush() - but that's only called in
RecordTransactionCommit(). Which makes sense, because only the committed
stuff is "visible". But the non-transactional behavior changes this,
because now some of the changes from aborted transactions may need to be
replicated. Which means wait_for_catchup() ends up not waiting for the
sequence change.
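For illustration, here's one way to observe the gap on the publisher -
the two LSN functions already exist, and waiting on the insert position
is merely a possible workaround, not something the patch does:
BEGIN;
SELECT nextval('s') FROM generate_series(1,100);
ROLLBACK;
-- the insert pointer already covers the WAL from the aborted nextval()
-- calls, while the write pointer may still lag behind it
SELECT pg_current_wal_insert_lsn(), pg_current_wal_lsn();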
One option would be adding an XLogFlush() call to RecordTransactionAbort(),
but my guess is we intentionally don't do that (even though aborts should
be fairly rare in most workloads).
I'm not entirely sure changing this (replicating changes from aborted
xacts) is a good idea. Maybe it'd be better to replicate this "lazily" -
instead of replicating the advances right away, we might remember which
sequences were advanced in the transaction, and then replicate the current
state of those sequences at commit time.
The idea is that if an increment is "invisible" we probably don't need
to replicate it; it's fine to replicate the next "visible" increment. So,
for example, given
BEGIN;
SELECT nextval('s');
ROLLBACK;
BEGIN;
SELECT nextval('s');
COMMIT;
we don't need to replicate the first change, but we need to replicate
the second one.
The trouble is we don't decode individual sequence advances, just those
that update the on-disk sequence tuple (so every 32 values or so). So we'd
need to remember the first increment, in a way that is (a) persistent
across restarts and (b) shared by all backends.
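To make the batching concrete, here's a quick way to see it (assuming the
default SEQ_LOG_VALS of 32) - only the first nextval() on a fresh sequence
emits a WAL record, pre-allocating 32 values:
CREATE SEQUENCE s2;
SELECT nextval('s2');                  -- emits a WAL record for the sequence tuple
SELECT last_value, log_cnt FROM s2;    -- last_value = 1, log_cnt = 32
SELECT nextval('s2');                  -- no new WAL record for this call
SELECT last_value, log_cnt FROM s2;    -- last_value = 2, log_cnt = 31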
The other challenge seems to be the ordering of the changes - at the
moment we have no issues with this, because increments on the same
sequence are replicated immediately, in WAL order. But postponing them to
commit time would affect this order.
I've also briefly looked at the code duplication - there are a couple of
functions in the patch that I shamelessly copy-pasted and tweaked to
handle sequences instead of tables:
publicationcmds.c
-----------------
1) OpenTableList/CloseTableList -> OpenSequenceList/CloseSequenceList
Trivial differences, trivial to get rid of - the only difference is
pretty much just table_open vs. relation_open.
2) AlterPublicationTables -> AlterPublicationSequences
This is a bit more complicated, because the tables also handle
inheritance (which I think does not apply to sequences). Other than
that, it's calling the functions from (1).
subscriptioncmds.c
------------------
1) fetch_table_list -> fetch_sequence_list
Minimal differences, essentially just the catalog name.
2) AlterSubscription_refresh
A lot of duplication, but the code handling tables and sequences is
almost exactly the same and can be reused fairly easily (moved to a
separate function, called for tables and then sequences).
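For completeness, the user-facing syntax added by the patch (per the
updated ALTER PUBLICATION / ALTER SUBSCRIPTION docs) would be used roughly
like this - seq_pub, seq_sub and s are made-up names:
-- publisher
CREATE PUBLICATION seq_pub;
ALTER PUBLICATION seq_pub ADD SEQUENCE s;
-- subscriber, assuming an existing subscription seq_sub to seq_pub
ALTER SUBSCRIPTION seq_sub REFRESH PUBLICATION;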
regards
--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a31e0b879..56ddc3abae 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
- spill slot truncate stream stats twophase twophase_stream
+ spill slot truncate stream stats twophase twophase_stream \
+ sequence
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot
diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out
new file mode 100644
index 0000000000..07f3e3e92c
--- /dev/null
+++ b/contrib/test_decoding/expected/sequence.out
@@ -0,0 +1,327 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE SEQUENCE test_sequence;
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 4
+(1 row)
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 14
+(1 row)
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 4000
+(1 row)
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, true);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, false);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3500
+(1 row)
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 4, log_cnt: 0 is_called: 1
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 334, log_cnt: 0 is_called: 1
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 14, log_cnt: 32 is_called: 1
+ COMMIT
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 14, log_cnt: 0 is_called: 1
+ COMMIT
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 4000, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 4320, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3830, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3830, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 0
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3820, log_cnt: 0 is_called: 1
+(24 rows)
+
+DROP SEQUENCE test_sequence;
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3001
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3002
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3003
+(1 row)
+
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 6001
+(1 row)
+
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data
+------
+(0 rows)
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data
+------
+(0 rows)
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 3003 | 30 | t
+(1 row)
+
+SELECT nextval('test_table_a_seq');
+ nextval
+---------
+ 3004
+(1 row)
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+---------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_table_a_seq transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 3000, log_cnt: 0 is_called: 1
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 3033, log_cnt: 0 is_called: 1
+ BEGIN
+ COMMIT
+(8 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+-------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_table_a_seq transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ sequence: public.test_table_a_seq transactional: 1 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ table public.test_table: INSERT: a[integer]:1 b[integer]:100
+ table public.test_table: INSERT: a[integer]:2 b[integer]:200
+ COMMIT
+(6 rows)
+
+-- savepoint test on sequence
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3001
+(1 row)
+
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ setval
+--------
+ 5000
+(1 row)
+
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 3001 | 32 | t
+(1 row)
+
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 3000, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 3033, log_cnt: 0 is_called: 1
+ COMMIT
+(6 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql
new file mode 100644
index 0000000000..d8a34738f3
--- /dev/null
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -0,0 +1,119 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE test_sequence;
+
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, true);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, false);
+SELECT nextval('test_sequence');
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+DROP SEQUENCE test_sequence;
+
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+SELECT nextval('test_table_a_seq');
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on sequence
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e5cd84e85e..54d734cb85 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -35,6 +35,7 @@ typedef struct
bool include_timestamp;
bool skip_empty_xacts;
bool only_local;
+ bool skip_sequences;
} TestDecodingData;
/*
@@ -76,6 +77,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
@@ -141,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->sequence_cb = pg_decode_sequence;
cb->filter_prepare_cb = pg_decode_filter_prepare;
cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
cb->prepare_cb = pg_decode_prepare_txn;
@@ -175,6 +181,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->skip_empty_xacts = false;
data->only_local = false;
+ /* skip sequences by default for backwards compatibility */
+ data->skip_sequences = true;
+
ctx->output_plugin_private = data;
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
@@ -265,6 +274,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "skip-sequences") == 0)
+ {
+
+ if (elem->arg == NULL)
+ continue; /* true by default */
+ else if (!parse_bool(strVal(elem->arg), &data->skip_sequences))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -744,6 +764,26 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ char *nspname = get_namespace_name(RelationGetNamespace(rel));
+
+ /* return if requested to skip_sequences */
+ if (data->skip_sequences)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfo(ctx->out, "sequence: %s.%s transactional: %d created: %d last_value: %zu, log_cnt: %zu is_called: %d",
+ nspname, RelationGetRelationName(rel),
+ transactional, created, last_value, log_cnt, is_called);
+ OutputPluginWrite(ctx, true);
+}
+
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 2b2c70a26e..a342a54629 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9430,6 +9430,11 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l
<entry>prepared transactions</entry>
</row>
+ <row>
+ <entry><link linkend="view-pg-publication-sequences"><structname>pg_publication_sequences</structname></link></entry>
+ <entry>publications and their associated sequences</entry>
+ </row>
+
<row>
<entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry>
<entry>publications and their associated tables</entry>
@@ -11264,6 +11269,72 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
</sect1>
+ <sect1 id="view-pg-publication-sequences">
+ <title><structname>pg_publication_sequences</structname></title>
+
+ <indexterm zone="view-pg-publication-sequences">
+ <primary>pg_publication_sequences</primary>
+ </indexterm>
+
+ <para>
+ The view <structname>pg_publication_sequences</structname> provides
+ information about the mapping between publications and the sequences they
+ contain. Unlike the underlying catalog
+ <link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>,
+ this view expands
+ publications defined as <literal>FOR ALL SEQUENCES</literal>, so for such
+ publications there will be a row for each eligible sequence.
+ </para>
+
+ <table>
+ <title><structname>pg_publication_sequences</structname> Columns</title>
+ <tgroup cols="1">
+ <thead>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ Column Type
+ </para>
+ <para>
+ Description
+ </para></entry>
+ </row>
+ </thead>
+
+ <tbody>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>pubname</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-publication"><structname>pg_publication</structname></link>.<structfield>pubname</structfield>)
+ </para>
+ <para>
+ Name of publication
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>schemaname</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-namespace"><structname>pg_namespace</structname></link>.<structfield>nspname</structfield>)
+ </para>
+ <para>
+ Name of schema containing sequence
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>sequencename</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>relname</structfield>)
+ </para>
+ <para>
+ Name of sequence
+ </para></entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+ </sect1>
+
<sect1 id="view-pg-publication-tables">
<title><structname>pg_publication_tables</structname></title>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index faa114b2c6..c68a1573de 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -24,6 +24,9 @@ PostgreSQL documentation
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@@ -50,7 +53,18 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
</para>
<para>
- The fourth variant of this command listed in the synopsis can change
+ The next three variants change which sequences are part of the publication.
+ The <literal>SET SEQUENCE</literal> clause will replace the list of sequences
+ in the publication with the specified one. The <literal>ADD SEQUENCE</literal>
+ and <literal>DROP SEQUENCE</literal> clauses will add and remove one or more
+ sequences from the publication. Note that adding sequences to a publication
+ that is already subscribed to will require an <literal>ALTER SUBSCRIPTION
+ ... REFRESH PUBLICATION</literal> action on the subscribing side in order
+ to become effective.
+ </para>
+
+ <para>
+ The seventh variant of this command listed in the synopsis can change
all of the publication properties specified in
<xref linkend="sql-createpublication"/>. Properties not mentioned in the
command retain their previous settings.
@@ -62,7 +76,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
<para>
You must own the publication to use <command>ALTER PUBLICATION</command>.
- Adding a table to a publication additionally requires owning that table.
+ Adding a table to a publication additionally requires owning that table,
+ and the same requirement applies to sequences.
To alter the owner, you must also be a direct or indirect member of the new
owning role. The new owner must have <literal>CREATE</literal> privilege on
the database. Also, the new owner of a <literal>FOR ALL TABLES</literal>
@@ -97,6 +112,15 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><replaceable class="parameter">sequence_name</replaceable></term>
+ <listitem>
+ <para>
+ Name of an existing sequence.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
<listitem>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index a6f994450d..ff1dcd3bfd 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -144,8 +144,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<term><literal>REFRESH PUBLICATION</literal></term>
<listitem>
<para>
- Fetch missing table information from publisher. This will start
- replication of tables that were added to the subscribed-to publications
+ Fetch missing table and sequence information from publisher. This will start
+ replication of tables and sequences that were added to the subscribed-to publications
since the last invocation of <command>REFRESH PUBLICATION</command> or
since <command>CREATE SUBSCRIPTION</command>.
</para>
@@ -162,7 +162,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
Specifies whether the existing data in the publications that are
being subscribed to should be copied once the replication starts.
The default is <literal>true</literal>. (Previously subscribed
- tables are not copied.)
+ tables and sequences are not copied.)
</para>
</listitem>
</varlistentry>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 2a2fe03c13..a66a5a2aec 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -51,7 +51,8 @@ check_publication_add_relation(Relation targetrel)
{
/* Must be a regular or partitioned table */
if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
- RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
+ RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE &&
+ RelationGetForm(targetrel)->relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot add relation \"%s\" to publication",
@@ -98,7 +99,8 @@ static bool
is_publishable_class(Oid relid, Form_pg_class reltuple)
{
return (reltuple->relkind == RELKIND_RELATION ||
- reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
+ reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
+ reltuple->relkind == RELKIND_SEQUENCE) &&
!IsCatalogRelationOid(relid) &&
reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
relid >= FirstNormalObjectId;
@@ -271,6 +273,10 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+ /* skip sequences here */
+ if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+ continue;
+
if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
pub_partopt != PUBLICATION_PART_ROOT)
{
@@ -304,6 +310,49 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
return result;
}
+/*
+ * Gets list of relation oids for a publication (sequences only).
+ *
+ * This should only be used for normal publications; FOR ALL SEQUENCES
+ * publications should use GetAllSequencesPublicationRelations().
+ */
+List *
+GetPublicationSequenceRelations(Oid pubid)
+{
+ List *result;
+ Relation pubrelsrel;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ HeapTuple tup;
+
+ /* Find all sequences associated with the publication. */
+ pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
+
+ ScanKeyInit(&scankey,
+ Anum_pg_publication_rel_prpubid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(pubid));
+
+ scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
+ true, NULL, 1, &scankey);
+
+ result = NIL;
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_publication_rel pubrel;
+
+ pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+ if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+ result = lappend_oid(result, pubrel->prrelid);
+ }
+
+ systable_endscan(scan);
+ table_close(pubrelsrel, AccessShareLock);
+
+ return result;
+}
+
/*
* Gets list of publication oids for publications marked as FOR ALL TABLES.
*/
@@ -404,6 +453,46 @@ GetAllTablesPublicationRelations(bool pubviaroot)
return result;
}
+/*
+ * Gets list of all sequences published by FOR ALL SEQUENCES publication(s).
+ *
+ * Unlike the table variant, there is no partitioning to worry about here,
+ * so this simply scans pg_class and returns all publishable sequences
+ * (per is_publishable_class).
+ */
+List *
+GetAllSequencesPublicationRelations(void)
+{
+ Relation classRel;
+ ScanKeyData key[1];
+ TableScanDesc scan;
+ HeapTuple tuple;
+ List *result = NIL;
+
+ classRel = table_open(RelationRelationId, AccessShareLock);
+
+ ScanKeyInit(&key[0],
+ Anum_pg_class_relkind,
+ BTEqualStrategyNumber, F_CHAREQ,
+ CharGetDatum(RELKIND_SEQUENCE));
+
+ scan = table_beginscan_catalog(classRel, 1, key);
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+ Oid relid = relForm->oid;
+
+ if (is_publishable_class(relid, relForm))
+ result = lappend_oid(result, relid);
+ }
+
+ table_endscan(scan);
+
+ table_close(classRel, AccessShareLock);
+ return result;
+}
+
/*
* Get publication using oid
*
@@ -426,10 +515,12 @@ GetPublication(Oid pubid)
pub->oid = pubid;
pub->name = pstrdup(NameStr(pubform->pubname));
pub->alltables = pubform->puballtables;
+ pub->allsequences = pubform->puballsequences;
pub->pubactions.pubinsert = pubform->pubinsert;
pub->pubactions.pubupdate = pubform->pubupdate;
pub->pubactions.pubdelete = pubform->pubdelete;
pub->pubactions.pubtruncate = pubform->pubtruncate;
+ pub->pubactions.pubsequence = pubform->pubsequence;
pub->pubviaroot = pubform->pubviaroot;
ReleaseSysCache(tup);
@@ -555,3 +646,56 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+
+/*
+ * Returns Oids of sequences in a publication.
+ */
+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ Publication *publication;
+ List *sequences;
+
+ /* stuff done only on the first call of the function */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* switch to memory context appropriate for multiple function calls */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ publication = GetPublicationByName(pubname, false);
+
+ /*
+ * For FOR ALL SEQUENCES publications we return all publishable
+ * sequences in the database, otherwise just the sequences explicitly
+ * added to the publication.
+ */
+ if (publication->allsequences)
+ sequences = GetAllSequencesPublicationRelations();
+ else
+ sequences = GetPublicationSequenceRelations(publication->oid);
+
+ funcctx->user_fctx = (void *) sequences;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /* stuff done on every call of the function */
+ funcctx = SRF_PERCALL_SETUP();
+ sequences = (List *) funcctx->user_fctx;
+
+ if (funcctx->call_cntr < list_length(sequences))
+ {
+ Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
+
+ SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 55f6e3711d..9bec49bc3e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -372,6 +372,16 @@ CREATE VIEW pg_publication_tables AS
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE C.oid = GPT.relid;
+CREATE VIEW pg_publication_sequences AS
+ SELECT
+ P.pubname AS pubname,
+ N.nspname AS schemaname,
+ C.relname AS sequencename
+ FROM pg_publication P,
+ LATERAL pg_get_publication_sequences(P.pubname) GPT,
+ pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
+ WHERE C.oid = GPT.relid;
+
CREATE VIEW pg_locks AS
SELECT * FROM pg_lock_status() AS L;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 8487eeb7e6..90ad83d4a1 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -16,6 +16,7 @@
#include "access/genam.h"
#include "access/htup_details.h"
+#include "access/relation.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
@@ -54,6 +55,12 @@ static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
AlterPublicationStmt *stmt);
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
+static List *OpenSequenceList(List *sequences);
+static void CloseSequenceList(List *rels);
+static void PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+ AlterPublicationStmt *stmt);
+static void PublicationDropSequences(Oid pubid, List *rels, bool missing_ok);
+
static void
parse_publication_options(ParseState *pstate,
List *options,
@@ -72,6 +79,7 @@ parse_publication_options(ParseState *pstate,
pubactions->pubupdate = true;
pubactions->pubdelete = true;
pubactions->pubtruncate = true;
+ pubactions->pubsequence = true;
*publish_via_partition_root = false;
/* Parse options */
@@ -96,6 +104,7 @@ parse_publication_options(ParseState *pstate,
pubactions->pubupdate = false;
pubactions->pubdelete = false;
pubactions->pubtruncate = false;
+ pubactions->pubsequence = false;
*publish_given = true;
publish = defGetString(defel);
@@ -118,6 +127,8 @@ parse_publication_options(ParseState *pstate,
pubactions->pubdelete = true;
else if (strcmp(publish_opt, "truncate") == 0)
pubactions->pubtruncate = true;
+ else if (strcmp(publish_opt, "sequence") == 0)
+ pubactions->pubsequence = true;
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -208,6 +219,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
BoolGetDatum(pubactions.pubdelete);
values[Anum_pg_publication_pubtruncate - 1] =
BoolGetDatum(pubactions.pubtruncate);
+ values[Anum_pg_publication_pubsequence - 1] =
+ BoolGetDatum(pubactions.pubsequence);
values[Anum_pg_publication_pubviaroot - 1] =
BoolGetDatum(publish_via_partition_root);
@@ -373,9 +386,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
rels = OpenTableList(stmt->tables);
- if (stmt->tableAction == DEFELEM_ADD)
+ if (stmt->action == DEFELEM_ADD)
PublicationAddTables(pubid, rels, false, stmt);
- else if (stmt->tableAction == DEFELEM_DROP)
+ else if (stmt->action == DEFELEM_DROP)
PublicationDropTables(pubid, rels, false);
else /* DEFELEM_SET */
{
@@ -426,6 +439,82 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
CloseTableList(rels);
}
+/*
+ * Add or remove sequence to/from publication.
+ */
+static void
+AlterPublicationSequences(AlterPublicationStmt *stmt, Relation rel,
+ HeapTuple tup)
+{
+ List *rels = NIL;
+ Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+ Oid pubid = pubform->oid;
+
+ /* Check that user is allowed to manipulate the publication sequences. */
+ if (pubform->puballsequences)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
+ NameStr(pubform->pubname)),
+ errdetail("Sequences cannot be added to or dropped from FOR ALL TABLES publications.")));
+
+ Assert(list_length(stmt->sequences) > 0);
+
+ rels = OpenSequenceList(stmt->sequences);
+
+ if (stmt->action == DEFELEM_ADD)
+ PublicationAddSequences(pubid, rels, false, stmt);
+ else if (stmt->action == DEFELEM_DROP)
+ PublicationDropSequences(pubid, rels, false);
+ else /* DEFELEM_SET */
+ {
+ List *oldrelids =
+ GetPublicationSequenceRelations(pubid);
+ List *delrels = NIL;
+ ListCell *oldlc;
+
+ /* Calculate which relations to drop. */
+ foreach(oldlc, oldrelids)
+ {
+ Oid oldrelid = lfirst_oid(oldlc);
+ ListCell *newlc;
+ bool found = false;
+
+ foreach(newlc, rels)
+ {
+ Relation newrel = (Relation) lfirst(newlc);
+
+ if (RelationGetRelid(newrel) == oldrelid)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ {
+ Relation oldrel = relation_open(oldrelid,
+ ShareUpdateExclusiveLock);
+
+ delrels = lappend(delrels, oldrel);
+ }
+ }
+
+ /* And drop them. */
+ PublicationDropSequences(pubid, delrels, true);
+
+ /*
+ * Don't bother calculating the difference for adding, we'll catch and
+ * skip existing ones when doing catalog update.
+ */
+ PublicationAddSequences(pubid, rels, true, stmt);
+
+ CloseSequenceList(delrels);
+ }
+
+ CloseSequenceList(rels);
+}
+
/*
* Alter the existing publication.
*
@@ -459,8 +548,12 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
if (stmt->options)
AlterPublicationOptions(pstate, stmt, rel, tup);
- else
+ else if (stmt->tables)
AlterPublicationTables(stmt, rel, tup);
+ else if (stmt->sequences)
+ AlterPublicationSequences(stmt, rel, tup);
+ else
+ Assert(false);
/* Cleanup. */
heap_freetuple(tup);
@@ -665,6 +758,139 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
}
}
+/*
+ * Open relations specified by a RangeVar list.
+ * The returned sequences are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
+ */
+static List *
+OpenSequenceList(List *sequences)
+{
+ List *relids = NIL;
+ List *rels = NIL;
+ ListCell *lc;
+
+ /*
+ * Open, share-lock, and check all the explicitly-specified relations
+ */
+ foreach(lc, sequences)
+ {
+ RangeVar *rv = castNode(RangeVar, lfirst(lc));
+ Relation rel;
+ Oid myrelid;
+
+ /* Allow query cancel in case this takes a long time */
+ CHECK_FOR_INTERRUPTS();
+
+ rel = relation_openrv(rv, ShareUpdateExclusiveLock);
+ myrelid = RelationGetRelid(rel);
+
+ /*
+ * Filter out duplicates if user specifies "foo, foo".
+ *
+ * Note that this algorithm is known to not be very efficient (O(N^2))
+ * but given that it only works on list of sequences given to us by user
+ * it's deemed acceptable.
+ */
+ if (list_member_oid(relids, myrelid))
+ {
+ relation_close(rel, ShareUpdateExclusiveLock);
+ continue;
+ }
+
+ rels = lappend(rels, rel);
+ relids = lappend_oid(relids, myrelid);
+ }
+
+ list_free(relids);
+
+ return rels;
+}
+
+/*
+ * Close all relations in the list.
+ */
+static void
+CloseSequenceList(List *rels)
+{
+ ListCell *lc;
+
+ foreach(lc, rels)
+ {
+ Relation rel = (Relation) lfirst(lc);
+
+ relation_close(rel, NoLock);
+ }
+}
+
+/*
+ * Add listed sequences to the publication.
+ */
+static void
+PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+ AlterPublicationStmt *stmt)
+{
+ ListCell *lc;
+
+ Assert(!stmt || !stmt->for_all_sequences);
+
+ foreach(lc, rels)
+ {
+ Relation rel = (Relation) lfirst(lc);
+ ObjectAddress obj;
+
+ /* Must be owner of the sequence or superuser. */
+ if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
+ RelationGetRelationName(rel));
+
+ obj = publication_add_relation(pubid, rel, if_not_exists);
+ if (stmt)
+ {
+ EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+ (Node *) stmt);
+
+ InvokeObjectPostCreateHook(PublicationRelRelationId,
+ obj.objectId, 0);
+ }
+ }
+}
+
+/*
+ * Remove listed sequences from the publication.
+ */
+static void
+PublicationDropSequences(Oid pubid, List *rels, bool missing_ok)
+{
+ ObjectAddress obj;
+ ListCell *lc;
+ Oid prid;
+
+ foreach(lc, rels)
+ {
+ Relation rel = (Relation) lfirst(lc);
+ Oid relid = RelationGetRelid(rel);
+
+ prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(pubid));
+ if (!OidIsValid(prid))
+ {
+ if (missing_ok)
+ continue;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("relation \"%s\" is not part of the publication",
+ RelationGetRelationName(rel))));
+ }
+
+ ObjectAddressSet(obj, PublicationRelRelationId, prid);
+ performDeletion(&obj, DROP_CASCADE, 0);
+ }
+}
+
+
/*
* Internal workhorse for changing a publication owner
*/
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 72bfdc07a4..2fc644c74c 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -94,7 +94,7 @@ static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */
*/
static SeqTableData *last_used_seq = NULL;
-static void fill_seq_with_data(Relation rel, HeapTuple tuple);
+static void fill_seq_with_data(Relation rel, HeapTuple tuple, bool create);
static Relation lock_and_open_sequence(SeqTable seq);
static void create_seq_hashtable(void);
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
@@ -222,7 +222,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
/* now initialize the sequence's data */
tuple = heap_form_tuple(tupDesc, value, null);
- fill_seq_with_data(rel, tuple);
+ fill_seq_with_data(rel, tuple, true);
/* process OWNED BY if given */
if (owned_by)
@@ -327,7 +327,86 @@ ResetSequence(Oid seq_relid)
/*
* Insert the modified tuple into the new storage file.
*/
- fill_seq_with_data(seq_rel, tuple);
+ fill_seq_with_data(seq_rel, tuple, true);
+
+ /* Clear local cache so that we don't think we have cached numbers */
+ /* Note that we do not change the currval() state */
+ elm->cached = elm->last;
+
+ relation_close(seq_rel, NoLock);
+}
+
+/*
+ * Reset a sequence to its initial value.
+ *
+ * The change is made transactionally, so that on failure of the current
+ * transaction, the sequence will be restored to its previous state.
+ * We do that by creating a whole new relfilenode for the sequence; so this
+ * works much like the rewriting forms of ALTER TABLE.
+ *
+ * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
+ * which must not be released until end of transaction. Caller is also
+ * responsible for permissions checking.
+ */
+void
+ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
+{
+ Relation seq_rel;
+ SeqTable elm;
+ Form_pg_sequence_data seq;
+ Buffer buf;
+ HeapTupleData seqdatatuple;
+ HeapTuple tuple;
+
+ /*
+ * Read the old sequence. This does a bit more work than really
+ * necessary, but it's simple, and we do want to double-check that it's
+ * indeed a sequence.
+ */
+ init_sequence(seq_relid, &elm, &seq_rel);
+ (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple);
+
+ /*
+ * Copy the existing sequence tuple.
+ */
+ tuple = heap_copytuple(&seqdatatuple);
+
+ /* Now we're done with the old page */
+ UnlockReleaseBuffer(buf);
+
+ /*
+ * Modify the copied tuple to execute the restart (compare the RESTART
+ * action in AlterSequence)
+ */
+ seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+ seq->last_value = last_value;
+ seq->is_called = is_called;
+ seq->log_cnt = log_cnt;
+
+ /*
+ * Create a new storage file for the sequence.
+ */
+ RelationSetNewRelfilenode(seq_rel, seq_rel->rd_rel->relpersistence);
+
+ /*
+ * Ensure sequence's relfrozenxid is at 0, since it won't contain any
+ * unfrozen XIDs. Same with relminmxid, since a sequence will never
+ * contain multixacts.
+ */
+ Assert(seq_rel->rd_rel->relfrozenxid == InvalidTransactionId);
+ Assert(seq_rel->rd_rel->relminmxid == InvalidMultiXactId);
+
+ /*
+ * Insert the modified tuple into the new storage file.
+ *
+ * XXX Maybe this should also use created=true, just like the other places
+ * calling fill_seq_with_data. That's probably needed for correct cascading
+ * replication.
+ *
+ * XXX That'd mean all fill_seq_with_data callers use created=true, making
+ * the parameter unnecessary.
+ */
+ fill_seq_with_data(seq_rel, tuple, false);
/* Clear local cache so that we don't think we have cached numbers */
/* Note that we do not change the currval() state */
@@ -340,7 +419,7 @@ ResetSequence(Oid seq_relid)
* Initialize a sequence's relation with the specified tuple as content
*/
static void
-fill_seq_with_data(Relation rel, HeapTuple tuple)
+fill_seq_with_data(Relation rel, HeapTuple tuple, bool create)
{
Buffer buf;
Page page;
@@ -378,8 +457,21 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
/* check the comment above nextval_internal()'s equivalent call. */
if (RelationNeedsWAL(rel))
+ {
GetTopTransactionId();
+ /*
+ * Ensure we have a proper XID, which will be included in the XLOG
+ * record by XLogRecordAssemble. Otherwise the first nextval() in
+ * a subxact (without any preceding changes) would get XID 0,
+ * and it'd be impossible to decide which top xact it belongs to.
+ * It'd also trigger assert in DecodeSequence.
+ *
+ * XXX Not sure if this is the best solution.
+ */
+ GetCurrentTransactionId();
+ }
+
START_CRIT_SECTION();
MarkBufferDirty(buf);
@@ -399,6 +491,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
xlrec.node = rel->rd_node;
+ xlrec.created = create;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -502,7 +595,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
/*
* Insert the modified tuple into the new storage file.
*/
- fill_seq_with_data(seqrel, newdatatuple);
+ fill_seq_with_data(seqrel, newdatatuple, true);
}
/* process OWNED BY if given */
@@ -766,8 +859,21 @@ nextval_internal(Oid relid, bool check_permissions)
* (Have to do that here, so we're outside the critical section)
*/
if (logit && RelationNeedsWAL(seqrel))
+ {
GetTopTransactionId();
+ /*
+ * Ensure we have a proper XID, which will be included in the XLOG
+ * record by XLogRecordAssemble. Otherwise the first nextval() in
+ * a subxact (without any preceding changes) would get XID 0,
+ * and it'd be impossible to decide which top xact it belongs to.
+ * It'd also trigger assert in DecodeSequence.
+ *
+ * XXX Not sure if this is the best solution.
+ */
+ GetCurrentTransactionId();
+ }
+
/* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
@@ -803,6 +909,7 @@ nextval_internal(Oid relid, bool check_permissions)
seq->log_cnt = 0;
xlrec.node = seqrel->rd_node;
+ xlrec.created = false;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -977,8 +1084,21 @@ do_setval(Oid relid, int64 next, bool iscalled)
/* check the comment above nextval_internal()'s equivalent call. */
if (RelationNeedsWAL(seqrel))
+ {
GetTopTransactionId();
+ /*
+ * Ensure we have a proper XID, which will be included in the XLOG
+ * record by XLogRecordAssemble. Otherwise the first nextval() in
+ * a subxact (without any preceding changes) would get XID 0,
+ * and it'd be impossible to decide which top xact it belongs to.
+ * It'd also trigger assert in DecodeSequence.
+ *
+ * XXX Not sure if this is the best solution.
+ */
+ GetCurrentTransactionId();
+ }
+
/* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
@@ -999,6 +1119,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
xlrec.node = seqrel->rd_node;
+ xlrec.created = false;
+
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 22ae982328..2d03cf011e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -84,6 +84,7 @@ typedef struct SubOpts
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -521,6 +522,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
char *err;
WalReceiverConn *wrconn;
List *tables;
+ List *sequences;
ListCell *lc;
char table_state;
@@ -559,6 +561,26 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
InvalidXLogRecPtr);
}
+ /*
+ * Get the sequence list from publisher and build local sequence
+ * status info.
+ */
+ sequences = fetch_sequence_list(wrconn, publications);
+ foreach(lc, sequences)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+ /* Check for supported relkind. */
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
+ }
+
/*
* If requested, create permanent slot for the subscription. We
* won't use the initial snapshot for anything, so no need to
@@ -731,6 +753,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
{
Oid relid = subrel_local_oids[off];
+ /* XXX ignore sequences - maybe do this in GetSubscriptionRelations? */
+ if (get_rel_relkind(relid) == RELKIND_SEQUENCE)
+ continue;
+
if (!bsearch(&relid, pubrel_local_oids,
list_length(pubrel_names), sizeof(Oid), oid_cmp))
{
@@ -822,6 +848,183 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
}
}
+
+ /*
+ * XXX now do the same thing for sequences, maybe before the preceding
+ * block, or earlier?
+ */
+
+ /* Get the sequence list from publisher. */
+ pubrel_names = fetch_sequence_list(wrconn, sub->publications);
+
+ /* Get local relation list. */
+ subrel_states = GetSubscriptionRelations(sub->oid);
+
+ /*
+ * Build qsorted array of local table oids for faster lookup. This can
+ * potentially contain all tables in the database so speed of lookup
+ * is important.
+ */
+ subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+ off = 0;
+ foreach(lc, subrel_states)
+ {
+ SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+ subrel_local_oids[off++] = relstate->relid;
+ }
+ qsort(subrel_local_oids, list_length(subrel_states),
+ sizeof(Oid), oid_cmp);
+
+ /*
+ * Rels that we want to remove from subscription and drop any slots
+ * and origins corresponding to them.
+ */
+ sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+
+ /*
+ * Walk over the remote sequences and try to match them to locally
+ * known sequences. If the sequence is not known locally, create a new
+ * state for it.
+ *
+ * Also builds array of local oids of remote sequences for the next step.
+ */
+ off = 0;
+ pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+ foreach(lc, pubrel_names)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+ /* Check for supported relkind. */
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+
+ pubrel_local_oids[off++] = relid;
+
+ if (!bsearch(&relid, subrel_local_oids,
+ list_length(subrel_states), sizeof(Oid), oid_cmp))
+ {
+ AddSubscriptionRelState(sub->oid, relid,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
+ rv->schemaname, rv->relname, sub->name)));
+ }
+ }
+
+ /*
+ * Next remove state for sequences we should not care about anymore using
+ * the data we collected above
+ */
+ qsort(pubrel_local_oids, list_length(pubrel_names),
+ sizeof(Oid), oid_cmp);
+
+ remove_rel_len = 0;
+ for (off = 0; off < list_length(subrel_states); off++)
+ {
+ Oid relid = subrel_local_oids[off];
+
+ /* XXX ignore non-sequences - maybe do this in GetSubscriptionRelations? */
+ if (get_rel_relkind(relid) != RELKIND_SEQUENCE)
+ continue;
+
+ if (!bsearch(&relid, pubrel_local_oids,
+ list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ {
+ char state;
+ XLogRecPtr statelsn;
+
+ /*
+ * Lock pg_subscription_rel with AccessExclusiveLock to
+ * prevent any race conditions with the apply worker
+ * re-launching workers at the same time this code is trying
+ * to remove those tables.
+ *
+ * Even if new worker for this particular rel is restarted it
+ * won't be able to make any progress as we hold exclusive
+ * lock on subscription_rel till the transaction end. It will
+ * simply exit as there is no corresponding rel entry.
+ *
+ * This locking also ensures that the state of rels won't
+ * change till we are done with this refresh operation.
+ */
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+ /* Last known rel state. */
+ state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+
+ sub_remove_rels[remove_rel_len].relid = relid;
+ sub_remove_rels[remove_rel_len++].state = state;
+
+ RemoveSubscriptionRel(sub->oid, relid);
+
+ logicalrep_worker_stop(sub->oid, relid);
+
+ /*
+ * For READY state, we would have already dropped the
+ * tablesync origin.
+ */
+ if (state != SUBREL_STATE_READY)
+ {
+ char originname[NAMEDATALEN];
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * It is possible that the origin is not yet created for
+ * tablesync worker, this can happen for the states before
+ * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+ * concurrently try to drop the origin and by this time
+ * the origin might be already removed. For these reasons,
+ * passing missing_ok = true.
+ */
+ ReplicationOriginNameForTablesync(sub->oid, relid, originname,
+ sizeof(originname));
+ replorigin_drop_by_name(originname, true, false);
+ }
+
+ ereport(DEBUG1,
+ (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name)));
+ }
+ }
+
+ /*
+ * Drop the tablesync slots associated with removed tables. This has
+ * to be at the end because otherwise if there is an error while doing
+ * the database operations we won't be able to rollback dropped slots.
+ */
+ for (off = 0; off < remove_rel_len; off++)
+ {
+ if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
+ sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ /*
+ * For READY/SYNCDONE states we know the tablesync slot has
+ * already been dropped by the tablesync worker.
+ *
+ * For other states, there is no certainty, maybe the slot
+ * does not exist yet. Also, if we fail after removing some of
+ * the slots, next time, it will again try to drop already
+ * dropped slots and fail. For these reasons, we allow
+ * missing_ok = true for the drop.
+ */
+ ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
+ syncslotname, sizeof(syncslotname));
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+ }
+ }
+
}
PG_FINALLY();
{
@@ -1635,6 +1838,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
return tablelist;
}
+/*
+ * Get the list of sequences which belong to specified publications on the
+ * publisher connection.
+ */
+static List *
+fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[2] = {TEXTOID, TEXTOID};
+ ListCell *lc;
+ bool first;
+ List *tablelist = NIL;
+
+ Assert(list_length(publications) > 0);
+
+ initStringInfo(&cmd);
+ appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n"
+ " FROM pg_catalog.pg_publication_sequences s\n"
+ " WHERE s.pubname IN (");
+ first = true;
+ foreach(lc, publications)
+ {
+ char *pubname = strVal(lfirst(lc));
+
+ if (first)
+ first = false;
+ else
+ appendStringInfoString(&cmd, ", ");
+
+ appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+ }
+ appendStringInfoChar(&cmd, ')');
+
+ res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not receive list of replicated tables from the publisher: %s",
+ res->err)));
+
+	/* Process sequences. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *nspname;
+ char *relname;
+ bool isnull;
+ RangeVar *rv;
+
+ nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+ relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ rv = makeRangeVar(nspname, relname, -1);
+ tablelist = lappend(tablelist, rv);
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+
+ return tablelist;
+}
+
/*
* This is to report the connection failure while dropping replication slots.
* Here, we report the WARNING for all tablesync slots so that user can drop
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 574d7d27fd..6f0d42d551 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -608,7 +608,7 @@ void
CheckSubscriptionRelkind(char relkind, const char *nspname,
const char *relname)
{
- if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use relation \"%s.%s\" as logical replication target",
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 29020c908e..04bb9a27a8 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4834,8 +4834,10 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from)
COPY_STRING_FIELD(pubname);
COPY_NODE_FIELD(options);
COPY_NODE_FIELD(tables);
+ COPY_NODE_FIELD(sequences);
COPY_SCALAR_FIELD(for_all_tables);
- COPY_SCALAR_FIELD(tableAction);
+ COPY_SCALAR_FIELD(for_all_sequences);
+ COPY_SCALAR_FIELD(action);
return newnode;
}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 8a1762000c..ebf48f88f1 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2308,8 +2308,10 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a,
COMPARE_STRING_FIELD(pubname);
COMPARE_NODE_FIELD(options);
COMPARE_NODE_FIELD(tables);
+ COMPARE_NODE_FIELD(sequences);
COMPARE_SCALAR_FIELD(for_all_tables);
- COMPARE_SCALAR_FIELD(tableAction);
+ COMPARE_SCALAR_FIELD(for_all_sequences);
+ COMPARE_SCALAR_FIELD(action);
return true;
}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 39a2849eba..d42be08c0c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -434,6 +434,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <node> group_by_item empty_grouping_set rollup_clause cube_clause
%type <node> grouping_sets_clause
%type <node> opt_publication_for_tables publication_for_tables
+/* %type <node> opt_publication_for_sequences publication_for_sequences */
%type <list> opt_fdw_options fdw_options
%type <defelt> fdw_option
@@ -9596,6 +9597,7 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
*****************************************************************************/
CreatePublicationStmt:
+ /* CREATE PUBLICATION name opt_publication_for_tables opt_publication_for_sequences opt_definition */
CREATE PUBLICATION name opt_publication_for_tables opt_definition
{
CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
@@ -9630,6 +9632,12 @@ publication_for_tables:
}
;
+/*
+ * FIXME
+ *
+ * opt_publication_for_sequences and publication_for_sequences should be
+ * added as sequence counterparts of the _for_tables productions above
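+ * (the intent being to eventually support e.g. CREATE PUBLICATION ... FOR
+ * ALL SEQUENCES alongside FOR ALL TABLES; for now only ALTER PUBLICATION
+ * ... ADD/SET/DROP SEQUENCE is wired up below)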
+ */
/*****************************************************************************
*
@@ -9641,6 +9649,12 @@ publication_for_tables:
*
* ALTER PUBLICATION name SET TABLE table [, table2]
*
+ * ALTER PUBLICATION name ADD SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name DROP SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name SET SEQUENCE sequence [, sequence2]
+ *
*****************************************************************************/
AlterPublicationStmt:
@@ -9656,7 +9670,7 @@ AlterPublicationStmt:
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
- n->tableAction = DEFELEM_ADD;
+ n->action = DEFELEM_ADD;
$$ = (Node *)n;
}
| ALTER PUBLICATION name SET TABLE relation_expr_list
@@ -9664,7 +9678,7 @@ AlterPublicationStmt:
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
- n->tableAction = DEFELEM_SET;
+ n->action = DEFELEM_SET;
$$ = (Node *)n;
}
| ALTER PUBLICATION name DROP TABLE relation_expr_list
@@ -9672,7 +9686,31 @@ AlterPublicationStmt:
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
- n->tableAction = DEFELEM_DROP;
+ n->action = DEFELEM_DROP;
+ $$ = (Node *)n;
+ }
+ | ALTER PUBLICATION name ADD_P SEQUENCE relation_expr_list
+ {
+ AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+ n->pubname = $3;
+ n->sequences = $6;
+ n->action = DEFELEM_ADD;
+ $$ = (Node *)n;
+ }
+ | ALTER PUBLICATION name SET SEQUENCE relation_expr_list
+ {
+ AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+ n->pubname = $3;
+ n->sequences = $6;
+ n->action = DEFELEM_SET;
+ $$ = (Node *)n;
+ }
+ | ALTER PUBLICATION name DROP SEQUENCE relation_expr_list
+ {
+ AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+ n->pubname = $3;
+ n->sequences = $6;
+ n->action = DEFELEM_DROP;
$$ = (Node *)n;
}
;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 2874dc0612..98d0edefb0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,6 +42,7 @@
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
+#include "commands/sequence.h"
typedef struct XLogRecordBuffer
{
@@ -74,10 +75,11 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
bool two_phase);
static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_prepare *parsed);
-
+static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
/* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -158,6 +160,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
DecodeLogicalMsgOp(ctx, &buf);
break;
+ case RM_SEQ_ID:
+ DecodeSequence(ctx, &buf);
+ break;
+
/*
* Rmgrs irrelevant for logical decoding; they describe stuff not
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -173,7 +179,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
case RM_HASH_ID:
case RM_GIN_ID:
case RM_GIST_ID:
- case RM_SEQ_ID:
case RM_SPGIST_ID:
case RM_BRIN_ID:
case RM_COMMIT_TS_ID:
@@ -1312,3 +1317,125 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
ctx->fast_forward || FilterByOrigin(ctx, origin_id));
}
+
+/*
+ * Decode Sequence Tuple
+ */
+static void
+DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+ int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
+
+ Assert(datalen >= 0);
+
+	tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+
+ ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+ tuple->tuple.t_tableOid = InvalidOid;
+
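+	/*
+	 * The WAL record contains the xl_seq_rec struct followed by the sequence
+	 * tuple (header first, then data). Copy both parts into the tuple buffer.
+	 */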
+ memcpy(((char *) tuple->tuple.t_data),
+ data + sizeof(xl_seq_rec),
+ SizeofHeapTupleHeader);
+
+ memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
+ data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
+ datalen);
+}
+
+/*
+ * Handle sequence decode
+ *
+ * Decoding sequences is a bit tricky, because while most sequence actions
+ * are non-transactional (not subject to rollback), some need to be handled
+ * as transactional.
+ *
+ * By default, a sequence increment is non-transactional - we must not queue
+ * it in a transaction as other changes, because the transaction might get
+ * rolled back and we'd discard the increment. The downstream would not be
+ * notified about the increment, which is wrong.
+ *
+ * On the other hand, the sequence may be created in a transaction. In this
+ * case we *should* queue the change like the other changes in the
+ * transaction, because we don't want to send increments for a sequence the
+ * downstream doesn't know about yet - the plugin might get confused about
+ * which sequence they relate to.
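+ *
+ * For example (matching the cases exercised by the TAP test):
+ *
+ *   SELECT nextval('s');      -- non-transactional, sent right away, even
+ *                             -- if the surrounding transaction aborts
+ *
+ *   BEGIN;
+ *   CREATE SEQUENCE s2;
+ *   SELECT nextval('s2');     -- transactional, queued and sent (or
+ *   COMMIT;                   -- discarded) together with the transaction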
+ */
+static void
+DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBufferTupleBuf *tuplebuf;
+ RelFileNode target_node;
+ XLogReaderState *r = buf->record;
+ char *tupledata = NULL;
+ Size tuplelen;
+ Size datalen = 0;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+ xl_seq_rec *xlrec;
+ Snapshot snapshot;
+ RepOriginId origin_id = XLogRecGetOrigin(r);
+ bool transactional;
+
+ /* only decode changes flagged with XLOG_SEQ_LOG */
+ if (info != XLOG_SEQ_LOG)
+ elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
+
+ /*
+	 * If we don't have a snapshot or we are just fast-forwarding, there is no
+	 * point in decoding sequence changes.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ /* only interested in our database */
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ tupledata = XLogRecGetData(r);
+ datalen = XLogRecGetDataLen(r);
+ tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
+
+ /* extract the WAL record, with "created" flag */
+ xlrec = (xl_seq_rec *) XLogRecGetData(r);
+
+ /* XXX how could we have sequence change without data? */
+	if (!datalen || !tupledata)
+ return;
+
+ tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+ DecodeSeqTuple(tupledata, datalen, tuplebuf);
+
+ /*
+ * Should we handle the sequence increment as transactional or not?
+ *
+ * If the sequence was created in a still-running transaction, treat
+ * it as transactional and queue the increments. Otherwise it needs
+ * to be treated as non-transactional, in which case we send it to
+ * the plugin right away.
+ */
+ transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
+ target_node,
+ xlrec->created);
+
+ /* Skip the change if already processed (per the snapshot). */
+ if (transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
+ return;
+ else if (!transactional &&
+ (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+ SnapBuildXactNeedsSkip(builder, buf->origptr)))
+ return;
+
+ /* Queue the increment (or send immediately if not transactional). */
+ snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+ ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
+ origin_id, target_node, transactional,
+ xlrec->created, tuplebuf);
+}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8dcb564af8..a74dd75fde 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called);
/* streaming callbacks */
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -217,6 +221,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
+ ctx->reorder->sequence = sequence_cb_wrapper;
/*
* To support streaming, we require start/stop/abort/commit/change
@@ -1198,6 +1203,43 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ if (ctx->callbacks.sequence_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "sequence";
+ state.report_location = sequence_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = sequence_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+ created, last_value, log_cnt, is_called);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr first_lsn)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 2d774567e0..d42c749cfa 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -622,6 +622,58 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
pq_sendbytes(out, message, sz);
}
+/*
+ * Write SEQUENCE to stream
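+ *
+ * The message layout (as written below) is: message type byte, optional
+ * xid (present only when streaming), flags, lsn, namespace and sequence
+ * name, followed by the transactional/created flags and the sequence state
+ * (last_value, log_cnt, is_called).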
+ */
+void
+logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
+ XLogRecPtr lsn, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ uint8 flags = 0;
+ char *relname;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ pq_sendint8(out, flags);
+ pq_sendint64(out, lsn);
+
+ logicalrep_write_namespace(out, RelationGetNamespace(rel));
+ relname = RelationGetRelationName(rel);
+ pq_sendstring(out, relname);
+
+ pq_sendint8(out, transactional);
+ pq_sendint8(out, created);
+ pq_sendint64(out, last_value);
+ pq_sendint64(out, log_cnt);
+ pq_sendint8(out, is_called);
+}
+
+/*
+ * Read SEQUENCE from the stream.
+ */
+void
+logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
+{
+ /* XXX skipping flags and lsn */
+ pq_getmsgint(in, 1);
+ pq_getmsgint64(in);
+
+ /* Read relation name from stream */
+ seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
+ seqdata->seqname = pstrdup(pq_getmsgstring(in));
+
+ seqdata->transactional = pq_getmsgint(in, 1);
+ seqdata->created = pq_getmsgint(in, 1);
+ seqdata->last_value = pq_getmsgint64(in);
+ seqdata->log_cnt = pq_getmsgint64(in);
+ seqdata->is_called = pq_getmsgint(in, 1);
+}
+
/*
* Write relation description to the output stream.
*/
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7378beb684..f99e45f22d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -116,6 +116,13 @@ typedef struct ReorderBufferTXNByIdEnt
ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;
+/* entry for hash table we use to track sequences created in running xacts */
+typedef struct ReorderBufferSequenceEnt
+{
+ RelFileNode rnode;
+ TransactionId xid;
+} ReorderBufferSequenceEnt;
+
/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
@@ -338,6 +345,14 @@ ReorderBufferAllocate(void)
buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ /* hash table of sequences, mapping relfilenode to XID of transaction */
+ hash_ctl.keysize = sizeof(RelFileNode);
+ hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
+ hash_ctl.hcxt = buffer->context;
+
+ buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
@@ -528,6 +543,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ if (change->data.sequence.tuple)
+ {
+ ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
+ change->data.sequence.tuple = NULL;
+ }
+ break;
}
pfree(change);
@@ -856,6 +878,212 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
}
}
+/*
+ * Treat the sequence increment as transactional?
+ *
+ * The increment is transactional if the sequence was created (or assigned a
+ * new relfilenode) by a transaction that is still being decoded, i.e. if the
+ * relfilenode is tracked in the rb->sequences hash. A "created" record is
+ * always transactional.
+ */
+bool
+ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+ RelFileNode rnode, bool created)
+{
+ bool found = false;
+
+ if (created)
+ return true;
+
+ hash_search(rb->sequences,
+ (void *) &rnode,
+ HASH_FIND,
+ &found);
+
+ return found;
+}
+
+
+/*
+ * A transactional sequence increment is queued to be processed upon commit
+ * and a non-transactional increment gets processed immediately.
+ *
+ * A sequence update may be either transactional or non-transactional. If the
+ * sequence was created in a still-running transaction, treat the update as
+ * transactional and queue the change in that transaction. Otherwise treat it
+ * as non-transactional, so that we don't lose the increment in case of a
+ * rollback.
+ */
+void
+ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+ RelFileNode rnode, bool transactional, bool created,
+ ReorderBufferTupleBuf *tuplebuf)
+{
+ /*
+ * Change needs to be handled as transactional, because the sequence was
+ * created in a transaction that is still running. In that case all the
+ * changes need to be queued in that transaction, we must not send them
+ * to the downstream until the transaction commits.
+ *
+	 * There's a bit of trouble with subtransactions - we can't queue it
+ * into the subxact, because it might be rolled back and we'd lose the
+ * increment. We need to queue it into the same (sub)xact that created
+ * the sequence, which is why we track the XID in the hash table.
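+	 *
+	 * For example:
+	 *
+	 *   BEGIN;
+	 *   CREATE SEQUENCE s;
+	 *   SAVEPOINT sp1;
+	 *   SELECT nextval('s');
+	 *   ROLLBACK TO sp1;
+	 *   COMMIT;
+	 *
+	 * The increment has to survive the subxact rollback (sequence changes
+	 * are never rolled back on disk), so it is queued into the xact that
+	 * created the sequence, not into the aborted subxact.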
+ */
+ if (transactional)
+ {
+ MemoryContext oldcontext;
+ ReorderBufferChange *change;
+
+ /* lookup sequence by relfilenode */
+ ReorderBufferSequenceEnt *ent;
+ bool found;
+
+ /* transactional changes require a transaction */
+ Assert(xid != InvalidTransactionId);
+
+		/* look up the sequence entry, creating it if this is the create record */
+ ent = hash_search(rb->sequences,
+ (void *) &rnode,
+ created ? HASH_ENTER : HASH_FIND,
+ &found);
+
+ /*
+ * If this is the "create" increment, we must not have found any
+ * pre-existing entry in the hash table (i.e. there must not be
+ * any conflicting sequence).
+ */
+ Assert(!(created && found));
+
+ /* But we must have either created or found an existing entry. */
+ Assert(created || found);
+
+ /*
+ * When creating the sequence, remember the XID of the transaction
+		 * that created it.
+ */
+ if (created)
+ ent->xid = xid;
+
+ /* XXX Maybe check that we're still in the same top-level xact? */
+
+ /* OK, allocate and queue the change */
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ change = ReorderBufferGetChange(rb);
+
+ change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
+ change->origin_id = origin_id;
+
+ memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
+
+ change->data.sequence.created = created;
+ change->data.sequence.tuple = tuplebuf;
+
+ /* add it to the same subxact that created the sequence */
+ ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ /*
+ * This increment is for a sequence that was not created in any
+ * running transaction, so we treat it as non-transactional and
+ * just send it to the output plugin directly.
+ */
+ ReorderBufferTXN *txn = NULL;
+ volatile Snapshot snapshot_now = snapshot;
+ bool using_subtxn;
+
+#ifdef USE_ASSERT_CHECKING
+ /* All "creates" have to be handled as transactional. */
+ Assert(!created);
+
+ /* Make sure the sequence is not in the hash table. */
+ {
+ bool found;
+ hash_search(rb->sequences,
+ (void *) &rnode,
+ HASH_FIND, &found);
+ Assert(!found);
+ }
+#endif
+
+ if (xid != InvalidTransactionId)
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ /* setup snapshot to allow catalog access */
+ SetupHistoricSnapshot(snapshot_now, NULL);
+
+ /*
+ * Decoding needs access to syscaches et al., which in turn use
+ * heavyweight locks and such. Thus we need to have enough state around to
+ * keep track of those. The easiest way is to simply use a transaction
+ * internally. That also allows us to easily enforce that nothing writes
+ * to the database by checking for xid assignments.
+ *
+ * When we're called via the SQL SRF there's already a transaction
+ * started, so start an explicit subtransaction there.
+ */
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ PG_TRY();
+ {
+ Relation relation;
+ HeapTuple tuple;
+ bool isnull;
+ int64 last_value, log_cnt;
+ bool is_called;
+ Oid reloid;
+
+ if (using_subtxn)
+ BeginInternalSubTransaction("sequence");
+ else
+ StartTransactionCommand();
+
+ reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
+
+ if (reloid == InvalidOid)
+ elog(ERROR, "could not map filenode \"%s\" to relation OID",
+ relpathperm(rnode,
+ MAIN_FORKNUM));
+
+ relation = RelationIdGetRelation(reloid);
+ tuple = &tuplebuf->tuple;
+
+ /*
+ * Extract the internal sequence values, describing the state.
+ *
+ * XXX Seems a bit strange to access it directly. Maybe there's
+ * a better / more correct way?
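+			 *
+			 * (Attributes 1, 2 and 3 of the sequence tuple are last_value,
+			 * log_cnt and is_called, per FormData_pg_sequence_data.)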
+ */
+			last_value = DatumGetInt64(heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull));
+			log_cnt = DatumGetInt64(heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull));
+			is_called = DatumGetBool(heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull));
+
+ rb->sequence(rb, txn, lsn, relation, transactional, created,
+ last_value, log_cnt, is_called);
+
+ RelationClose(relation);
+
+ TeardownHistoricSnapshot(false);
+
+ AbortCurrentTransaction();
+
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+ }
+ PG_CATCH();
+ {
+ TeardownHistoricSnapshot(true);
+
+ AbortCurrentTransaction();
+
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+}
+
/*
* AssertTXNLsnOrder
* Verify LSN ordering of transaction lists in the reorderbuffer
@@ -1532,6 +1760,31 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
&found);
Assert(found);
+ /*
+ * Remove sequences created in this transaction (if any).
+ *
+ * There's no way to search by XID, so we simply do a seqscan of all
+ * the entries in the hash table. Hopefully there are only a couple
+ * entries in most cases - people generally don't create many new
+ * sequences over and over.
+ */
+ {
+ HASH_SEQ_STATUS scan_status;
+ ReorderBufferSequenceEnt *ent;
+
+ hash_seq_init(&scan_status, rb->sequences);
+ while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
+ {
+ /* skip sequences not from this transaction */
+ if (ent->xid != txn->xid)
+ continue;
+
+ (void) hash_search(rb->sequences,
+ (void *) &(ent->rnode),
+ HASH_REMOVE, NULL);
+ }
+ }
+
/* remove entries spilled to disk */
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
@@ -1947,6 +2200,39 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
change->data.msg.message);
}
+/*
+ * Helper function for ReorderBufferProcessTXN for applying sequences.
+ */
+static inline void
+ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change,
+ bool streaming)
+{
+ HeapTuple tuple;
+ bool isnull;
+ int64 last_value, log_cnt;
+ bool is_called;
+
+ /* FIXME support streaming */
+ Assert(!streaming);
+
+ tuple = &change->data.sequence.tuple->tuple;
+
+ /*
+ * Extract the internal sequence values, describing the state.
+ *
+ * XXX Seems a bit strange to access it directly. Maybe there's
+ * a better / more correct way?
+ */
+	last_value = DatumGetInt64(heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull));
+	log_cnt = DatumGetInt64(heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull));
+	is_called = DatumGetBool(heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull));
+
+ rb->sequence(rb, txn, change->lsn, relation, true, /* gotta be transactional */
+ change->data.sequence.created,
+ last_value, log_cnt, is_called);
+}
+
/*
* Function to store the command id and snapshot at the end of the current
* stream so that we can reuse the same while sending the next stream.
@@ -2389,6 +2675,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
elog(ERROR, "tuplecid value in changequeue");
break;
+
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ Assert(snapshot_now);
+
+ reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
+ change->data.sequence.relnode.relNode);
+
+ if (reloid == InvalidOid)
+ elog(ERROR, "could not map filenode \"%s\" to relation OID",
+ relpathperm(change->data.sequence.relnode,
+ MAIN_FORKNUM));
+
+ relation = RelationIdGetRelation(reloid);
+
+ if (!RelationIsValid(relation))
+ elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+ reloid,
+ relpathperm(change->data.sequence.relnode,
+ MAIN_FORKNUM));
+
+ if (RelationIsLogicallyLogged(relation))
+ ReorderBufferApplySequence(rb, txn, relation, change, streaming);
+
+ RelationClose(relation);
+ break;
}
}
@@ -3776,6 +4087,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
memcpy(data, change->data.truncate.relids, size);
data += size;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ {
+ char *data;
+ ReorderBufferTupleBuf *tup;
+ Size len = 0;
+
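+				/*
+				 * The fixed-size part of the change (relnode, created flag)
+				 * is written out as part of the change struct itself; here we
+				 * only reserve extra space for the tuple, if there is one.
+				 */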
+ tup = change->data.sequence.tuple;
+
+ if (tup)
+ {
+ sz += sizeof(HeapTupleData);
+ len = tup->tuple.t_len;
+ sz += len;
+ }
+
+ /* make sure we have enough space */
+ ReorderBufferSerializeReserve(rb, sz);
+
+ data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+ /* might have been reallocated above */
+ ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+ if (len)
+ {
+ memcpy(data, &tup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, tup->tuple.t_data, len);
+ data += len;
+ }
+
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4040,6 +4384,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
{
sz += sizeof(Oid) * change->data.truncate.nrelids;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ {
+ ReorderBufferTupleBuf *tup;
+ Size len = 0;
+
+ tup = change->data.sequence.tuple;
+
+ if (tup)
+ {
+ sz += sizeof(HeapTupleData);
+ len = tup->tuple.t_len;
+ sz += len;
+ }
+
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4341,6 +4701,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
+
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ if (change->data.sequence.tuple)
+ {
+ uint32 tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.sequence.tuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
+
+ /* restore ->tuple */
+ memcpy(&change->data.sequence.tuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ /* reset t_data pointer into the new tuplebuf */
+ change->data.sequence.tuple->tuple.t_data =
+ ReorderBufferTupleBufData(change->data.sequence.tuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
+ }
+ break;
+
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..24369a1522 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
+#include "commands/sequence.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
@@ -357,6 +358,12 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
*
* If the synchronization position is reached (SYNCDONE), then the table can
* be marked as READY and is no longer tracked.
+ *
+ * XXX This needs to handle sequences too - after AlterSubscription_refresh
+ * starts caring about sequences, GetSubscriptionNotReadyRelations won't
+ * return just tables, and we'll have to sync them here. Not sure it's worth
+ * creating a new "sync" worker per sequence, maybe we should just sync them
+ * in the current process (it's pretty light-weight).
*/
static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
@@ -871,6 +878,99 @@ copy_table(Relation rel)
logicalrep_rel_close(relmapentry, NoLock);
}
+
+
+/*
+ * Fetch the current state (last_value, log_cnt, is_called) of a sequence
+ * from the publisher.
+ */
+static void
+fetch_sequence_data(char *nspname, char *relname,
+ int64 *last_value, int64 *log_cnt, bool *is_called)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID};
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
+ " FROM %s", quote_qualified_identifier(nspname, relname));
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not receive list of replicated tables from the publisher: %s",
+ res->err)));
+
+ /* Process the sequence. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ bool isnull;
+
+ *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+
+ *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
+ Assert(!isnull);
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+}
+
+/*
+ * Copy existing data of a sequence from publisher.
+ *
+ * Caller is responsible for locking the local relation.
+ */
+static void
+copy_sequence(Relation rel)
+{
+ LogicalRepRelMapEntry *relmapentry;
+ LogicalRepRelation lrel;
+ int64 last_value = 0,
+ log_cnt = 0;
+	bool		is_called = false;
+
+ /* Get the publisher relation info. */
+ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
+ RelationGetRelationName(rel), &lrel);
+
+ /* Put the relation into relmap. */
+ logicalrep_relmap_update(&lrel);
+
+ /* Map the publisher relation to local one. */
+ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
+ Assert(rel == relmapentry->localrel);
+
+	/* Fetch the current sequence state from the publisher. */
+
+ Assert(lrel.relkind == RELKIND_SEQUENCE);
+
+ fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
+
+ elog(WARNING, "sequence %s info last_value %ld log_cnt %ld is_called %d",
+ quote_qualified_identifier(lrel.nspname, lrel.relname),
+ last_value, log_cnt, is_called);
+
+ ResetSequence2(RelationGetRelid(rel), last_value, log_cnt, is_called);
+
+ logicalrep_rel_close(relmapentry, NoLock);
+}
+
/*
* Determine the tablesync slot name.
*
@@ -1106,10 +1206,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
originname)));
}
- /* Now do the initial data copy */
- PushActiveSnapshot(GetTransactionSnapshot());
- copy_table(rel);
- PopActiveSnapshot();
+ if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
+ {
+ /* Now do the initial sequence copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_sequence(rel);
+ PopActiveSnapshot();
+ }
+ else
+ {
+ /* Now do the initial data copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_table(rel);
+ PopActiveSnapshot();
+ }
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3f499b11f7..f4dbf5ed71 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -144,6 +144,7 @@
#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
+#include "commands/sequence.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
@@ -1073,6 +1074,61 @@ apply_handle_origin(StringInfo s)
errmsg_internal("ORIGIN message sent out of order")));
}
+/*
+ * Handle SEQUENCE message.
+ */
+static void
+apply_handle_sequence(StringInfo s)
+{
+ LogicalRepSequence seq;
+ Oid relid;
+
+ if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
+ return;
+
+ logicalrep_read_sequence(s, &seq);
+
+ /*
+	 * Transactional sequence updates arrive as part of a remote transaction,
+	 * while non-transactional updates must not - and for those there should
+	 * be no transaction in progress at this point either.
+ */
+ Assert((!seq.transactional) || in_remote_transaction);
+ Assert(!(!seq.transactional && in_remote_transaction));
+ Assert(!(!seq.transactional && IsTransactionState()));
+
+ /*
+	 * Make sure we're in a transaction (ResetSequence2 needs one). For
+	 * non-transactional updates there is no transaction yet, so start a new
+	 * one here; we'll commit it at the end.
+ */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ maybe_reread_subscription();
+ }
+
+ relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
+ seq.seqname, -1),
+ RowExclusiveLock, false);
+
+	/* lock the sequence with AccessExclusiveLock, as expected by ResetSequence2 */
+ elog(WARNING, "locking sequence %d in exclusive mode", relid);
+ LockRelationOid(relid, AccessExclusiveLock);
+
+ elog(WARNING, "applying sequence %s.%s transactional %d created %d last_value %ld log_cnt %ld is_called %d",
+ seq.nspname, seq.seqname, seq.transactional, seq.created, seq.last_value, seq.log_cnt, seq.is_called);
+
+ /* apply the sequence change */
+ ResetSequence2(relid, seq.last_value, seq.log_cnt, seq.is_called);
+
+ /*
+	 * Commit the transaction we started above (we only do this when not in a
+	 * remote transaction, i.e. for non-transactional sequence updates).
+ */
+ if (!in_remote_transaction)
+ CommitTransactionCommand();
+}
+
/*
* Handle STREAM START message.
*/
@@ -2327,6 +2383,10 @@ apply_dispatch(StringInfo s)
*/
return;
+ case LOGICAL_REP_MSG_SEQUENCE:
+ apply_handle_sequence(s);
+ return;
+
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
return;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e4314af13a..4366c275dc 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -49,6 +49,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pgoutput_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -157,6 +161,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
+ cb->sequence_cb = pgoutput_sequence;
cb->commit_cb = pgoutput_commit_txn;
cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -186,6 +191,7 @@ parse_output_parameters(List *options, PGOutputData *data)
bool publication_names_given = false;
bool binary_option_given = false;
bool messages_option_given = false;
+ bool sequences_option_given = false;
bool streaming_given = false;
bool two_phase_option_given = false;
@@ -193,6 +199,7 @@ parse_output_parameters(List *options, PGOutputData *data)
data->streaming = false;
data->messages = false;
data->two_phase = false;
+ data->sequences = true;
foreach(lc, options)
{
@@ -258,6 +265,16 @@ parse_output_parameters(List *options, PGOutputData *data)
data->messages = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "sequences") == 0)
+ {
+ if (sequences_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ sequences_option_given = true;
+
+ data->sequences = defGetBoolean(defel);
+ }
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
@@ -865,6 +882,47 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+static void
+pgoutput_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ TransactionId xid = InvalidTransactionId;
+ RelationSyncEntry *relentry;
+
+ if (!data->sequences)
+ return;
+
+ if (!is_publishable_relation(rel))
+ return;
+
+ /* XXX handle (in_streaming) here */
+
+ relentry = get_rel_sync_entry(data, RelationGetRelid(rel));
+
+ /*
+ * First check the sequence filter.
+ *
+ * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here.
+ */
+ if (!relentry->pubactions.pubsequence)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_sequence(ctx->out,
+ rel,
+ xid,
+ sequence_lsn,
+ transactional,
+ created,
+ last_value,
+ log_cnt,
+ is_called);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* Currently we always forward.
*/
@@ -1128,7 +1186,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->streamed_txns = NIL;
entry->replicate_valid = false;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
- entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+ entry->pubactions.pubsequence = false;
entry->publish_as_relid = InvalidOid;
entry->map = NULL; /* will be set by maybe_send_schema() if
* needed */
@@ -1140,6 +1199,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
List *pubids = GetRelationPublications(relid);
ListCell *lc;
Oid publish_as_relid = relid;
+ bool is_sequence = (get_rel_relkind(relid) == RELKIND_SEQUENCE);
/* Reload publications if needed before use. */
if (!publications_valid)
@@ -1163,12 +1223,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
Publication *pub = lfirst(lc);
bool publish = false;
- if (pub->alltables)
+ if (pub->alltables && (!is_sequence))
{
publish = true;
if (pub->pubviaroot && am_partition)
publish_as_relid = llast_oid(get_partition_ancestors(relid));
}
+ else if (pub->allsequences && is_sequence)
+ {
+ publish = true;
+ }
+
+ /* if a sequence, just cross-check the list of publications */
+ if (!publish && is_sequence)
+ {
+ if (list_member_oid(pubids, pub->oid))
+ publish = true;
+ }
if (!publish)
{
@@ -1219,10 +1290,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+ entry->pubactions.pubsequence |= pub->pubactions.pubsequence;
}
if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
- entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
+ entry->pubactions.pubdelete && entry->pubactions.pubtruncate &&
+ entry->pubactions.pubsequence)
break;
}
@@ -1367,6 +1440,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubsequence = false;
}
}
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 13d9994af3..3dd39054e6 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5495,6 +5495,7 @@ GetRelationPublicationActions(Relation relation)
pubactions->pubupdate |= pubform->pubupdate;
pubactions->pubdelete |= pubform->pubdelete;
pubactions->pubtruncate |= pubform->pubtruncate;
+ pubactions->pubsequence |= pubform->pubsequence;
ReleaseSysCache(tup);
@@ -5503,7 +5504,8 @@ GetRelationPublicationActions(Relation relation)
* other publications.
*/
if (pubactions->pubinsert && pubactions->pubupdate &&
- pubactions->pubdelete && pubactions->pubtruncate)
+ pubactions->pubdelete && pubactions->pubtruncate &&
+ pubactions->pubsequence)
break;
}
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 064892bade..fac6f9a563 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1640,7 +1640,7 @@ psql_completion(const char *text, int start, int end)
/* ALTER PUBLICATION <name> */
else if (Matches("ALTER", "PUBLICATION", MatchAny))
- COMPLETE_WITH("ADD TABLE", "DROP TABLE", "OWNER TO", "RENAME TO", "SET");
+ COMPLETE_WITH("ADD TABLE", "DROP TABLE", "ADD SEQUENCE", "DROP SEQUENCE", "OWNER TO", "RENAME TO", "SET");
/* ALTER PUBLICATION <name> SET */
else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET"))
COMPLETE_WITH("(", "TABLE");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 8cd0252082..8700a72dea 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11445,6 +11445,11 @@
provolatile => 's', prorettype => 'oid', proargtypes => 'text',
proallargtypes => '{text,oid}', proargmodes => '{i,o}',
proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
+{ oid => '8000', descr => 'get OIDs of sequences in a publication',
+ proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't',
+ provolatile => 's', prorettype => 'oid', proargtypes => 'text',
+ proallargtypes => '{text,oid}', proargmodes => '{i,o}',
+ proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' },
{ oid => '6121',
descr => 'returns whether a relation can be part of a publication',
proname => 'pg_relation_is_publishable', provolatile => 's',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index f332bad4d4..150b8922cf 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -40,6 +40,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
*/
bool puballtables;
+ /*
+	 * indicates that this is a special publication that encompasses all
+	 * sequences in the database (except unlogged and temporary ones)
+ */
+ bool puballsequences;
+
/* true if inserts are published */
bool pubinsert;
@@ -52,6 +58,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
/* true if truncates are published */
bool pubtruncate;
+ /* true if sequences are published */
+ bool pubsequence;
+
/* true if partition changes are published using root schema */
bool pubviaroot;
} FormData_pg_publication;
@@ -72,6 +81,7 @@ typedef struct PublicationActions
bool pubupdate;
bool pubdelete;
bool pubtruncate;
+ bool pubsequence;
} PublicationActions;
typedef struct Publication
@@ -79,6 +89,7 @@ typedef struct Publication
Oid oid;
char *name;
bool alltables;
+ bool allsequences;
bool pubviaroot;
PublicationActions pubactions;
} Publication;
@@ -107,6 +118,9 @@ extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
extern List *GetAllTablesPublications(void);
extern List *GetAllTablesPublicationRelations(bool pubviaroot);
+extern List *GetAllSequencesPublicationRelations(void);
+extern List *GetPublicationSequenceRelations(Oid pubid);
+
extern bool is_publishable_relation(Relation rel);
extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
bool if_not_exists);
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 40544dd4c7..c28e8695cb 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -48,6 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data;
typedef struct xl_seq_rec
{
RelFileNode node;
+ bool created; /* is this a CREATE SEQUENCE */
/* SEQUENCE TUPLE DATA FOLLOWS AT THE END */
} xl_seq_rec;
@@ -59,6 +60,7 @@ extern ObjectAddress DefineSequence(ParseState *pstate, CreateSeqStmt *stmt);
extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
extern void DeleteSequenceTuple(Oid relid);
extern void ResetSequence(Oid seq_relid);
+extern void ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called);
extern void ResetSequenceCaches(void);
extern void seq_redo(XLogReaderState *rptr);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index e28248af32..6e76c40f98 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3644,8 +3644,10 @@ typedef struct AlterPublicationStmt
/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
List *tables; /* List of tables to add/drop */
+ List *sequences; /* List of sequences to add/drop */
bool for_all_tables; /* Special publication for all tables in db */
- DefElemAction tableAction; /* What action to perform with the tables */
+	bool		for_all_sequences;	/* Special publication for all sequences in db */
+ DefElemAction action; /* What action to perform with the tables/sequences */
} AlterPublicationStmt;
typedef struct CreateSubscriptionStmt
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 63de90d94a..931bc0722a 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -60,6 +60,7 @@ typedef enum LogicalRepMsgType
LOGICAL_REP_MSG_RELATION = 'R',
LOGICAL_REP_MSG_TYPE = 'Y',
LOGICAL_REP_MSG_MESSAGE = 'M',
+ LOGICAL_REP_MSG_SEQUENCE = 'X', /* FIXME change */
LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
LOGICAL_REP_MSG_PREPARE = 'P',
LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
@@ -116,6 +117,19 @@ typedef struct LogicalRepTyp
char *typname; /* name of the remote type */
} LogicalRepTyp;
+/* Sequence info */
+typedef struct LogicalRepSequence
+{
+ Oid remoteid; /* unique id of the remote sequence */
+ char *nspname; /* schema name of remote sequence */
+ char *seqname; /* name of the remote sequence */
+ bool transactional;
+ bool created;
+ int64 last_value;
+ int64 log_cnt; /* XXX probably not needed? */
+ bool is_called; /* XXX probably not needed? */
+} LogicalRepSequence;
+
/* Transaction info */
typedef struct LogicalRepBeginData
{
@@ -223,6 +237,12 @@ extern List *logicalrep_read_truncate(StringInfo in,
bool *cascade, bool *restart_seqs);
extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
bool transactional, const char *prefix, Size sz, const char *message);
+extern void logicalrep_write_sequence(StringInfo out, Relation rel,
+ TransactionId xid, XLogRecPtr lsn,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt,
+ bool is_called);
+extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 810495ed0e..e3e1b03f32 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -88,6 +88,19 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
Size message_size,
const char *message);
+/*
+ * Called for logically decoded sequence changes.
+ */
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ bool created,
+ int64 last_value,
+ int64 log_cnt,
+ bool is_called);
+
/*
* Filter changes by origin.
*/
@@ -219,6 +232,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
+ LogicalDecodeSequenceCB sequence_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 0dc460fb70..97eb2a7ef1 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -28,6 +28,7 @@ typedef struct PGOutputData
bool streaming;
bool messages;
bool two_phase;
+ bool sequences;
} PGOutputData;
#endif /* PGOUTPUT_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5b40ff75f7..4512ed679b 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -64,7 +64,8 @@ enum ReorderBufferChangeType
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
- REORDER_BUFFER_CHANGE_TRUNCATE
+ REORDER_BUFFER_CHANGE_TRUNCATE,
+ REORDER_BUFFER_CHANGE_SEQUENCE
};
/* forward declaration */
@@ -158,6 +159,14 @@ typedef struct ReorderBufferChange
uint32 ninvalidations; /* Number of messages */
SharedInvalidationMessage *invalidations; /* invalidation message */
} inval;
+
+ /* Context data for Sequence changes */
+ struct
+ {
+ RelFileNode relnode;
+ bool created;
+ ReorderBufferTupleBuf *tuple;
+ } sequence;
} data;
/*
@@ -430,6 +439,15 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
const char *prefix, Size sz,
const char *message);
+/* sequence callback signature */
+typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt,
+ bool is_called);
+
/* begin prepare callback signature */
typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn);
@@ -511,6 +529,12 @@ struct ReorderBuffer
*/
HTAB *by_txn;
+ /*
+ * relfilenode => XID lookup table for sequences created in a transaction
+	 * (also includes altered sequences, since ALTER SEQUENCE assigns a new
+	 * relfilenode)
+ */
+ HTAB *sequences;
+
/*
* Transactions that could be a toplevel xact, ordered by LSN of the first
* record bearing that xid.
@@ -541,6 +565,7 @@ struct ReorderBuffer
ReorderBufferApplyTruncateCB apply_truncate;
ReorderBufferCommitCB commit;
ReorderBufferMessageCB message;
+ ReorderBufferSequenceCB sequence;
/*
* Callbacks to be called when streaming a transaction at prepare time.
@@ -635,6 +660,10 @@ void ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size message_size, const char *message);
+void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+ RelFileNode rnode, bool transactional, bool created,
+ ReorderBufferTupleBuf *tuplebuf);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
@@ -682,4 +711,7 @@ void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
void StartupReorderBuffer(void);
+bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+ RelFileNode rnode, bool created);
+
#endif
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index e5ab11275d..393be871be 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,6 +1451,14 @@ pg_prepared_xacts| SELECT p.transaction,
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_publication_sequences| SELECT p.pubname,
+ n.nspname AS schemaname,
+ c.relname AS sequencename
+ FROM pg_publication p,
+ LATERAL pg_get_publication_sequences((p.pubname)::text) gpt(relid),
+ (pg_class c
+ JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
+ WHERE (c.oid = gpt.relid);
pg_publication_tables| SELECT p.pubname,
n.nspname AS schemaname,
c.relname AS tablename
diff --git a/src/test/subscription/t/023_sequences.pl b/src/test/subscription/t/023_sequences.pl
new file mode 100644
index 0000000000..1f7768acd6
--- /dev/null
+++ b/src/test/subscription/t/023_sequences.pl
@@ -0,0 +1,196 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# This tests that sequences are replicated correctly by logical replication
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 6;
+
+# Initialize publisher node
+my $node_publisher = PostgresNode->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgresNode->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+my $ddl = qq(
+ CREATE SEQUENCE s;
+);
+
+# Setup structure on the publisher
+$node_publisher->safe_psql('postgres', $ddl);
+
+# Create the same structure on the subscriber, plus an extra sequence that
+# we'll create on the publisher later
+$ddl = qq(
+ CREATE SEQUENCE s;
+ CREATE SEQUENCE s2;
+);
+
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Set up logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION seq_pub");
+
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION seq_pub ADD SEQUENCE s");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub WITH (slot_name = seq_sub_slot)"
+);
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Wait for initial sync to finish as well
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Insert initial test data
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ -- generate a number of values using the sequence
+ SELECT nextval('s') FROM generate_series(1,100);
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+my $result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s;
+));
+
+is( $result, '132|0|t',
+ 'check replicated sequence values on subscriber');
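+
+# The expected value is (last_value, log_cnt, is_called) as returned by a
+# SELECT from the sequence relation. nextval() WAL-logs the sequence tuple
+# only every 32 values, and the logged tuple is 32 ahead of the value that
+# was actually returned, so after 100 nextval() calls the decoded and
+# replicated state is '132|0|t' rather than '100|...'.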
+
+
+# Advance the sequence in a rolled-back transaction - should be replicated
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SELECT nextval('s') FROM generate_series(1,100);
+ ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s;
+));
+
+is( $result, '231|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+# Create a new sequence and roll it back - it should not be replicated, due
+# to the transactional behavior of newly created sequences
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ CREATE SEQUENCE s2;
+ ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '1|0|f',
+ 'check replicated sequence values on subscriber');
+
+
+# Create a new sequence, advance it in a rolled-back subtransaction, but
+# commit the create - the advance should be replicated nevertheless
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ CREATE SEQUENCE s2;
+ ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
+ SAVEPOINT sp1;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK TO sp1;
+ COMMIT;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Wait for sync of the second sequence we just added to finish
+$synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '132|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+# Advance the new sequence in a transaction and roll it back - in this case
+# the advance should still be replicated, as the behavior is non-transactional
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '231|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+# Advance the sequence in a subtransaction that gets rolled back, while the
+# main transaction commits - the changes should still be replicated
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SAVEPOINT s1;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK TO s1;
+ COMMIT;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '330|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');