Hi,
Seems the cfbot was not entirely happy with the patch on some platforms,
so here's a fixed version. There was a bogus call to ensure_transaction
function (which does not exist at all) and a silly bug in transaction
management in apply_handle_sequence.
regards
--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
>From 8c9a36e8e9bd38ddb7e0c449ccdaf0db46ad28d5 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Sun, 13 Jun 2021 23:10:18 +0200
Subject: [PATCH] Logical decoding / replication of sequences
---
contrib/test_decoding/Makefile | 3 +-
contrib/test_decoding/expected/sequence.out | 327 +++++++++++++++
contrib/test_decoding/sql/sequence.sql | 119 ++++++
contrib/test_decoding/test_decoding.c | 40 ++
doc/src/sgml/catalogs.sgml | 71 ++++
doc/src/sgml/ref/alter_publication.sgml | 28 +-
doc/src/sgml/ref/alter_subscription.sgml | 6 +-
src/backend/catalog/pg_publication.c | 160 +++++++-
src/backend/catalog/system_views.sql | 10 +
src/backend/commands/publicationcmds.c | 232 ++++++++++-
src/backend/commands/sequence.c | 132 +++++-
src/backend/commands/subscriptioncmds.c | 274 +++++++++++++
src/backend/executor/execReplication.c | 4 +-
src/backend/nodes/copyfuncs.c | 4 +-
src/backend/nodes/equalfuncs.c | 4 +-
src/backend/parser/gram.y | 44 +-
src/backend/replication/logical/decode.c | 131 +++++-
src/backend/replication/logical/logical.c | 42 ++
src/backend/replication/logical/proto.c | 52 +++
.../replication/logical/reorderbuffer.c | 384 ++++++++++++++++++
src/backend/replication/logical/tablesync.c | 118 +++++-
src/backend/replication/logical/worker.c | 56 +++
src/backend/replication/pgoutput/pgoutput.c | 80 +++-
src/backend/utils/cache/relcache.c | 4 +-
src/bin/psql/tab-complete.c | 2 +-
src/include/catalog/pg_proc.dat | 5 +
src/include/catalog/pg_publication.h | 14 +
src/include/commands/sequence.h | 2 +
src/include/nodes/parsenodes.h | 4 +-
src/include/replication/logicalproto.h | 20 +
src/include/replication/output_plugin.h | 14 +
src/include/replication/pgoutput.h | 1 +
src/include/replication/reorderbuffer.h | 34 +-
src/test/regress/expected/publication.out | 8 +-
src/test/regress/expected/rules.out | 8 +
35 files changed, 2391 insertions(+), 46 deletions(-)
create mode 100644 contrib/test_decoding/expected/sequence.out
create mode 100644 contrib/test_decoding/sql/sequence.sql
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a31e0b879..56ddc3abae 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
- spill slot truncate stream stats twophase twophase_stream
+ spill slot truncate stream stats twophase twophase_stream \
+ sequence
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot
diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out
new file mode 100644
index 0000000000..07f3e3e92c
--- /dev/null
+++ b/contrib/test_decoding/expected/sequence.out
@@ -0,0 +1,327 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE SEQUENCE test_sequence;
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 4
+(1 row)
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 14
+(1 row)
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 4000
+(1 row)
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, true);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, false);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3500
+(1 row)
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 4, log_cnt: 0 is_called: 1
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 334, log_cnt: 0 is_called: 1
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 14, log_cnt: 32 is_called: 1
+ COMMIT
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 14, log_cnt: 0 is_called: 1
+ COMMIT
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 4000, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 4320, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3830, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3830, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 0
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3820, log_cnt: 0 is_called: 1
+(24 rows)
+
+DROP SEQUENCE test_sequence;
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3001
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3002
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3003
+(1 row)
+
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 6001
+(1 row)
+
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data
+------
+(0 rows)
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data
+------
+(0 rows)
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 3003 | 30 | t
+(1 row)
+
+SELECT nextval('test_table_a_seq');
+ nextval
+---------
+ 3004
+(1 row)
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+---------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_table_a_seq transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 3000, log_cnt: 0 is_called: 1
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 3033, log_cnt: 0 is_called: 1
+ BEGIN
+ COMMIT
+(8 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+-------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_table_a_seq transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ sequence: public.test_table_a_seq transactional: 1 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ table public.test_table: INSERT: a[integer]:1 b[integer]:100
+ table public.test_table: INSERT: a[integer]:2 b[integer]:200
+ COMMIT
+(6 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3001
+(1 row)
+
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ setval
+--------
+ 5000
+(1 row)
+
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 3001 | 32 | t
+(1 row)
+
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 3000, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 3033, log_cnt: 0 is_called: 1
+ COMMIT
+(6 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql
new file mode 100644
index 0000000000..d8a34738f3
--- /dev/null
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -0,0 +1,119 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE test_sequence;
+
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, true);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, false);
+SELECT nextval('test_sequence');
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+DROP SEQUENCE test_sequence;
+
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+SELECT nextval('test_table_a_seq');
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index de1b692658..162e3c18f0 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -35,6 +35,7 @@ typedef struct
bool include_timestamp;
bool skip_empty_xacts;
bool only_local;
+ bool skip_sequences;
} TestDecodingData;
/*
@@ -76,6 +77,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
@@ -141,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->sequence_cb = pg_decode_sequence;
cb->filter_prepare_cb = pg_decode_filter_prepare;
cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
cb->prepare_cb = pg_decode_prepare_txn;
@@ -175,6 +181,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->skip_empty_xacts = false;
data->only_local = false;
+ /* skip sequences by default for backwards compatibility */
+ data->skip_sequences = true;
+
ctx->output_plugin_private = data;
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
@@ -265,6 +274,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "skip-sequences") == 0)
+ {
+
+ if (elem->arg == NULL)
+ continue; /* true by default */
+ else if (!parse_bool(strVal(elem->arg), &data->skip_sequences))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -744,6 +764,26 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ char *nspname = get_namespace_name(RelationGetNamespace(rel));
+
+ /* return if requested to skip_sequences */
+ if (data->skip_sequences)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfo(ctx->out, "sequence: %s.%s transactional: %d created: %d last_value: %zu, log_cnt: %zu is_called: %d",
+ nspname, RelationGetRelationName(rel),
+ transactional, created, last_value, log_cnt, is_called);
+ OutputPluginWrite(ctx, true);
+}
+
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index f517a7d4af..faaa95f56c 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9428,6 +9428,11 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l
<entry>prepared transactions</entry>
</row>
+ <row>
+ <entry><link linkend="view-pg-publication-sequences"><structname>pg_publication_sequences</structname></link></entry>
+ <entry>publications and their associated sequences</entry>
+ </row>
+
<row>
<entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry>
<entry>publications and their associated tables</entry>
@@ -11262,6 +11267,72 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
</sect1>
+ <sect1 id="view-pg-publication-sequences">
+ <title><structname>pg_publication_sequences</structname></title>
+
+ <indexterm zone="view-pg-publication-sequences">
+ <primary>pg_publication_sequences</primary>
+ </indexterm>
+
+ <para>
+ The view <structname>pg_publication_sequences</structname> provides
+ information about the mapping between publications and the sequences they
+ contain. Unlike the underlying catalog
+ <link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>,
+ this view expands
+ publications defined as <literal>FOR ALL SEQUENCES</literal>, so for such
+ publications there will be a row for each eligible sequence.
+ </para>
+
+ <table>
+ <title><structname>pg_publication_sequences</structname> Columns</title>
+ <tgroup cols="1">
+ <thead>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ Column Type
+ </para>
+ <para>
+ Description
+ </para></entry>
+ </row>
+ </thead>
+
+ <tbody>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>pubname</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-publication"><structname>pg_publication</structname></link>.<structfield>pubname</structfield>)
+ </para>
+ <para>
+ Name of publication
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>schemaname</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-namespace"><structname>pg_namespace</structname></link>.<structfield>nspname</structfield>)
+ </para>
+ <para>
+ Name of schema containing sequence
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>sequencename</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>relname</structfield>)
+ </para>
+ <para>
+ Name of sequence
+ </para></entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+ </sect1>
+
<sect1 id="view-pg-publication-tables">
<title><structname>pg_publication_tables</structname></title>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index faa114b2c6..c68a1573de 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -24,6 +24,9 @@ PostgreSQL documentation
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@@ -50,7 +53,18 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
</para>
<para>
- The fourth variant of this command listed in the synopsis can change
+ The next three variants change which sequences are part of the publication.
+ The <literal>SET SEQUENCE</literal> clause will replace the list of sequences
+ in the publication with the specified one. The <literal>ADD SEQUENCE</literal>
+ and <literal>DROP SEQUENCE</literal> clauses will add and remove one or more
+ sequences from the publication. Note that adding sequences to a publication
+ that is already subscribed to will require a <literal>ALTER SUBSCRIPTION
+ ... REFRESH PUBLICATION</literal> action on the subscribing side in order
+ to become effective.
+ </para>
+
+ <para>
+ The seventh variant of this command listed in the synopsis can change
all of the publication properties specified in
<xref linkend="sql-createpublication"/>. Properties not mentioned in the
command retain their previous settings.
@@ -62,7 +76,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
<para>
You must own the publication to use <command>ALTER PUBLICATION</command>.
- Adding a table to a publication additionally requires owning that table.
+ Adding a table to a publication additionally requires owning that table,
+ and the same requirement applies to sequences.
To alter the owner, you must also be a direct or indirect member of the new
owning role. The new owner must have <literal>CREATE</literal> privilege on
the database. Also, the new owner of a <literal>FOR ALL TABLES</literal>
@@ -97,6 +112,15 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><replaceable class="parameter">sequence_name</replaceable></term>
+ <listitem>
+ <para>
+ Name of an existing sequence.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
<listitem>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 367ac814f4..3412ca68e9 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -138,8 +138,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<term><literal>REFRESH PUBLICATION</literal></term>
<listitem>
<para>
- Fetch missing table information from publisher. This will start
- replication of tables that were added to the subscribed-to publications
+ Fetch missing table and sequence information from publisher. This will start
+ replication of tables and sequences that were added to the subscribed-to publications
since the last invocation of <command>REFRESH PUBLICATION</command> or
since <command>CREATE SUBSCRIPTION</command>.
</para>
@@ -156,7 +156,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
Specifies whether the existing data in the publications that are
being subscribed to should be copied once the replication starts.
The default is <literal>true</literal>. (Previously subscribed
- tables are not copied.)
+ tables and sequences are not copied.)
</para>
</listitem>
</varlistentry>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 86e415af89..427582f36a 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -51,26 +51,27 @@ check_publication_add_relation(Relation targetrel)
{
/* Must be a regular or partitioned table */
if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
- RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
+ RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE &&
+ RelationGetForm(targetrel)->relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("\"%s\" is not a table",
+ errmsg("\"%s\" is not a table or sequence",
RelationGetRelationName(targetrel)),
- errdetail("Only tables can be added to publications.")));
+ errdetail("Only tables and sequences can be added to publications.")));
- /* Can't be system table */
+ /* Can't be system table/sequence */
if (IsCatalogRelation(targetrel))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("\"%s\" is a system table",
+ errmsg("\"%s\" is a system table or sequence",
RelationGetRelationName(targetrel)),
- errdetail("System tables cannot be added to publications.")));
+ errdetail("System tables / sequences cannot be added to publications.")));
/* UNLOGGED and TEMP relations cannot be part of publication. */
if (!RelationIsPermanent(targetrel))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("table \"%s\" cannot be replicated",
+ errmsg("table or sequence \"%s\" cannot be replicated",
RelationGetRelationName(targetrel)),
errdetail("Temporary and unlogged relations cannot be replicated.")));
}
@@ -98,7 +99,8 @@ static bool
is_publishable_class(Oid relid, Form_pg_class reltuple)
{
return (reltuple->relkind == RELKIND_RELATION ||
- reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
+ reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
+ reltuple->relkind == RELKIND_SEQUENCE) &&
!IsCatalogRelationOid(relid) &&
reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
relid >= FirstNormalObjectId;
@@ -271,6 +273,10 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+ /* skip sequences here */
+ if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+ continue;
+
if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
pub_partopt != PUBLICATION_PART_ROOT)
{
@@ -304,6 +310,49 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
return result;
}
+/*
+ * Gets list of relation oids for a publication (sequences only).
+ *
+ * This should only be used for normal publications, the FOR ALL TABLES
+ * should use GetAllSequencesPublicationRelations().
+ */
+List *
+GetPublicationSequenceRelations(Oid pubid)
+{
+ List *result;
+ Relation pubrelsrel;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ HeapTuple tup;
+
+ /* Find all publications associated with the relation. */
+ pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
+
+ ScanKeyInit(&scankey,
+ Anum_pg_publication_rel_prpubid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(pubid));
+
+ scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
+ true, NULL, 1, &scankey);
+
+ result = NIL;
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_publication_rel pubrel;
+
+ pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+ if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+ result = lappend_oid(result, pubrel->prrelid);
+ }
+
+ systable_endscan(scan);
+ table_close(pubrelsrel, AccessShareLock);
+
+ return result;
+}
+
/*
* Gets list of publication oids for publications marked as FOR ALL TABLES.
*/
@@ -404,6 +453,46 @@ GetAllTablesPublicationRelations(bool pubviaroot)
return result;
}
+/*
+ * Gets list of all relation published by FOR ALL TABLES publication(s).
+ *
+ * If the publication publishes partition changes via their respective root
+ * partitioned tables, we must exclude partitions in favor of including the
+ * root partitioned tables.
+ */
+List *
+GetAllSequencesPublicationRelations(void)
+{
+ Relation classRel;
+ ScanKeyData key[1];
+ TableScanDesc scan;
+ HeapTuple tuple;
+ List *result = NIL;
+
+ classRel = table_open(RelationRelationId, AccessShareLock);
+
+ ScanKeyInit(&key[0],
+ Anum_pg_class_relkind,
+ BTEqualStrategyNumber, F_CHAREQ,
+ CharGetDatum(RELKIND_SEQUENCE));
+
+ scan = table_beginscan_catalog(classRel, 1, key);
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+ Oid relid = relForm->oid;
+
+ if (is_publishable_class(relid, relForm))
+ result = lappend_oid(result, relid);
+ }
+
+ table_endscan(scan);
+
+ table_close(classRel, AccessShareLock);
+ return result;
+}
+
/*
* Get publication using oid
*
@@ -426,10 +515,12 @@ GetPublication(Oid pubid)
pub->oid = pubid;
pub->name = pstrdup(NameStr(pubform->pubname));
pub->alltables = pubform->puballtables;
+ pub->allsequences = pubform->puballsequences;
pub->pubactions.pubinsert = pubform->pubinsert;
pub->pubactions.pubupdate = pubform->pubupdate;
pub->pubactions.pubdelete = pubform->pubdelete;
pub->pubactions.pubtruncate = pubform->pubtruncate;
+ pub->pubactions.pubsequence = pubform->pubsequence;
pub->pubviaroot = pubform->pubviaroot;
ReleaseSysCache(tup);
@@ -555,3 +646,56 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+
+/*
+ * Returns Oids of sequences in a publication.
+ */
+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ Publication *publication;
+ List *sequences;
+
+ /* stuff done only on the first call of the function */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* switch to memory context appropriate for multiple function calls */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ publication = GetPublicationByName(pubname, false);
+
+ /*
+ * Publications support partitioned tables, although all changes are
+ * replicated using leaf partition identity and schema, so we only
+ * need those.
+ */
+ if (publication->allsequences)
+ sequences = GetAllSequencesPublicationRelations();
+ else
+ sequences = GetPublicationSequenceRelations(publication->oid);
+
+ funcctx->user_fctx = (void *) sequences;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /* stuff done on every call of the function */
+ funcctx = SRF_PERCALL_SETUP();
+ sequences = (List *) funcctx->user_fctx;
+
+ if (funcctx->call_cntr < list_length(sequences))
+ {
+ Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
+
+ SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 999d984068..16f050023a 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -372,6 +372,16 @@ CREATE VIEW pg_publication_tables AS
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE C.oid = GPT.relid;
+CREATE VIEW pg_publication_sequences AS
+ SELECT
+ P.pubname AS pubname,
+ N.nspname AS schemaname,
+ C.relname AS sequencename
+ FROM pg_publication P,
+ LATERAL pg_get_publication_sequences(P.pubname) GPT,
+ pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
+ WHERE C.oid = GPT.relid;
+
CREATE VIEW pg_locks AS
SELECT * FROM pg_lock_status() AS L;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 95c253c8e0..5e5c5feab4 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -16,6 +16,7 @@
#include "access/genam.h"
#include "access/htup_details.h"
+#include "access/relation.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
@@ -54,6 +55,12 @@ static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
AlterPublicationStmt *stmt);
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
+static List *OpenSequenceList(List *sequences);
+static void CloseSequenceList(List *rels);
+static void PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+ AlterPublicationStmt *stmt);
+static void PublicationDropSequences(Oid pubid, List *rels, bool missing_ok);
+
static void
parse_publication_options(List *options,
bool *publish_given,
@@ -71,6 +78,7 @@ parse_publication_options(List *options,
pubactions->pubupdate = true;
pubactions->pubdelete = true;
pubactions->pubtruncate = true;
+ pubactions->pubsequence = true;
*publish_via_partition_root = false;
/* Parse options */
@@ -97,6 +105,7 @@ parse_publication_options(List *options,
pubactions->pubupdate = false;
pubactions->pubdelete = false;
pubactions->pubtruncate = false;
+ pubactions->pubsequence = false;
*publish_given = true;
publish = defGetString(defel);
@@ -119,6 +128,8 @@ parse_publication_options(List *options,
pubactions->pubdelete = true;
else if (strcmp(publish_opt, "truncate") == 0)
pubactions->pubtruncate = true;
+ else if (strcmp(publish_opt, "sequence") == 0)
+ pubactions->pubsequence = true;
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -210,6 +221,8 @@ CreatePublication(CreatePublicationStmt *stmt)
BoolGetDatum(pubactions.pubdelete);
values[Anum_pg_publication_pubtruncate - 1] =
BoolGetDatum(pubactions.pubtruncate);
+ values[Anum_pg_publication_pubsequence - 1] =
+ BoolGetDatum(pubactions.pubsequence);
values[Anum_pg_publication_pubviaroot - 1] =
BoolGetDatum(publish_via_partition_root);
@@ -374,9 +387,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
rels = OpenTableList(stmt->tables);
- if (stmt->tableAction == DEFELEM_ADD)
+ if (stmt->action == DEFELEM_ADD)
PublicationAddTables(pubid, rels, false, stmt);
- else if (stmt->tableAction == DEFELEM_DROP)
+ else if (stmt->action == DEFELEM_DROP)
PublicationDropTables(pubid, rels, false);
else /* DEFELEM_SET */
{
@@ -427,6 +440,82 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
CloseTableList(rels);
}
+/*
+ * Add or remove sequence to/from publication.
+ */
+static void
+AlterPublicationSequences(AlterPublicationStmt *stmt, Relation rel,
+ HeapTuple tup)
+{
+ List *rels = NIL;
+ Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+ Oid pubid = pubform->oid;
+
+ /* Check that user is allowed to manipulate the publication tables. */
+ if (pubform->puballsequences)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
+ NameStr(pubform->pubname)),
+ errdetail("Sequences cannot be added to or dropped from FOR ALL TABLES publications.")));
+
+ Assert(list_length(stmt->sequences) > 0);
+
+ rels = OpenSequenceList(stmt->sequences);
+
+ if (stmt->action == DEFELEM_ADD)
+ PublicationAddSequences(pubid, rels, false, stmt);
+ else if (stmt->action == DEFELEM_DROP)
+ PublicationDropSequences(pubid, rels, false);
+ else /* DEFELEM_SET */
+ {
+ List *oldrelids = GetPublicationRelations(pubid,
+ PUBLICATION_PART_ROOT);
+ List *delrels = NIL;
+ ListCell *oldlc;
+
+ /* Calculate which relations to drop. */
+ foreach(oldlc, oldrelids)
+ {
+ Oid oldrelid = lfirst_oid(oldlc);
+ ListCell *newlc;
+ bool found = false;
+
+ foreach(newlc, rels)
+ {
+ Relation newrel = (Relation) lfirst(newlc);
+
+ if (RelationGetRelid(newrel) == oldrelid)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ {
+ Relation oldrel = relation_open(oldrelid,
+ ShareUpdateExclusiveLock);
+
+ delrels = lappend(delrels, oldrel);
+ }
+ }
+
+ /* And drop them. */
+ PublicationDropSequences(pubid, delrels, true);
+
+ /*
+ * Don't bother calculating the difference for adding, we'll catch and
+ * skip existing ones when doing catalog update.
+ */
+ PublicationAddSequences(pubid, rels, true, stmt);
+
+ CloseSequenceList(delrels);
+ }
+
+ CloseSequenceList(rels);
+}
+
/*
* Alter the existing publication.
*
@@ -460,8 +549,12 @@ AlterPublication(AlterPublicationStmt *stmt)
if (stmt->options)
AlterPublicationOptions(stmt, rel, tup);
- else
+ else if (stmt->tables)
AlterPublicationTables(stmt, rel, tup);
+ else if (stmt->sequences)
+ AlterPublicationSequences(stmt, rel, tup);
+ else
+ Assert(false);
/* Cleanup. */
heap_freetuple(tup);
@@ -666,6 +759,139 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
}
}
+/*
+ * Open relations specified by a RangeVar list.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
+ */
+static List *
+OpenSequenceList(List *sequences)
+{
+ List *relids = NIL;
+ List *rels = NIL;
+ ListCell *lc;
+
+ /*
+ * Open, share-lock, and check all the explicitly-specified relations
+ */
+ foreach(lc, sequences)
+ {
+ RangeVar *rv = castNode(RangeVar, lfirst(lc));
+ Relation rel;
+ Oid myrelid;
+
+ /* Allow query cancel in case this takes a long time */
+ CHECK_FOR_INTERRUPTS();
+
+ rel = relation_openrv(rv, ShareUpdateExclusiveLock);
+ myrelid = RelationGetRelid(rel);
+
+ /*
+ * Filter out duplicates if user specifies "foo, foo".
+ *
+ * Note that this algorithm is known to not be very efficient (O(N^2))
+ * but given that it only works on list of sequences given to us by user
+ * it's deemed acceptable.
+ */
+ if (list_member_oid(relids, myrelid))
+ {
+ relation_close(rel, ShareUpdateExclusiveLock);
+ continue;
+ }
+
+ rels = lappend(rels, rel);
+ relids = lappend_oid(relids, myrelid);
+ }
+
+ list_free(relids);
+
+ return rels;
+}
+
+/*
+ * Close all relations in the list.
+ */
+static void
+CloseSequenceList(List *rels)
+{
+ ListCell *lc;
+
+ foreach(lc, rels)
+ {
+ Relation rel = (Relation) lfirst(lc);
+
+ relation_close(rel, NoLock);
+ }
+}
+
+/*
+ * Add listed tables to the publication.
+ */
+static void
+PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+ AlterPublicationStmt *stmt)
+{
+ ListCell *lc;
+
+ Assert(!stmt || !stmt->for_all_sequences);
+
+ foreach(lc, rels)
+ {
+ Relation rel = (Relation) lfirst(lc);
+ ObjectAddress obj;
+
+ /* Must be owner of the table or superuser. */
+ if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
+ RelationGetRelationName(rel));
+
+ obj = publication_add_relation(pubid, rel, if_not_exists);
+ if (stmt)
+ {
+ EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+ (Node *) stmt);
+
+ InvokeObjectPostCreateHook(PublicationRelRelationId,
+ obj.objectId, 0);
+ }
+ }
+}
+
+/*
+ * Remove listed sequences from the publication.
+ */
+static void
+PublicationDropSequences(Oid pubid, List *rels, bool missing_ok)
+{
+ ObjectAddress obj;
+ ListCell *lc;
+ Oid prid;
+
+ foreach(lc, rels)
+ {
+ Relation rel = (Relation) lfirst(lc);
+ Oid relid = RelationGetRelid(rel);
+
+ prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(pubid));
+ if (!OidIsValid(prid))
+ {
+ if (missing_ok)
+ continue;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("relation \"%s\" is not part of the publication",
+ RelationGetRelationName(rel))));
+ }
+
+ ObjectAddressSet(obj, PublicationRelRelationId, prid);
+ performDeletion(&obj, DROP_CASCADE, 0);
+ }
+}
+
+
/*
* Internal workhorse for changing a publication owner
*/
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 0415df9ccb..6898a45365 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -94,7 +94,7 @@ static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */
*/
static SeqTableData *last_used_seq = NULL;
-static void fill_seq_with_data(Relation rel, HeapTuple tuple);
+static void fill_seq_with_data(Relation rel, HeapTuple tuple, bool create);
static Relation lock_and_open_sequence(SeqTable seq);
static void create_seq_hashtable(void);
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
@@ -222,7 +222,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
/* now initialize the sequence's data */
tuple = heap_form_tuple(tupDesc, value, null);
- fill_seq_with_data(rel, tuple);
+ fill_seq_with_data(rel, tuple, true);
/* process OWNED BY if given */
if (owned_by)
@@ -327,7 +327,86 @@ ResetSequence(Oid seq_relid)
/*
* Insert the modified tuple into the new storage file.
*/
- fill_seq_with_data(seq_rel, tuple);
+ fill_seq_with_data(seq_rel, tuple, true);
+
+ /* Clear local cache so that we don't think we have cached numbers */
+ /* Note that we do not change the currval() state */
+ elm->cached = elm->last;
+
+ relation_close(seq_rel, NoLock);
+}
+
+/*
+ * Reset a sequence to its initial value.
+ *
+ * The change is made transactionally, so that on failure of the current
+ * transaction, the sequence will be restored to its previous state.
+ * We do that by creating a whole new relfilenode for the sequence; so this
+ * works much like the rewriting forms of ALTER TABLE.
+ *
+ * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
+ * which must not be released until end of transaction. Caller is also
+ * responsible for permissions checking.
+ */
+void
+ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
+{
+ Relation seq_rel;
+ SeqTable elm;
+ Form_pg_sequence_data seq;
+ Buffer buf;
+ HeapTupleData seqdatatuple;
+ HeapTuple tuple;
+
+ /*
+ * Read the old sequence. This does a bit more work than really
+ * necessary, but it's simple, and we do want to double-check that it's
+ * indeed a sequence.
+ */
+ init_sequence(seq_relid, &elm, &seq_rel);
+ (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple);
+
+ /*
+ * Copy the existing sequence tuple.
+ */
+ tuple = heap_copytuple(&seqdatatuple);
+
+ /* Now we're done with the old page */
+ UnlockReleaseBuffer(buf);
+
+ /*
+ * Modify the copied tuple to execute the restart (compare the RESTART
+ * action in AlterSequence)
+ */
+ seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+ seq->last_value = last_value;
+ seq->is_called = is_called;
+ seq->log_cnt = log_cnt;
+
+ /*
+ * Create a new storage file for the sequence.
+ */
+ RelationSetNewRelfilenode(seq_rel, seq_rel->rd_rel->relpersistence);
+
+ /*
+ * Ensure sequence's relfrozenxid is at 0, since it won't contain any
+ * unfrozen XIDs. Same with relminmxid, since a sequence will never
+ * contain multixacts.
+ */
+ Assert(seq_rel->rd_rel->relfrozenxid == InvalidTransactionId);
+ Assert(seq_rel->rd_rel->relminmxid == InvalidMultiXactId);
+
+ /*
+ * Insert the modified tuple into the new storage file.
+ *
+ * XXX Maybe this should also use created=true, just like the other places
+ * calling fill_seq_with_data. That's probably needed for correct cascading
+ * replication.
+ *
+ * XXX That'd mean all fill_seq_with_data callers use created=true, making
+ * the parameter unnecessary.
+ */
+ fill_seq_with_data(seq_rel, tuple, false);
/* Clear local cache so that we don't think we have cached numbers */
/* Note that we do not change the currval() state */
@@ -340,7 +419,7 @@ ResetSequence(Oid seq_relid)
* Initialize a sequence's relation with the specified tuple as content
*/
static void
-fill_seq_with_data(Relation rel, HeapTuple tuple)
+fill_seq_with_data(Relation rel, HeapTuple tuple, bool create)
{
Buffer buf;
Page page;
@@ -378,8 +457,21 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
/* check the comment above nextval_internal()'s equivalent call. */
if (RelationNeedsWAL(rel))
+ {
GetTopTransactionId();
+ /*
+ * Ensure we have a proper XID, which will be included in the XLOG
+ * record by XLogRecordAssemble. Otherwise the first nextval() in
+ * a subxact (without any preceding changes) would get XID 0,
+ * and it'd be impossible to decide which top xact it belongs to.
+ * It'd also trigger assert in DecodeSequence.
+ *
+ * XXX Not sure if this is the best solution.
+ */
+ GetCurrentTransactionId();
+ }
+
START_CRIT_SECTION();
MarkBufferDirty(buf);
@@ -399,6 +491,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
xlrec.node = rel->rd_node;
+ xlrec.created = create;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -502,7 +595,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
/*
* Insert the modified tuple into the new storage file.
*/
- fill_seq_with_data(seqrel, newdatatuple);
+ fill_seq_with_data(seqrel, newdatatuple, true);
}
/* process OWNED BY if given */
@@ -766,8 +859,21 @@ nextval_internal(Oid relid, bool check_permissions)
* (Have to do that here, so we're outside the critical section)
*/
if (logit && RelationNeedsWAL(seqrel))
+ {
GetTopTransactionId();
+ /*
+ * Ensure we have a proper XID, which will be included in the XLOG
+ * record by XLogRecordAssemble. Otherwise the first nextval() in
+ * a subxact (without any preceding changes) would get XID 0,
+ * and it'd be impossible to decide which top xact it belongs to.
+ * It'd also trigger assert in DecodeSequence.
+ *
+ * XXX Not sure if this is the best solution.
+ */
+ GetCurrentTransactionId();
+ }
+
/* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
@@ -803,6 +909,7 @@ nextval_internal(Oid relid, bool check_permissions)
seq->log_cnt = 0;
xlrec.node = seqrel->rd_node;
+ xlrec.created = false;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -977,8 +1084,21 @@ do_setval(Oid relid, int64 next, bool iscalled)
/* check the comment above nextval_internal()'s equivalent call. */
if (RelationNeedsWAL(seqrel))
+ {
GetTopTransactionId();
+ /*
+ * Ensure we have a proper XID, which will be included in the XLOG
+ * record by XLogRecordAssemble. Otherwise the first nextval() in
+ * a subxact (without any preceding changes) would get XID 0,
+ * and it'd be impossible to decide which top xact it belongs to.
+ * It'd also trigger assert in DecodeSequence.
+ *
+ * XXX Not sure if this is the best solution.
+ */
+ GetCurrentTransactionId();
+ }
+
/* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
@@ -999,6 +1119,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
xlrec.node = seqrel->rd_node;
+ xlrec.created = false;
+
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8aa6de1785..cc54be7a99 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -47,6 +47,7 @@
#include "utils/syscache.h"
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -461,6 +462,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
char *err;
WalReceiverConn *wrconn;
List *tables;
+ List *sequences;
ListCell *lc;
char table_state;
@@ -498,6 +500,26 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
InvalidXLogRecPtr);
}
+ /*
+ * Get the sequence list from publisher and build local sequence
+ * status info.
+ */
+ sequences = fetch_sequence_list(wrconn, publications);
+ foreach(lc, sequences)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+ /* Check for supported relkind. */
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
+ }
+
/*
* If requested, create permanent slot for the subscription. We
* won't use the initial snapshot for anything, so no need to
@@ -644,6 +666,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
{
Oid relid = subrel_local_oids[off];
+ /* XXX ignore sequences - maybe do this in GetSubscriptionRelations? */
+ if (get_rel_relkind(relid) == RELKIND_SEQUENCE)
+ continue;
+
if (!bsearch(&relid, pubrel_local_oids,
list_length(pubrel_names), sizeof(Oid), oid_cmp))
{
@@ -735,6 +761,185 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
}
}
+
+ /*
+ * XXX now do the same thing for sequences, maybe before the preceding
+ * block, or earlier?
+ */
+
+ /* Get the table list from publisher. */
+ pubrel_names = fetch_sequence_list(wrconn, sub->publications);
+
+ /* Get local table list. */
+ subrel_states = GetSubscriptionRelations(sub->oid);
+
+ /*
+ * Build qsorted array of local table oids for faster lookup. This can
+ * potentially contain all tables in the database so speed of lookup
+ * is important.
+ */
+ subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+ off = 0;
+ foreach(lc, subrel_states)
+ {
+ SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+ subrel_local_oids[off++] = relstate->relid;
+ }
+ qsort(subrel_local_oids, list_length(subrel_states),
+ sizeof(Oid), oid_cmp);
+
+ /*
+ * Rels that we want to remove from subscription and drop any slots
+ * and origins corresponding to them.
+ */
+ sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+
+ /*
+ * Walk over the remote tables and try to match them to locally known
+ * tables. If the table is not known locally create a new state for
+ * it.
+ *
+ * Also builds array of local oids of remote tables for the next step.
+ */
+ off = 0;
+ pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+ foreach(lc, pubrel_names)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+ /* Check for supported relkind. */
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+
+ pubrel_local_oids[off++] = relid;
+
+ if (!bsearch(&relid, subrel_local_oids,
+ list_length(subrel_states), sizeof(Oid), oid_cmp))
+ {
+ AddSubscriptionRelState(sub->oid, relid,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
+ rv->schemaname, rv->relname, sub->name)));
+ }
+ }
+
+ /*
+ * Next remove state for tables we should not care about anymore using
+ * the data we collected above
+ */
+ qsort(pubrel_local_oids, list_length(pubrel_names),
+ sizeof(Oid), oid_cmp);
+
+ remove_rel_len = 0;
+ for (off = 0; off < list_length(subrel_states); off++)
+ {
+ Oid relid = subrel_local_oids[off];
+
+ /* XXX ignore non-sequences - maybe do this in GetSubscriptionRelations? */
+ if (get_rel_relkind(relid) != RELKIND_SEQUENCE)
+ continue;
+
+ if (!bsearch(&relid, pubrel_local_oids,
+ list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ {
+ char state;
+ XLogRecPtr statelsn;
+
+ /*
+ * Lock pg_subscription_rel with AccessExclusiveLock to
+ * prevent any race conditions with the apply worker
+ * re-launching workers at the same time this code is trying
+ * to remove those tables.
+ *
+ * Even if new worker for this particular rel is restarted it
+ * won't be able to make any progress as we hold exclusive
+ * lock on subscription_rel till the transaction end. It will
+ * simply exit as there is no corresponding rel entry.
+ *
+ * This locking also ensures that the state of rels won't
+ * change till we are done with this refresh operation.
+ */
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+ /* Last known rel state. */
+ state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+
+ sub_remove_rels[remove_rel_len].relid = relid;
+ sub_remove_rels[remove_rel_len++].state = state;
+
+ elog(WARNING, "B: remove rel %d", relid);
+
+ RemoveSubscriptionRel(sub->oid, relid);
+
+ logicalrep_worker_stop(sub->oid, relid);
+
+ /*
+ * For READY state, we would have already dropped the
+ * tablesync origin.
+ */
+ if (state != SUBREL_STATE_READY)
+ {
+ char originname[NAMEDATALEN];
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * It is possible that the origin is not yet created for
+ * tablesync worker, this can happen for the states before
+ * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+ * concurrently try to drop the origin and by this time
+ * the origin might be already removed. For these reasons,
+ * passing missing_ok = true.
+ */
+ ReplicationOriginNameForTablesync(sub->oid, relid, originname,
+ sizeof(originname));
+ replorigin_drop_by_name(originname, true, false);
+ }
+
+ ereport(DEBUG1,
+ (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name)));
+ }
+ }
+
+ /*
+ * Drop the tablesync slots associated with removed tables. This has
+ * to be at the end because otherwise if there is an error while doing
+ * the database operations we won't be able to rollback dropped slots.
+ */
+ for (off = 0; off < remove_rel_len; off++)
+ {
+ if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
+ sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ /*
+ * For READY/SYNCDONE states we know the tablesync slot has
+ * already been dropped by the tablesync worker.
+ *
+ * For other states, there is no certainty, maybe the slot
+ * does not exist yet. Also, if we fail after removing some of
+ * the slots, next time, it will again try to drop already
+ * dropped slots and fail. For these reasons, we allow
+ * missing_ok = true for the drop.
+ */
+ ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
+ syncslotname, sizeof(syncslotname));
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+ }
+ }
+
}
PG_FINALLY();
{
@@ -1534,6 +1739,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
return tablelist;
}
+/*
+ * Get the list of sequences which belong to specified publications on the
+ * publisher connection.
+ */
+static List *
+fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[2] = {TEXTOID, TEXTOID};
+ ListCell *lc;
+ bool first;
+ List *tablelist = NIL;
+
+ Assert(list_length(publications) > 0);
+
+ initStringInfo(&cmd);
+ appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n"
+ " FROM pg_catalog.pg_publication_sequences s\n"
+ " WHERE s.pubname IN (");
+ first = true;
+ foreach(lc, publications)
+ {
+ char *pubname = strVal(lfirst(lc));
+
+ if (first)
+ first = false;
+ else
+ appendStringInfoString(&cmd, ", ");
+
+ appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+ }
+ appendStringInfoChar(&cmd, ')');
+
+ res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not receive list of replicated tables from the publisher: %s",
+ res->err)));
+
+ /* Process tables. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *nspname;
+ char *relname;
+ bool isnull;
+ RangeVar *rv;
+
+ nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+ relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ rv = makeRangeVar(nspname, relname, -1);
+ tablelist = lappend(tablelist, rv);
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+
+ return tablelist;
+}
+
/*
* This is to report the connection failure while dropping replication slots.
* Here, we report the WARNING for all tablesync slots so that user can drop
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 1e285e0349..a1c7880be6 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -619,11 +619,11 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
errdetail("\"%s.%s\" is a foreign table.",
nspname, relname)));
- if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use relation \"%s.%s\" as logical replication target",
nspname, relname),
- errdetail("\"%s.%s\" is not a table.",
+ errdetail("\"%s.%s\" is not a table or a sequence.",
nspname, relname)));
}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index bd87f23784..86d92de46e 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4833,8 +4833,10 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from)
COPY_STRING_FIELD(pubname);
COPY_NODE_FIELD(options);
COPY_NODE_FIELD(tables);
+ COPY_NODE_FIELD(sequences);
COPY_SCALAR_FIELD(for_all_tables);
- COPY_SCALAR_FIELD(tableAction);
+ COPY_SCALAR_FIELD(for_all_sequences);
+ COPY_SCALAR_FIELD(action);
return newnode;
}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index dba3e6b31e..384ec10f9f 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2302,8 +2302,10 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a,
COMPARE_STRING_FIELD(pubname);
COMPARE_NODE_FIELD(options);
COMPARE_NODE_FIELD(tables);
+ COMPARE_NODE_FIELD(sequences);
COMPARE_SCALAR_FIELD(for_all_tables);
- COMPARE_SCALAR_FIELD(tableAction);
+ COMPARE_SCALAR_FIELD(for_all_sequences);
+ COMPARE_SCALAR_FIELD(action);
return true;
}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index eb24195438..d91dbd325e 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -434,6 +434,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <node> group_by_item empty_grouping_set rollup_clause cube_clause
%type <node> grouping_sets_clause
%type <node> opt_publication_for_tables publication_for_tables
+/* %type <node> opt_publication_for_sequences publication_for_sequences */
%type <list> opt_fdw_options fdw_options
%type <defelt> fdw_option
@@ -9588,6 +9589,7 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
*****************************************************************************/
CreatePublicationStmt:
+ /* CREATE PUBLICATION name opt_publication_for_tables opt_publication_for_sequences opt_definition */
CREATE PUBLICATION name opt_publication_for_tables opt_definition
{
CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
@@ -9622,6 +9624,12 @@ publication_for_tables:
}
;
+/*
+ * FIXME
+ *
+ * opt_publication_for_sequences and publication_for_sequences should be
+ * copies for sequences
+ */
/*****************************************************************************
*
@@ -9633,6 +9641,12 @@ publication_for_tables:
*
* ALTER PUBLICATION name SET TABLE table [, table2]
*
+ * ALTER PUBLICATION name ADD SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name DROP SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name SET SEQUENCE sequence [, sequence2]
+ *
*****************************************************************************/
AlterPublicationStmt:
@@ -9648,7 +9662,7 @@ AlterPublicationStmt:
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
- n->tableAction = DEFELEM_ADD;
+ n->action = DEFELEM_ADD;
$$ = (Node *)n;
}
| ALTER PUBLICATION name SET TABLE relation_expr_list
@@ -9656,7 +9670,7 @@ AlterPublicationStmt:
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
- n->tableAction = DEFELEM_SET;
+ n->action = DEFELEM_SET;
$$ = (Node *)n;
}
| ALTER PUBLICATION name DROP TABLE relation_expr_list
@@ -9664,7 +9678,31 @@ AlterPublicationStmt:
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
- n->tableAction = DEFELEM_DROP;
+ n->action = DEFELEM_DROP;
+ $$ = (Node *)n;
+ }
+ | ALTER PUBLICATION name ADD_P SEQUENCE relation_expr_list
+ {
+ AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+ n->pubname = $3;
+ n->sequences = $6;
+ n->action = DEFELEM_ADD;
+ $$ = (Node *)n;
+ }
+ | ALTER PUBLICATION name SET SEQUENCE relation_expr_list
+ {
+ AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+ n->pubname = $3;
+ n->sequences = $6;
+ n->action = DEFELEM_SET;
+ $$ = (Node *)n;
+ }
+ | ALTER PUBLICATION name DROP SEQUENCE relation_expr_list
+ {
+ AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+ n->pubname = $3;
+ n->sequences = $6;
+ n->action = DEFELEM_DROP;
$$ = (Node *)n;
}
;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 70670169ac..f6296941d9 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,6 +42,7 @@
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
+#include "commands/sequence.h"
typedef struct XLogRecordBuffer
{
@@ -74,10 +75,11 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
bool two_phase);
static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_prepare *parsed);
-
+static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
/* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -158,6 +160,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
DecodeLogicalMsgOp(ctx, &buf);
break;
+ case RM_SEQ_ID:
+ DecodeSequence(ctx, &buf);
+ break;
+
/*
* Rmgrs irrelevant for logical decoding; they describe stuff not
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -173,7 +179,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
case RM_HASH_ID:
case RM_GIN_ID:
case RM_GIST_ID:
- case RM_SEQ_ID:
case RM_SPGIST_ID:
case RM_BRIN_ID:
case RM_COMMIT_TS_ID:
@@ -1315,3 +1320,125 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
ctx->fast_forward || FilterByOrigin(ctx, origin_id));
}
+
+/*
+ * Decode Sequence Tuple
+ */
+static void
+DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+ int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
+
+ Assert(datalen >= 0);
+
+ tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;;
+
+ ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+ tuple->tuple.t_tableOid = InvalidOid;
+
+ memcpy(((char *) tuple->tuple.t_data),
+ data + sizeof(xl_seq_rec),
+ SizeofHeapTupleHeader);
+
+ memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
+ data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
+ datalen);
+}
+
+/*
+ * Handle sequence decode
+ *
+ * Decoding sequences is a bit tricky, because while most sequence actions
+ * are non-transactional (not subject to rollback), some need to be handled
+ * as transactional.
+ *
+ * By default, a sequence increment is non-transactional - we must not queue
+ * it in a transaction as other changes, because the transaction might get
+ * rolled back and we'd discard the increment. The downstream would not be
+ * notified about the increment, which is wrong.
+ *
+ * On the other hand, the sequence may be created in a transaction. In this
+ * case we *should* queue the change as other changes in the transaction,
+ * because we don't want to send the increments for unknown sequence to the
+ * plugin - it might get confused about which sequence it's related to etc.
+ */
+static void
+DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBufferTupleBuf *tuplebuf;
+ RelFileNode target_node;
+ XLogReaderState *r = buf->record;
+ char *tupledata = NULL;
+ Size tuplelen;
+ Size datalen = 0;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+ xl_seq_rec *xlrec;
+ Snapshot snapshot;
+ RepOriginId origin_id = XLogRecGetOrigin(r);
+ bool transactional;
+
+ /* only decode changes flagged with XLOG_SEQ_LOG */
+ if (info != XLOG_SEQ_LOG)
+ elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
+
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding messages.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ /* only interested in our database */
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ tupledata = XLogRecGetData(r);
+ datalen = XLogRecGetDataLen(r);
+ tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
+
+ /* extract the WAL record, with "created" flag */
+ xlrec = (xl_seq_rec *) XLogRecGetData(r);
+
+ /* XXX how could we have sequence change without data? */
+ if(!datalen || !tupledata)
+ return;
+
+ tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+ DecodeSeqTuple(tupledata, datalen, tuplebuf);
+
+ /*
+ * Should we handle the sequence increment as transactional or not?
+ *
+ * If the sequence was created in a still-running transaction, treat
+ * it as transactional and queue the increments. Otherwise it needs
+ * to be treated as non-transactional, in which case we send it to
+ * the plugin right away.
+ */
+ transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
+ target_node,
+ xlrec->created);
+
+ /* Skip the change if already processed (per the snapshot). */
+ if (transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
+ return;
+ else if (!transactional &&
+ (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+ SnapBuildXactNeedsSkip(builder, buf->origptr)))
+ return;
+
+ /* Queue the increment (or send immediately if not transactional). */
+ snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+ ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
+ origin_id, target_node, transactional,
+ xlrec->created, tuplebuf);
+}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ffc6160e9f..66a4a4603a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called);
/* streaming callbacks */
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -217,6 +221,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
+ ctx->reorder->sequence = sequence_cb_wrapper;
/*
* To support streaming, we require start/stop/abort/commit/change
@@ -1179,6 +1184,43 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ if (ctx->callbacks.sequence_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "sequence";
+ state.report_location = sequence_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = sequence_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+ created, last_value, log_cnt, is_called);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr first_lsn)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1cf59e0fb0..f9e928f906 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -389,6 +389,58 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
pq_sendbytes(out, message, sz);
}
+/*
+ * Write SEQUENCE to stream
+ */
+void
+logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
+ XLogRecPtr lsn, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ uint8 flags = 0;
+ char *relname;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ pq_sendint8(out, flags);
+ pq_sendint64(out, lsn);
+
+ logicalrep_write_namespace(out, RelationGetNamespace(rel));
+ relname = RelationGetRelationName(rel);
+ pq_sendstring(out, relname);
+
+ pq_sendint8(out, transactional);
+ pq_sendint8(out, created);
+ pq_sendint64(out, last_value);
+ pq_sendint64(out, log_cnt);
+ pq_sendint8(out, is_called);
+}
+
+/*
+ * Read SEQUENCE from the stream.
+ */
+void
+logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
+{
+ /* XXX skipping flags and lsn */
+ pq_getmsgint(in, 1);
+ pq_getmsgint64(in);
+
+ /* Read relation name from stream */
+ seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
+ seqdata->seqname = pstrdup(pq_getmsgstring(in));
+
+ seqdata->transactional = pq_getmsgint(in, 1);
+ seqdata->created = pq_getmsgint(in, 1);
+ seqdata->last_value = pq_getmsgint64(in);
+ seqdata->log_cnt = pq_getmsgint64(in);
+ seqdata->is_called = pq_getmsgint(in, 1);
+}
+
/*
* Write relation description to the output stream.
*/
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index f96029f15a..d18fd30ced 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -116,6 +116,13 @@ typedef struct ReorderBufferTXNByIdEnt
ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;
+/* entry for hash table we use to track sequences created in running xacts */
+typedef struct ReorderBufferSequenceEnt
+{
+ RelFileNode rnode;
+ TransactionId xid;
+} ReorderBufferSequenceEnt;
+
/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
@@ -337,6 +344,14 @@ ReorderBufferAllocate(void)
buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ /* hash table of sequences, mapping relfilenode to XID of transaction */
+ hash_ctl.keysize = sizeof(RelFileNode);
+ hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
+ hash_ctl.hcxt = buffer->context;
+
+ buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
@@ -523,6 +538,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ if (change->data.sequence.tuple)
+ {
+ ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
+ change->data.sequence.tuple = NULL;
+ }
+ break;
}
pfree(change);
@@ -850,6 +872,212 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
}
}
+/*
+ * Treat the sequence increment as transactional?
+ */
+bool
+ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+ RelFileNode rnode, bool created)
+{
+ bool found = false;
+
+ if (created)
+ return true;
+
+ hash_search(rb->sequences,
+ (void *) &rnode,
+ HASH_FIND,
+ &found);
+
+ return found;
+}
+
+
+/*
+ * A transactional sequence increment is queued to be processed upon commit
+ * and a non-transactional increment gets processed immediately.
+ *
+ * A sequence update may be both transactional and non-transactional. When
+ * created in a running transaction, treat it as transactional and queue
+ * the change in it. Otherwise treat it as non-transactional, so that we
+ * don't forget the increment in case of a rollback.
+ */
+void
+ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+ RelFileNode rnode, bool transactional, bool created,
+ ReorderBufferTupleBuf *tuplebuf)
+{
+ /*
+ * Change needs to be handled as transactional, because the sequence was
+ * created in a transaction that is still running. In that case all the
+ * changes need to be queued in that transaction, we must not send them
+ * to the downstream until the transaction commits.
+ *
+ * There's a bit of a trouble with subtransactions - we can't queue it
+ * into the subxact, because it might be rolled back and we'd lose the
+ * increment. We need to queue it into the same (sub)xact that created
+ * the sequence, which is why we track the XID in the hash table.
+ */
+ if (transactional)
+ {
+ MemoryContext oldcontext;
+ ReorderBufferChange *change;
+
+ /* lookup sequence by relfilenode */
+ ReorderBufferSequenceEnt *ent;
+ bool found;
+
+ /* transactional changes require a transaction */
+ Assert(xid != InvalidTransactionId);
+
+ /* search the lookup table (we ignore the return value, found is enough) */
+ ent = hash_search(rb->sequences,
+ (void *) &rnode,
+ created ? HASH_ENTER : HASH_FIND,
+ &found);
+
+ /*
+ * If this is the "create" increment, we must not have found any
+ * pre-existing entry in the hash table (i.e. there must not be
+ * any conflicting sequence).
+ */
+ Assert(!(created && found));
+
+ /* But we must have either created or found an existing entry. */
+ Assert(created || found);
+
+ /*
+ * When creating the sequence, remember the XID of the transaction
+ * that created id.
+ */
+ if (created)
+ ent->xid = xid;
+
+ /* XXX Maybe check that we're still in the same top-level xact? */
+
+ /* OK, allocate and queue the change */
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ change = ReorderBufferGetChange(rb);
+
+ change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
+ change->origin_id = origin_id;
+
+ memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
+
+ change->data.sequence.created = created;
+ change->data.sequence.tuple = tuplebuf;
+
+ /* add it to the same subxact that created the sequence */
+ ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ /*
+ * This increment is for a sequence that was not created in any
+ * running transaction, so we treat it as non-transactional and
+ * just send it to the output plugin directly.
+ */
+ ReorderBufferTXN *txn = NULL;
+ volatile Snapshot snapshot_now = snapshot;
+ bool using_subtxn;
+
+#ifdef USE_ASSERT_CHECKING
+ /* All "creates" have to be handled as transactional. */
+ Assert(!created);
+
+ /* Make sure the sequence is not in the hash table. */
+ {
+ bool found;
+ hash_search(rb->sequences,
+ (void *) &rnode,
+ HASH_FIND, &found);
+ Assert(!found);
+ }
+#endif
+
+ if (xid != InvalidTransactionId)
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ /* setup snapshot to allow catalog access */
+ SetupHistoricSnapshot(snapshot_now, NULL);
+
+ /*
+ * Decoding needs access to syscaches et al., which in turn use
+ * heavyweight locks and such. Thus we need to have enough state around to
+ * keep track of those. The easiest way is to simply use a transaction
+ * internally. That also allows us to easily enforce that nothing writes
+ * to the database by checking for xid assignments.
+ *
+ * When we're called via the SQL SRF there's already a transaction
+ * started, so start an explicit subtransaction there.
+ */
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ PG_TRY();
+ {
+ Relation relation;
+ HeapTuple tuple;
+ bool isnull;
+ int64 last_value, log_cnt;
+ bool is_called;
+ Oid reloid;
+
+ if (using_subtxn)
+ BeginInternalSubTransaction("sequence");
+ else
+ StartTransactionCommand();
+
+ reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
+
+ if (reloid == InvalidOid)
+ elog(ERROR, "could not map filenode \"%s\" to relation OID",
+ relpathperm(rnode,
+ MAIN_FORKNUM));
+
+ relation = RelationIdGetRelation(reloid);
+ tuple = &tuplebuf->tuple;
+
+ /*
+ * Extract the internal sequence values, describing the state.
+ *
+ * XXX Seems a bit strange to access it directly. Maybe there's
+ * a better / more correct way?
+ */
+ last_value = heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull);
+ log_cnt = heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull);
+ is_called = heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull);
+
+ rb->sequence(rb, txn, lsn, relation, transactional, created,
+ last_value, log_cnt, is_called);
+
+ RelationClose(relation);
+
+ TeardownHistoricSnapshot(false);
+
+ AbortCurrentTransaction();
+
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+ }
+ PG_CATCH();
+ {
+ TeardownHistoricSnapshot(true);
+
+ AbortCurrentTransaction();
+
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+}
+
/*
* AssertTXNLsnOrder
* Verify LSN ordering of transaction lists in the reorderbuffer
@@ -1526,6 +1754,31 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
&found);
Assert(found);
+ /*
+ * Remove sequences created in this transaction (if any).
+ *
+ * There's no way to search by XID, so we simply do a seqscan of all
+ * the entries in the hash table. Hopefully there are only a couple
+ * entries in most cases - people generally don't create many new
+ * sequences over and over.
+ */
+ {
+ HASH_SEQ_STATUS scan_status;
+ ReorderBufferSequenceEnt *ent;
+
+ hash_seq_init(&scan_status, rb->sequences);
+ while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
+ {
+ /* skip sequences not from this transaction */
+ if (ent->xid != txn->xid)
+ continue;
+
+ (void) hash_search(rb->sequences,
+ (void *) &(ent->rnode),
+ HASH_REMOVE, NULL);
+ }
+ }
+
/* remove entries spilled to disk */
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
@@ -1941,6 +2194,39 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
change->data.msg.message);
}
+/*
+ * Helper function for ReorderBufferProcessTXN for applying sequences.
+ */
+static inline void
+ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change,
+ bool streaming)
+{
+ HeapTuple tuple;
+ bool isnull;
+ int64 last_value, log_cnt;
+ bool is_called;
+
+ /* FIXME support streaming */
+ Assert(!streaming);
+
+ tuple = &change->data.sequence.tuple->tuple;
+
+ /*
+ * Extract the internal sequence values, describing the state.
+ *
+ * XXX Seems a bit strange to access it directly. Maybe there's
+ * a better / more correct way?
+ */
+ last_value = heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull);
+ log_cnt = heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull);
+ is_called = heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull);
+
+ rb->sequence(rb, txn, change->lsn, relation, true, /* gotta be transactional */
+ change->data.sequence.created,
+ last_value, log_cnt, is_called);
+}
+
/*
* Function to store the command id and snapshot at the end of the current
* stream so that we can reuse the same while sending the next stream.
@@ -2357,6 +2643,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
elog(ERROR, "tuplecid value in changequeue");
break;
+
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ Assert(snapshot_now);
+
+ reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
+ change->data.sequence.relnode.relNode);
+
+ if (reloid == InvalidOid)
+ elog(ERROR, "could not map filenode \"%s\" to relation OID",
+ relpathperm(change->data.sequence.relnode,
+ MAIN_FORKNUM));
+
+ relation = RelationIdGetRelation(reloid);
+
+ if (!RelationIsValid(relation))
+ elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+ reloid,
+ relpathperm(change->data.sequence.relnode,
+ MAIN_FORKNUM));
+
+ if (RelationIsLogicallyLogged(relation))
+ ReorderBufferApplySequence(rb, txn, relation, change, streaming);
+
+ RelationClose(relation);
+ break;
}
}
@@ -3751,6 +4062,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
memcpy(data, change->data.truncate.relids, size);
data += size;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ {
+ char *data;
+ ReorderBufferTupleBuf *tup;
+ Size len = 0;
+
+ tup = change->data.sequence.tuple;
+
+ if (tup)
+ {
+ sz += sizeof(HeapTupleData);
+ len = tup->tuple.t_len;
+ sz += len;
+ }
+
+ /* make sure we have enough space */
+ ReorderBufferSerializeReserve(rb, sz);
+
+ data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+ /* might have been reallocated above */
+ ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+ if (len)
+ {
+ memcpy(data, &tup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, tup->tuple.t_data, len);
+ data += len;
+ }
+
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4014,6 +4358,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
{
sz += sizeof(Oid) * change->data.truncate.nrelids;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ {
+ ReorderBufferTupleBuf *tup;
+ Size len = 0;
+
+ tup = change->data.sequence.tuple;
+
+ if (tup)
+ {
+ sz += sizeof(HeapTupleData);
+ len = tup->tuple.t_len;
+ sz += len;
+ }
+
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4314,6 +4674,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
+
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ if (change->data.sequence.tuple)
+ {
+ uint32 tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.sequence.tuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
+
+ /* restore ->tuple */
+ memcpy(&change->data.sequence.tuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ /* reset t_data pointer into the new tuplebuf */
+ change->data.sequence.tuple->tuple.t_data =
+ ReorderBufferTupleBufData(change->data.sequence.tuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
+ }
+ break;
+
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 67f907cdd9..d1ba2a9645 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -99,6 +99,7 @@
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
+#include "commands/sequence.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
@@ -353,6 +354,12 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
*
* If the synchronization position is reached (SYNCDONE), then the table can
* be marked as READY and is no longer tracked.
+ *
+ * XXX This needs to handle sequences too - after AlterSubscription_refresh
+ * starts caring about sequences, GetSubscriptionNotReadyRelations won't
+ * return just tables, and we'll have to sync them here. Not sure it's worth
+ * creating a new "sync" worker per sequence, maybe we should just sync them
+ * in the current process (it's pretty light-weight).
*/
static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
@@ -870,6 +877,99 @@ copy_table(Relation rel)
logicalrep_rel_close(relmapentry, NoLock);
}
+
+
+/*
+ * FIXME add comment
+ */
+static void
+fetch_sequence_data(char *nspname, char *relname,
+ int64 *last_value, int64 *log_cnt, bool *is_called)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID};
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
+ " FROM %s", quote_qualified_identifier(nspname, relname));
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not receive list of replicated tables from the publisher: %s",
+ res->err)));
+
+ /* Process the sequence. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ bool isnull;
+
+ *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+
+ *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
+ Assert(!isnull);
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+}
+
+/*
+ * Copy existing data of a sequence from publisher.
+ *
+ * Caller is responsible for locking the local relation.
+ */
+static void
+copy_sequence(Relation rel)
+{
+ LogicalRepRelMapEntry *relmapentry;
+ LogicalRepRelation lrel;
+ StringInfoData cmd;
+ int64 last_value = 0,
+ log_cnt = 0;
+ bool is_called = 0;
+
+ /* Get the publisher relation info. */
+ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
+ RelationGetRelationName(rel), &lrel);
+
+ /* Put the relation into relmap. */
+ logicalrep_relmap_update(&lrel);
+
+ /* Map the publisher relation to local one. */
+ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
+ Assert(rel == relmapentry->localrel);
+
+ /* Start copy on the publisher. */
+ initStringInfo(&cmd);
+
+ Assert(lrel.relkind == RELKIND_SEQUENCE);
+
+ fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
+
+ elog(WARNING, "sequence %s info last_value %ld log_cnt %ld is_called %d",
+ quote_qualified_identifier(lrel.nspname, lrel.relname),
+ last_value, log_cnt, is_called);
+
+ ResetSequence2(RelationGetRelid(rel), last_value, log_cnt, is_called);
+
+ logicalrep_rel_close(relmapentry, NoLock);
+}
+
+
+
+
/*
* Determine the tablesync slot name.
*
@@ -1102,10 +1202,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
originname)));
}
- /* Now do the initial data copy */
- PushActiveSnapshot(GetTransactionSnapshot());
- copy_table(rel);
- PopActiveSnapshot();
+ if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
+ {
+ /* Now do the initial sequence copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_sequence(rel);
+ PopActiveSnapshot();
+ }
+ else
+ {
+ /* Now do the initial data copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_table(rel);
+ PopActiveSnapshot();
+ }
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4b112593c6..7a84726b85 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -70,6 +70,7 @@
#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
+#include "commands/sequence.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
@@ -823,6 +824,57 @@ apply_handle_origin(StringInfo s)
errmsg_internal("ORIGIN message sent out of order")));
}
+/*
+ * Handle SEQUENCE message.
+ */
+static void
+apply_handle_sequence(StringInfo s)
+{
+ LogicalRepSequence seq;
+ Oid relid;
+
+ if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
+ return;
+
+ logicalrep_read_sequence(s, &seq);
+
+ /*
+ * Non-transactional sequence updates should not be part of a remote
+ * transaction. There should not be any running transaction.
+ */
+ Assert((!seq.transactional) || in_remote_transaction);
+ Assert(!(!seq.transactional && in_remote_transaction));
+ Assert(!(!seq.transactional && IsTransactionState()));
+
+ /*
+ * Make sure we're in a transaction (needed by ResetSequence2). For
+ * non-transactional updates we're guaranteed to start a new one,
+ * and we'll commit it at the end.
+ */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ maybe_reread_subscription();
+ }
+
+ relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
+ seq.seqname, -1),
+ RowExclusiveLock, false);
+
+ elog(WARNING, "applying sequence transactional %d created %d last_value %ld log_cnt %ld is_called %d",
+ seq.transactional, seq.created, seq.last_value, seq.log_cnt, seq.is_called);
+
+ /* apply the sequence change */
+ ResetSequence2(relid, seq.last_value, seq.log_cnt, seq.is_called);
+
+ /*
+ * Commit the per-stream transaction (we only do this when not in
+ * remote transaction, i.e. for non-transactional sequence updates.
+ */
+ if (!in_remote_transaction)
+ CommitTransactionCommand();
+}
+
/*
* Handle STREAM START message.
*/
@@ -2067,6 +2119,10 @@ apply_dispatch(StringInfo s)
*/
return;
+ case LOGICAL_REP_MSG_SEQUENCE:
+ apply_handle_sequence(s);
+ return;
+
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
return;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 63f108f960..c5fc6b3793 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -49,6 +49,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pgoutput_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
@@ -144,6 +148,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
+ cb->sequence_cb = pgoutput_sequence;
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
@@ -166,11 +171,13 @@ parse_output_parameters(List *options, PGOutputData *data)
bool publication_names_given = false;
bool binary_option_given = false;
bool messages_option_given = false;
+ bool sequences_option_given = false;
bool streaming_given = false;
data->binary = false;
data->streaming = false;
data->messages = false;
+ data->sequences = true;
foreach(lc, options)
{
@@ -236,6 +243,16 @@ parse_output_parameters(List *options, PGOutputData *data)
data->messages = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "sequences") == 0)
+ {
+ if (sequences_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ sequences_option_given = true;
+
+ data->sequences = defGetBoolean(defel);
+ }
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
@@ -756,6 +773,47 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+static void
+pgoutput_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ TransactionId xid = InvalidTransactionId;
+ RelationSyncEntry *relentry;
+
+ if (!data->sequences)
+ return;
+
+ if (!is_publishable_relation(rel))
+ return;
+
+ /* XXX handle (in_streaming) here */
+
+ relentry = get_rel_sync_entry(data, RelationGetRelid(rel));
+
+ /*
+ * First check the sequence filter.
+ *
+ * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here.
+ */
+ if (!relentry->pubactions.pubsequence)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_sequence(ctx->out,
+ rel,
+ xid,
+ sequence_lsn,
+ transactional,
+ created,
+ last_value,
+ log_cnt,
+ is_called);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* Currently we always forward.
*/
@@ -1029,7 +1087,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->streamed_txns = NIL;
entry->replicate_valid = false;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
- entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+ entry->pubactions.pubsequence = false;
entry->publish_as_relid = InvalidOid;
entry->map = NULL; /* will be set by maybe_send_schema() if needed */
}
@@ -1040,6 +1099,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
List *pubids = GetRelationPublications(relid);
ListCell *lc;
Oid publish_as_relid = relid;
+ bool is_sequence = (get_rel_relkind(relid) == RELKIND_SEQUENCE);
/* Reload publications if needed before use. */
if (!publications_valid)
@@ -1063,12 +1123,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
Publication *pub = lfirst(lc);
bool publish = false;
- if (pub->alltables)
+ if (pub->alltables && (!is_sequence))
{
publish = true;
if (pub->pubviaroot && am_partition)
publish_as_relid = llast_oid(get_partition_ancestors(relid));
}
+ else if (pub->allsequences && is_sequence)
+ {
+ publish = true;
+ }
+
+ /* if a sequence, just cross-check the list of publications */
+ if (!publish && is_sequence)
+ {
+ if (list_member_oid(pubids, pub->oid))
+ publish = true;
+ }
if (!publish)
{
@@ -1119,10 +1190,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+ entry->pubactions.pubsequence |= pub->pubactions.pubsequence;
}
if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
- entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
+ entry->pubactions.pubdelete && entry->pubactions.pubtruncate &&
+ entry->pubactions.pubsequence)
break;
}
@@ -1267,5 +1340,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubsequence = false;
}
}
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index fd05615e76..890829bc0c 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5487,6 +5487,7 @@ GetRelationPublicationActions(Relation relation)
pubactions->pubupdate |= pubform->pubupdate;
pubactions->pubdelete |= pubform->pubdelete;
pubactions->pubtruncate |= pubform->pubtruncate;
+ pubactions->pubsequence |= pubform->pubsequence;
ReleaseSysCache(tup);
@@ -5495,7 +5496,8 @@ GetRelationPublicationActions(Relation relation)
* other publications.
*/
if (pubactions->pubinsert && pubactions->pubupdate &&
- pubactions->pubdelete && pubactions->pubtruncate)
+ pubactions->pubdelete && pubactions->pubtruncate &&
+ pubactions->pubsequence)
break;
}
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index bd8e9ea2f8..b423dd4a42 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1640,7 +1640,7 @@ psql_completion(const char *text, int start, int end)
/* ALTER PUBLICATION <name> */
else if (Matches("ALTER", "PUBLICATION", MatchAny))
- COMPLETE_WITH("ADD TABLE", "DROP TABLE", "OWNER TO", "RENAME TO", "SET");
+ COMPLETE_WITH("ADD TABLE", "DROP TABLE", "ADD SEQUENCE", "DROP SEQUENCE", "OWNER TO", "RENAME TO", "SET");
/* ALTER PUBLICATION <name> SET */
else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET"))
COMPLETE_WITH("(", "TABLE");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index acbcae4607..e3043e0f63 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11442,6 +11442,11 @@
provolatile => 's', prorettype => 'oid', proargtypes => 'text',
proallargtypes => '{text,oid}', proargmodes => '{i,o}',
proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
+{ oid => '8000', descr => 'get OIDs of sequences in a publication',
+ proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't',
+ provolatile => 's', prorettype => 'oid', proargtypes => 'text',
+ proallargtypes => '{text,oid}', proargmodes => '{i,o}',
+ proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' },
{ oid => '6121',
descr => 'returns whether a relation can be part of a publication',
proname => 'pg_relation_is_publishable', provolatile => 's',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 1b31fee9e3..d1c9a18b05 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -40,6 +40,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
*/
bool puballtables;
+ /*
+ * indicates that this is special publication which should encompass all
+ * sequences in the database (except for the unlogged and temp ones)
+ */
+ bool puballsequences;
+
/* true if inserts are published */
bool pubinsert;
@@ -52,6 +58,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
/* true if truncates are published */
bool pubtruncate;
+ /* true if sequences are published */
+ bool pubsequence;
+
/* true if partition changes are published using root schema */
bool pubviaroot;
} FormData_pg_publication;
@@ -74,6 +83,7 @@ typedef struct PublicationActions
bool pubupdate;
bool pubdelete;
bool pubtruncate;
+ bool pubsequence;
} PublicationActions;
typedef struct Publication
@@ -81,6 +91,7 @@ typedef struct Publication
Oid oid;
char *name;
bool alltables;
+ bool allsequences;
bool pubviaroot;
PublicationActions pubactions;
} Publication;
@@ -109,6 +120,9 @@ extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
extern List *GetAllTablesPublications(void);
extern List *GetAllTablesPublicationRelations(bool pubviaroot);
+extern List *GetAllSequencesPublicationRelations(void);
+extern List *GetPublicationSequenceRelations(Oid pubid);
+
extern bool is_publishable_relation(Relation rel);
extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
bool if_not_exists);
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 40544dd4c7..c28e8695cb 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -48,6 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data;
typedef struct xl_seq_rec
{
RelFileNode node;
+ bool created; /* is this a CREATE SEQUENCE */
/* SEQUENCE TUPLE DATA FOLLOWS AT THE END */
} xl_seq_rec;
@@ -59,6 +60,7 @@ extern ObjectAddress DefineSequence(ParseState *pstate, CreateSeqStmt *stmt);
extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
extern void DeleteSequenceTuple(Oid relid);
extern void ResetSequence(Oid seq_relid);
+extern void ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called);
extern void ResetSequenceCaches(void);
extern void seq_redo(XLogReaderState *rptr);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index def9651b34..aa602deae5 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3643,8 +3643,10 @@ typedef struct AlterPublicationStmt
/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
List *tables; /* List of tables to add/drop */
+ List *sequences; /* List of sequences to add/drop */
bool for_all_tables; /* Special publication for all tables in db */
- DefElemAction tableAction; /* What action to perform with the tables */
+ bool for_all_sequences; /* Special publication for all tables in db */
+ DefElemAction action; /* What action to perform with the tables/sequences */
} AlterPublicationStmt;
typedef struct CreateSubscriptionStmt
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 55b90c03ea..f3a7d18fca 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -55,6 +55,7 @@ typedef enum LogicalRepMsgType
LOGICAL_REP_MSG_RELATION = 'R',
LOGICAL_REP_MSG_TYPE = 'Y',
LOGICAL_REP_MSG_MESSAGE = 'M',
+ LOGICAL_REP_MSG_SEQUENCE = 'X', /* FIXME change */
LOGICAL_REP_MSG_STREAM_START = 'S',
LOGICAL_REP_MSG_STREAM_END = 'E',
LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
@@ -107,6 +108,19 @@ typedef struct LogicalRepTyp
char *typname; /* name of the remote type */
} LogicalRepTyp;
+/* Sequence info */
+typedef struct LogicalRepSequence
+{
+ Oid remoteid; /* unique id of the remote sequence */
+ char *nspname; /* schema name of remote sequence */
+ char *seqname; /* name of the remote sequence */
+ bool transactional;
+ bool created;
+ int64 last_value;
+ int64 log_cnt; /* XXX probably not needed? */
+ bool is_called; /* XXX probably not needed? */
+} LogicalRepSequence;
+
/* Transaction info */
typedef struct LogicalRepBeginData
{
@@ -154,6 +168,12 @@ extern List *logicalrep_read_truncate(StringInfo in,
bool *cascade, bool *restart_seqs);
extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
bool transactional, const char *prefix, Size sz, const char *message);
+extern void logicalrep_write_sequence(StringInfo out, Relation rel,
+ TransactionId xid, XLogRecPtr lsn,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt,
+ bool is_called);
+extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 810495ed0e..e3e1b03f32 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -88,6 +88,19 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
Size message_size,
const char *message);
+/*
+ * Called for the generic logical decoding sequences.
+ */
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ bool created,
+ int64 last_value,
+ int64 log_cnt,
+ bool is_called);
+
/*
* Filter changes by origin.
*/
@@ -219,6 +232,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
+ LogicalDecodeSequenceCB sequence_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 51e7c0348d..a5aec2928a 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -27,6 +27,7 @@ typedef struct PGOutputData
bool binary;
bool streaming;
bool messages;
+ bool sequences;
} PGOutputData;
#endif /* PGOUTPUT_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0c6e9d1cb9..5190cdf196 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -63,7 +63,8 @@ enum ReorderBufferChangeType
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
- REORDER_BUFFER_CHANGE_TRUNCATE
+ REORDER_BUFFER_CHANGE_TRUNCATE,
+ REORDER_BUFFER_CHANGE_SEQUENCE,
};
/* forward declaration */
@@ -157,6 +158,14 @@ typedef struct ReorderBufferChange
uint32 ninvalidations; /* Number of messages */
SharedInvalidationMessage *invalidations; /* invalidation message */
} inval;
+
+ /* Context data for Sequence changes */
+ struct
+ {
+ RelFileNode relnode;
+ bool created;
+ ReorderBufferTupleBuf *tuple;
+ } sequence;
} data;
/*
@@ -425,6 +434,15 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
const char *prefix, Size sz,
const char *message);
+/* sequence callback signature */
+typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt,
+ bool is_called);
+
/* begin prepare callback signature */
typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn);
@@ -506,6 +524,12 @@ struct ReorderBuffer
*/
HTAB *by_txn;
+ /*
+ * relfilenode => XID lookup table for sequences created in a transaction
+ * (also includes altered sequences, which assigns new relfilenode)
+ */
+ HTAB *sequences;
+
/*
* Transactions that could be a toplevel xact, ordered by LSN of the first
* record bearing that xid.
@@ -536,6 +560,7 @@ struct ReorderBuffer
ReorderBufferApplyTruncateCB apply_truncate;
ReorderBufferCommitCB commit;
ReorderBufferMessageCB message;
+ ReorderBufferSequenceCB sequence;
/*
* Callbacks to be called when streaming a transaction at prepare time.
@@ -630,6 +655,10 @@ void ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size message_size, const char *message);
+void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+ RelFileNode rnode, bool transactional, bool created,
+ ReorderBufferTupleBuf *tuplebuf);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
@@ -677,4 +706,7 @@ void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
void StartupReorderBuffer(void);
+bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+ RelFileNode rnode, bool created);
+
#endif
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 63d6ab7a4e..2cbad49f2e 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -158,8 +158,8 @@ DROP TABLE testpub_parted1;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- fail - view
CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
-ERROR: "testpub_view" is not a table
-DETAIL: Only tables can be added to publications.
+ERROR: "testpub_view" is not a table or sequence
+DETAIL: Only tables and sequences can be added to publications.
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1, pub_test.testpub_nopk;
RESET client_min_messages;
@@ -180,8 +180,8 @@ Tables:
-- fail - view
ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
-ERROR: "testpub_view" is not a table
-DETAIL: Only tables can be added to publications.
+ERROR: "testpub_view" is not a table or sequence
+DETAIL: Only tables and sequences can be added to publications.
ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index e5ab11275d..393be871be 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,6 +1451,14 @@ pg_prepared_xacts| SELECT p.transaction,
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_publication_sequences| SELECT p.pubname,
+ n.nspname AS schemaname,
+ c.relname AS sequencename
+ FROM pg_publication p,
+ LATERAL pg_get_publication_sequences((p.pubname)::text) gpt(relid),
+ (pg_class c
+ JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
+ WHERE (c.oid = gpt.relid);
pg_publication_tables| SELECT p.pubname,
n.nspname AS schemaname,
c.relname AS tablename
--
2.31.1