Here's a rebased version of the patch, with no other changes. I think the aspect that most needs discussion/feedback is the transactional vs. non-transactional behavior. All the other questions are less important or merely cosmetic.
regards -- Tomas Vondra EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
>From f61f2cabf6221451af8f427a5d4b4f7f4583e618 Mon Sep 17 00:00:00 2001 From: Tomas Vondra <to...@2ndquadrant.com> Date: Tue, 20 Jul 2021 23:20:10 +0200 Subject: [PATCH] Logical decoding / replication of sequences --- contrib/test_decoding/Makefile | 3 +- contrib/test_decoding/expected/sequence.out | 327 +++++++++++++++ contrib/test_decoding/sql/sequence.sql | 119 ++++++ contrib/test_decoding/test_decoding.c | 40 ++ doc/src/sgml/catalogs.sgml | 71 ++++ doc/src/sgml/ref/alter_publication.sgml | 28 +- doc/src/sgml/ref/alter_subscription.sgml | 6 +- src/backend/catalog/pg_publication.c | 160 +++++++- src/backend/catalog/system_views.sql | 10 + src/backend/commands/publicationcmds.c | 232 ++++++++++- src/backend/commands/sequence.c | 132 +++++- src/backend/commands/subscriptioncmds.c | 272 +++++++++++++ src/backend/executor/execReplication.c | 4 +- src/backend/nodes/copyfuncs.c | 4 +- src/backend/nodes/equalfuncs.c | 4 +- src/backend/parser/gram.y | 44 +- src/backend/replication/logical/decode.c | 131 +++++- src/backend/replication/logical/logical.c | 42 ++ src/backend/replication/logical/proto.c | 52 +++ .../replication/logical/reorderbuffer.c | 384 ++++++++++++++++++ src/backend/replication/logical/tablesync.c | 118 +++++- src/backend/replication/logical/worker.c | 56 +++ src/backend/replication/pgoutput/pgoutput.c | 80 +++- src/backend/utils/cache/relcache.c | 4 +- src/bin/psql/tab-complete.c | 2 +- src/include/catalog/pg_proc.dat | 5 + src/include/catalog/pg_publication.h | 14 + src/include/commands/sequence.h | 2 + src/include/nodes/parsenodes.h | 4 +- src/include/replication/logicalproto.h | 20 + src/include/replication/output_plugin.h | 14 + src/include/replication/pgoutput.h | 1 + src/include/replication/reorderbuffer.h | 34 +- src/test/regress/expected/publication.out | 8 +- src/test/regress/expected/rules.out | 8 + 35 files changed, 2389 insertions(+), 46 deletions(-) create mode 100644 contrib/test_decoding/expected/sequence.out create mode 100644 
contrib/test_decoding/sql/sequence.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 9a31e0b879..56ddc3abae 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate stream stats twophase twophase_stream + spill slot truncate stream stats twophase twophase_stream \ + sequence ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out new file mode 100644 index 0000000000..07f3e3e92c --- /dev/null +++ b/contrib/test_decoding/expected/sequence.out @@ -0,0 +1,327 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? 
+---------- + init +(1 row) + +CREATE SEQUENCE test_sequence; +-- test the sequence changes by several nextval() calls +SELECT nextval('test_sequence'); + nextval +--------- + 1 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 2 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 4 +(1 row) + +-- test the sequence changes by several ALTER commands +ALTER SEQUENCE test_sequence INCREMENT BY 10; +SELECT nextval('test_sequence'); + nextval +--------- + 14 +(1 row) + +ALTER SEQUENCE test_sequence START WITH 3000; +ALTER SEQUENCE test_sequence MAXVALUE 10000; +ALTER SEQUENCE test_sequence RESTART WITH 4000; +SELECT nextval('test_sequence'); + nextval +--------- + 4000 +(1 row) + +-- test the sequence changes by several setval() calls +SELECT setval('test_sequence', 3500); + setval +-------- + 3500 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3510 +(1 row) + +SELECT setval('test_sequence', 3500, true); + setval +-------- + 3500 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3510 +(1 row) + +SELECT setval('test_sequence', 3500, false); + setval +-------- + 3500 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3500 +(1 row) + +-- show results and drop sequence +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0'); + data +------------------------------------------------------------------------------------------------------ + BEGIN + sequence: public.test_sequence transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0 + COMMIT + sequence: public.test_sequence transactional: 0 created: 0 last_value: 33, log_cnt: 0 is_called: 1 + BEGIN + sequence: public.test_sequence transactional: 1 created: 1 last_value: 4, log_cnt: 0 is_called: 1 + COMMIT + sequence: public.test_sequence transactional: 0 created: 0 
last_value: 334, log_cnt: 0 is_called: 1 + BEGIN + sequence: public.test_sequence transactional: 1 created: 1 last_value: 14, log_cnt: 32 is_called: 1 + COMMIT + BEGIN + sequence: public.test_sequence transactional: 1 created: 1 last_value: 14, log_cnt: 0 is_called: 1 + COMMIT + BEGIN + sequence: public.test_sequence transactional: 1 created: 1 last_value: 4000, log_cnt: 0 is_called: 0 + COMMIT + sequence: public.test_sequence transactional: 0 created: 0 last_value: 4320, log_cnt: 0 is_called: 1 + sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 1 + sequence: public.test_sequence transactional: 0 created: 0 last_value: 3830, log_cnt: 0 is_called: 1 + sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 1 + sequence: public.test_sequence transactional: 0 created: 0 last_value: 3830, log_cnt: 0 is_called: 1 + sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 0 + sequence: public.test_sequence transactional: 0 created: 0 last_value: 3820, log_cnt: 0 is_called: 1 +(24 rows) + +DROP SEQUENCE test_sequence; +-- rollback on sequence creation and update +BEGIN; +CREATE SEQUENCE test_sequence; +CREATE TABLE test_table (a INT); +SELECT nextval('test_sequence'); + nextval +--------- + 1 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 2 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3 +(1 row) + +SELECT setval('test_sequence', 3000); + setval +-------- + 3000 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3001 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3002 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3003 +(1 row) + +ALTER SEQUENCE test_sequence RESTART WITH 6000; +INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) ); +SELECT nextval('test_sequence'); + nextval +--------- + 6001 +(1 row) + +ROLLBACK; +SELECT 
data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0'); + data +------ +(0 rows) + +-- rollback on table creation with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); + setval +-------- + 3000 +(1 row) + +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0'); + data +------ +(0 rows) + +-- rollback on table with serial column +CREATE TABLE test_table (a SERIAL, b INT); +BEGIN; +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); + setval +-------- + 3000 +(1 row) + +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; +-- check table and sequence values after rollback +SELECT * from test_table_a_seq; + last_value | log_cnt | is_called +------------+---------+----------- + 3003 | 30 | t +(1 row) + +SELECT nextval('test_table_a_seq'); + nextval +--------- + 3004 +(1 row) + +DROP TABLE test_table; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', 
'0'); + data +--------------------------------------------------------------------------------------------------------- + BEGIN + sequence: public.test_table_a_seq transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0 + COMMIT + sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 33, log_cnt: 0 is_called: 1 + sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 3000, log_cnt: 0 is_called: 1 + sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 3033, log_cnt: 0 is_called: 1 + BEGIN + COMMIT +(8 rows) + +-- savepoint test on table with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +SAVEPOINT a; +INSERT INTO test_table (b) VALUES (300); +ROLLBACK TO SAVEPOINT a; +DROP TABLE test_table; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0'); + data +------------------------------------------------------------------------------------------------------- + BEGIN + sequence: public.test_table_a_seq transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0 + sequence: public.test_table_a_seq transactional: 1 created: 0 last_value: 33, log_cnt: 0 is_called: 1 + table public.test_table: INSERT: a[integer]:1 b[integer]:100 + table public.test_table: INSERT: a[integer]:2 b[integer]:200 + COMMIT +(6 rows) + +-- savepoint test on table with serial column +BEGIN; +CREATE SEQUENCE test_sequence; +SELECT nextval('test_sequence'); + nextval +--------- + 1 +(1 row) + +SELECT setval('test_sequence', 3000); + setval +-------- + 3000 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3001 +(1 row) + +SAVEPOINT a; +ALTER SEQUENCE test_sequence START WITH 7000; +SELECT setval('test_sequence', 5000); + setval +-------- + 5000 +(1 row) + +ROLLBACK TO SAVEPOINT a; +SELECT * FROM 
test_sequence; + last_value | log_cnt | is_called +------------+---------+----------- + 3001 | 32 | t +(1 row) + +DROP SEQUENCE test_sequence; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0'); + data +------------------------------------------------------------------------------------------------------ + BEGIN + sequence: public.test_sequence transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0 + sequence: public.test_sequence transactional: 1 created: 0 last_value: 33, log_cnt: 0 is_called: 1 + sequence: public.test_sequence transactional: 1 created: 0 last_value: 3000, log_cnt: 0 is_called: 1 + sequence: public.test_sequence transactional: 1 created: 0 last_value: 3033, log_cnt: 0 is_called: 1 + COMMIT +(6 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql new file mode 100644 index 0000000000..d8a34738f3 --- /dev/null +++ b/contrib/test_decoding/sql/sequence.sql @@ -0,0 +1,119 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE SEQUENCE test_sequence; + +-- test the sequence changes by several nextval() calls +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); + +-- test the sequence changes by several ALTER commands +ALTER SEQUENCE test_sequence INCREMENT BY 10; +SELECT nextval('test_sequence'); + +ALTER SEQUENCE test_sequence START WITH 3000; +ALTER SEQUENCE test_sequence MAXVALUE 10000; +ALTER SEQUENCE test_sequence RESTART WITH 4000; +SELECT nextval('test_sequence'); + +-- test the sequence changes by several setval() calls +SELECT setval('test_sequence', 3500); +SELECT 
nextval('test_sequence'); +SELECT setval('test_sequence', 3500, true); +SELECT nextval('test_sequence'); +SELECT setval('test_sequence', 3500, false); +SELECT nextval('test_sequence'); + +-- show results and drop sequence +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0'); +DROP SEQUENCE test_sequence; + +-- rollback on sequence creation and update +BEGIN; +CREATE SEQUENCE test_sequence; +CREATE TABLE test_table (a INT); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT setval('test_sequence', 3000); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +ALTER SEQUENCE test_sequence RESTART WITH 6000; +INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) ); +SELECT nextval('test_sequence'); +ROLLBACK; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0'); + +-- rollback on table creation with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0'); + +-- rollback on table with serial column +CREATE TABLE test_table (a SERIAL, b INT); + +BEGIN; +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT 
INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; + +-- check table and sequence values after rollback +SELECT * from test_table_a_seq; +SELECT nextval('test_table_a_seq'); + +DROP TABLE test_table; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0'); + +-- savepoint test on table with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +SAVEPOINT a; +INSERT INTO test_table (b) VALUES (300); +ROLLBACK TO SAVEPOINT a; +DROP TABLE test_table; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0'); + +-- savepoint test on table with serial column +BEGIN; +CREATE SEQUENCE test_sequence; +SELECT nextval('test_sequence'); +SELECT setval('test_sequence', 3000); +SELECT nextval('test_sequence'); +SAVEPOINT a; +ALTER SEQUENCE test_sequence START WITH 7000; +SELECT setval('test_sequence', 5000); +ROLLBACK TO SAVEPOINT a; +SELECT * FROM test_sequence; +DROP SEQUENCE test_sequence; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0'); + +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index e5cd84e85e..54d734cb85 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -35,6 +35,7 @@ typedef struct bool include_timestamp; 
bool skip_empty_xacts; bool only_local; + bool skip_sequences; } TestDecodingData; /* @@ -76,6 +77,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_sequence(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, + Relation rel, bool transactional, bool created, + int64 last_value, int64 log_cnt, bool is_called); static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); @@ -141,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->sequence_cb = pg_decode_sequence; cb->filter_prepare_cb = pg_decode_filter_prepare; cb->begin_prepare_cb = pg_decode_begin_prepare_txn; cb->prepare_cb = pg_decode_prepare_txn; @@ -175,6 +181,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->skip_empty_xacts = false; data->only_local = false; + /* skip sequences by default for backwards compatibility */ + data->skip_sequences = true; + ctx->output_plugin_private = data; opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; @@ -265,6 +274,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "skip-sequences") == 0) + { + + if (elem->arg == NULL) + continue; /* true by default */ + else if (!parse_bool(strVal(elem->arg), &data->skip_sequences)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -744,6 +764,26 @@ pg_decode_message(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } +static void 
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, bool created, + int64 last_value, int64 log_cnt, bool is_called) +{ + TestDecodingData *data = ctx->output_plugin_private; + char *nspname = get_namespace_name(RelationGetNamespace(rel)); + + /* return if requested to skip_sequences */ + if (data->skip_sequences) + return; + /* print int64 fields with INT64_FORMAT; %zu is only valid for size_t */ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "sequence: %s.%s transactional: %d created: %d last_value: " INT64_FORMAT ", log_cnt: " INT64_FORMAT " is_called: %d", + nspname, RelationGetRelationName(rel), + transactional, created, last_value, log_cnt, is_called); + OutputPluginWrite(ctx, true); +} + static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 2b2c70a26e..a342a54629 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -9430,6 +9430,11 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l <entry>prepared transactions</entry> </row> + <row> + <entry><link linkend="view-pg-publication-sequences"><structname>pg_publication_sequences</structname></link></entry> + <entry>publications and their associated sequences</entry> + </row> + <row> <entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry> <entry>publications and their associated tables</entry> @@ -11264,6 +11269,72 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx </sect1> + <sect1 id="view-pg-publication-sequences"> + <title><structname>pg_publication_sequences</structname></title> + + <indexterm zone="view-pg-publication-sequences"> + <primary>pg_publication_sequences</primary> + </indexterm> + + <para> + The view <structname>pg_publication_sequences</structname> provides + information about the mapping between publications and the sequences they + contain. 
Unlike the underlying catalog + <link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>, + this view expands + publications defined as <literal>FOR ALL SEQUENCES</literal>, so for such + publications there will be a row for each eligible sequence. + </para> + + <table> + <title><structname>pg_publication_sequences</structname> Columns</title> + <tgroup cols="1"> + <thead> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + Column Type + </para> + <para> + Description + </para></entry> + </row> + </thead> + + <tbody> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>pubname</structfield> <type>name</type> + (references <link linkend="catalog-pg-publication"><structname>pg_publication</structname></link>.<structfield>pubname</structfield>) + </para> + <para> + Name of publication + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>schemaname</structfield> <type>name</type> + (references <link linkend="catalog-pg-namespace"><structname>pg_namespace</structname></link>.<structfield>nspname</structfield>) + </para> + <para> + Name of schema containing sequence + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>sequencename</structfield> <type>name</type> + (references <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>relname</structfield>) + </para> + <para> + Name of sequence + </para></entry> + </row> + </tbody> + </tgroup> + </table> + </sect1> + <sect1 id="view-pg-publication-tables"> <title><structname>pg_publication_tables</structname></title> diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index faa114b2c6..c68a1573de 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -24,6 +24,9 @@ PostgreSQL 
documentation ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable> @@ -50,7 +53,18 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r </para> <para> - The fourth variant of this command listed in the synopsis can change + The next three variants change which sequences are part of the publication. + The <literal>SET SEQUENCE</literal> clause will replace the list of sequences + in the publication with the specified one. The <literal>ADD SEQUENCE</literal> + and <literal>DROP SEQUENCE</literal> clauses will add and remove one or more + sequences from the publication. 
Note that adding sequences to a publication + that is already subscribed to will require a <literal>ALTER SUBSCRIPTION + ... REFRESH PUBLICATION</literal> action on the subscribing side in order + to become effective. + </para> + + <para> + The seventh variant of this command listed in the synopsis can change all of the publication properties specified in <xref linkend="sql-createpublication"/>. Properties not mentioned in the command retain their previous settings. @@ -62,7 +76,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r <para> You must own the publication to use <command>ALTER PUBLICATION</command>. - Adding a table to a publication additionally requires owning that table. + Adding a table to a publication additionally requires owning that table, + and the same requirement applies to sequences. To alter the owner, you must also be a direct or indirect member of the new owning role. The new owner must have <literal>CREATE</literal> privilege on the database. Also, the new owner of a <literal>FOR ALL TABLES</literal> @@ -97,6 +112,15 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r </listitem> </varlistentry> + <varlistentry> + <term><replaceable class="parameter">sequence_name</replaceable></term> + <listitem> + <para> + Name of an existing sequence. + </para> + </listitem> + </varlistentry> + <varlistentry> <term><literal>SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... 
] )</literal></term> <listitem> diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index a6f994450d..ff1dcd3bfd 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -144,8 +144,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < <term><literal>REFRESH PUBLICATION</literal></term> <listitem> <para> - Fetch missing table information from publisher. This will start - replication of tables that were added to the subscribed-to publications + Fetch missing table and sequence information from publisher. This will start + replication of tables and sequences that were added to the subscribed-to publications since the last invocation of <command>REFRESH PUBLICATION</command> or since <command>CREATE SUBSCRIPTION</command>. </para> @@ -162,7 +162,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < Specifies whether the existing data in the publications that are being subscribed to should be copied once the replication starts. The default is <literal>true</literal>. (Previously subscribed - tables are not copied.) + tables and sequences are not copied.) 
</para> </listitem> </varlistentry> diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 36bfff9706..79f45d892e 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -51,26 +51,27 @@ check_publication_add_relation(Relation targetrel) { /* Must be a regular or partitioned table */ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && - RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) + RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE && + RelationGetForm(targetrel)->relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("\"%s\" is not a table", + errmsg("\"%s\" is not a table or sequence", RelationGetRelationName(targetrel)), - errdetail("Only tables can be added to publications."))); + errdetail("Only tables and sequences can be added to publications."))); - /* Can't be system table */ + /* Can't be system table/sequence */ if (IsCatalogRelation(targetrel)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("\"%s\" is a system table", + errmsg("\"%s\" is a system table or sequence", RelationGetRelationName(targetrel)), - errdetail("System tables cannot be added to publications."))); + errdetail("System tables / sequences cannot be added to publications."))); /* UNLOGGED and TEMP relations cannot be part of publication. 
*/ + if (!RelationIsPermanent(targetrel)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("table \"%s\" cannot be replicated", + errmsg("table or sequence \"%s\" cannot be replicated", RelationGetRelationName(targetrel)), errdetail("Temporary and unlogged relations cannot be replicated."))); } @@ -98,7 +99,8 @@ static bool is_publishable_class(Oid relid, Form_pg_class reltuple) { return (reltuple->relkind == RELKIND_RELATION || - reltuple->relkind == RELKIND_PARTITIONED_TABLE) && + reltuple->relkind == RELKIND_PARTITIONED_TABLE || + reltuple->relkind == RELKIND_SEQUENCE) && !IsCatalogRelationOid(relid) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; @@ -271,6 +273,10 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + /* skip sequences here */ + if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE) + continue; + if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE && pub_partopt != PUBLICATION_PART_ROOT) { @@ -304,6 +310,49 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) return result; } +/* + * Gets list of relation oids for a publication (sequences only). + * + * This should only be used for normal publications, the FOR ALL SEQUENCES + * ones should use GetAllSequencesPublicationRelations(). + */ +List * +GetPublicationSequenceRelations(Oid pubid) +{ + List *result; + Relation pubrelsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all relations associated with the publication. 
*/ + pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_rel_prpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId, + true, NULL, 1, &scankey); + + result = NIL; + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_rel pubrel; + + pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + + if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE) + result = lappend_oid(result, pubrel->prrelid); + } + + systable_endscan(scan); + table_close(pubrelsrel, AccessShareLock); + + return result; +} + /* * Gets list of publication oids for publications marked as FOR ALL TABLES. */ @@ -404,6 +453,46 @@ GetAllTablesPublicationRelations(bool pubviaroot) return result; } +/* + * Gets list of all sequences published by FOR ALL SEQUENCES publication(s). + * + * Unlike GetAllTablesPublicationRelations(), there is no partition handling + * here: the pg_class scan is restricted to RELKIND_SEQUENCE and every + * publishable sequence is returned. 
+ */ +List * +GetAllSequencesPublicationRelations(void) +{ + Relation classRel; + ScanKeyData key[1]; + TableScanDesc scan; + HeapTuple tuple; + List *result = NIL; + + classRel = table_open(RelationRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_SEQUENCE)); + + scan = table_beginscan_catalog(classRel, 1, key); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + + if (is_publishable_class(relid, relForm)) + result = lappend_oid(result, relid); + } + + table_endscan(scan); + + table_close(classRel, AccessShareLock); + return result; +} + /* * Get publication using oid * @@ -426,10 +515,12 @@ GetPublication(Oid pubid) pub->oid = pubid; pub->name = pstrdup(NameStr(pubform->pubname)); pub->alltables = pubform->puballtables; + pub->allsequences = pubform->puballsequences; pub->pubactions.pubinsert = pubform->pubinsert; pub->pubactions.pubupdate = pubform->pubupdate; pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; + pub->pubactions.pubsequence = pubform->pubsequence; pub->pubviaroot = pubform->pubviaroot; ReleaseSysCache(tup); @@ -555,3 +646,56 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } + +/* + * Returns Oids of sequences in a publication. 
+ */ +Datum +pg_get_publication_sequences(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + Publication *publication; + List *sequences; + + /* stuff done only on the first call of the function */ + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + publication = GetPublicationByName(pubname, false); + + /* + * A FOR ALL SEQUENCES publication lists every publishable sequence + * found in pg_class; any other publication lists only the sequences + * recorded for it in pg_publication_rel. + */ + if (publication->allsequences) + sequences = GetAllSequencesPublicationRelations(); + else + sequences = GetPublicationSequenceRelations(publication->oid); + + funcctx->user_fctx = (void *) sequences; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + sequences = (List *) funcctx->user_fctx; + + if (funcctx->call_cntr < list_length(sequences)) + { + Oid relid = list_nth_oid(sequences, funcctx->call_cntr); + + SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid)); + } + + SRF_RETURN_DONE(funcctx); +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 55f6e3711d..9bec49bc3e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -372,6 +372,16 @@ CREATE VIEW pg_publication_tables AS pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) WHERE C.oid = GPT.relid; +CREATE VIEW pg_publication_sequences AS + SELECT + P.pubname AS pubname, + N.nspname AS schemaname, + C.relname AS sequencename + FROM pg_publication P, + LATERAL pg_get_publication_sequences(P.pubname) GPT, + pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) + WHERE C.oid = 
GPT.relid; + CREATE VIEW pg_locks AS SELECT * FROM pg_lock_status() AS L; diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 8487eeb7e6..90ad83d4a1 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -16,6 +16,7 @@ #include "access/genam.h" #include "access/htup_details.h" +#include "access/relation.h" #include "access/table.h" #include "access/xact.h" #include "catalog/catalog.h" @@ -54,6 +55,12 @@ static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); +static List *OpenSequenceList(List *sequences); +static void CloseSequenceList(List *rels); +static void PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists, + AlterPublicationStmt *stmt); +static void PublicationDropSequences(Oid pubid, List *rels, bool missing_ok); + static void parse_publication_options(ParseState *pstate, List *options, @@ -72,6 +79,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubupdate = true; pubactions->pubdelete = true; pubactions->pubtruncate = true; + pubactions->pubsequence = true; *publish_via_partition_root = false; /* Parse options */ @@ -96,6 +104,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubupdate = false; pubactions->pubdelete = false; pubactions->pubtruncate = false; + pubactions->pubsequence = false; *publish_given = true; publish = defGetString(defel); @@ -118,6 +127,8 @@ parse_publication_options(ParseState *pstate, pubactions->pubdelete = true; else if (strcmp(publish_opt, "truncate") == 0) pubactions->pubtruncate = true; + else if (strcmp(publish_opt, "sequence") == 0) + pubactions->pubsequence = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -208,6 +219,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) BoolGetDatum(pubactions.pubdelete); 
values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); + values[Anum_pg_publication_pubsequence - 1] = + BoolGetDatum(pubactions.pubsequence); values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); @@ -373,9 +386,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, rels = OpenTableList(stmt->tables); - if (stmt->tableAction == DEFELEM_ADD) + if (stmt->action == DEFELEM_ADD) PublicationAddTables(pubid, rels, false, stmt); - else if (stmt->tableAction == DEFELEM_DROP) + else if (stmt->action == DEFELEM_DROP) PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { @@ -426,6 +439,82 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, CloseTableList(rels); } +/* + * Add or remove sequence to/from publication. + */ +static void +AlterPublicationSequences(AlterPublicationStmt *stmt, Relation rel, + HeapTuple tup) +{ + List *rels = NIL; + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + Oid pubid = pubform->oid; + + /* Check that user is allowed to manipulate the publication tables. */ + if (pubform->puballsequences) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES", + NameStr(pubform->pubname)), + errdetail("Sequences cannot be added to or dropped from FOR ALL TABLES publications."))); + + Assert(list_length(stmt->sequences) > 0); + + rels = OpenSequenceList(stmt->sequences); + + if (stmt->action == DEFELEM_ADD) + PublicationAddSequences(pubid, rels, false, stmt); + else if (stmt->action == DEFELEM_DROP) + PublicationDropSequences(pubid, rels, false); + else /* DEFELEM_SET */ + { + List *oldrelids = GetPublicationRelations(pubid, + PUBLICATION_PART_ROOT); + List *delrels = NIL; + ListCell *oldlc; + + /* Calculate which relations to drop. 
*/ + foreach(oldlc, oldrelids) + { + Oid oldrelid = lfirst_oid(oldlc); + ListCell *newlc; + bool found = false; + + foreach(newlc, rels) + { + Relation newrel = (Relation) lfirst(newlc); + + if (RelationGetRelid(newrel) == oldrelid) + { + found = true; + break; + } + } + + if (!found) + { + Relation oldrel = relation_open(oldrelid, + ShareUpdateExclusiveLock); + + delrels = lappend(delrels, oldrel); + } + } + + /* And drop them. */ + PublicationDropSequences(pubid, delrels, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. + */ + PublicationAddSequences(pubid, rels, true, stmt); + + CloseSequenceList(delrels); + } + + CloseSequenceList(rels); +} + /* * Alter the existing publication. * @@ -459,8 +548,12 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) if (stmt->options) AlterPublicationOptions(pstate, stmt, rel, tup); - else + else if (stmt->tables) AlterPublicationTables(stmt, rel, tup); + else if (stmt->sequences) + AlterPublicationSequences(stmt, rel, tup); + else + Assert(false); /* Cleanup. */ heap_freetuple(tup); @@ -665,6 +758,139 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) } } +/* + * Open relations specified by a RangeVar list. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. + */ +static List * +OpenSequenceList(List *sequences) +{ + List *relids = NIL; + List *rels = NIL; + ListCell *lc; + + /* + * Open, share-lock, and check all the explicitly-specified relations + */ + foreach(lc, sequences) + { + RangeVar *rv = castNode(RangeVar, lfirst(lc)); + Relation rel; + Oid myrelid; + + /* Allow query cancel in case this takes a long time */ + CHECK_FOR_INTERRUPTS(); + + rel = relation_openrv(rv, ShareUpdateExclusiveLock); + myrelid = RelationGetRelid(rel); + + /* + * Filter out duplicates if user specifies "foo, foo". 
+ * + * Note that this algorithm is known to not be very efficient (O(N^2)) + * but given that it only works on list of sequences given to us by user + * it's deemed acceptable. + */ + if (list_member_oid(relids, myrelid)) + { + relation_close(rel, ShareUpdateExclusiveLock); + continue; + } + + rels = lappend(rels, rel); + relids = lappend_oid(relids, myrelid); + } + + list_free(relids); + + return rels; +} + +/* + * Close all relations in the list. + */ +static void +CloseSequenceList(List *rels) +{ + ListCell *lc; + + foreach(lc, rels) + { + Relation rel = (Relation) lfirst(lc); + + relation_close(rel, NoLock); + } +} + +/* + * Add listed sequences to the publication. + */ +static void +PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists, + AlterPublicationStmt *stmt) +{ + ListCell *lc; + + Assert(!stmt || !stmt->for_all_sequences); + + foreach(lc, rels) + { + Relation rel = (Relation) lfirst(lc); + ObjectAddress obj; + + /* Must be owner of the sequence or superuser. */ + if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), + RelationGetRelationName(rel)); + + obj = publication_add_relation(pubid, rel, if_not_exists); + if (stmt) + { + EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, + (Node *) stmt); + + InvokeObjectPostCreateHook(PublicationRelRelationId, + obj.objectId, 0); + } + } +} + +/* + * Remove listed sequences from the publication. 
+ */ +static void +PublicationDropSequences(Oid pubid, List *rels, bool missing_ok) +{ + ObjectAddress obj; + ListCell *lc; + Oid prid; + + foreach(lc, rels) + { + Relation rel = (Relation) lfirst(lc); + Oid relid = RelationGetRelid(rel); + + prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pubid)); + if (!OidIsValid(prid)) + { + if (missing_ok) + continue; + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("relation \"%s\" is not part of the publication", + RelationGetRelationName(rel)))); + } + + ObjectAddressSet(obj, PublicationRelRelationId, prid); + performDeletion(&obj, DROP_CASCADE, 0); + } +} + + /* * Internal workhorse for changing a publication owner */ diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 72bfdc07a4..2fc644c74c 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -94,7 +94,7 @@ static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */ */ static SeqTableData *last_used_seq = NULL; -static void fill_seq_with_data(Relation rel, HeapTuple tuple); +static void fill_seq_with_data(Relation rel, HeapTuple tuple, bool create); static Relation lock_and_open_sequence(SeqTable seq); static void create_seq_hashtable(void); static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel); @@ -222,7 +222,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq) /* now initialize the sequence's data */ tuple = heap_form_tuple(tupDesc, value, null); - fill_seq_with_data(rel, tuple); + fill_seq_with_data(rel, tuple, true); /* process OWNED BY if given */ if (owned_by) @@ -327,7 +327,86 @@ ResetSequence(Oid seq_relid) /* * Insert the modified tuple into the new storage file. 
*/ - fill_seq_with_data(seq_rel, tuple); + fill_seq_with_data(seq_rel, tuple, true); + + /* Clear local cache so that we don't think we have cached numbers */ + /* Note that we do not change the currval() state */ + elm->cached = elm->last; + + relation_close(seq_rel, NoLock); +} + +/* + * Set a sequence to the given state (last_value, log_cnt, is_called). + * + * The change is made transactionally, so that on failure of the current + * transaction, the sequence will be restored to its previous state. + * We do that by creating a whole new relfilenode for the sequence; so this + * works much like the rewriting forms of ALTER TABLE. + * + * Caller is assumed to have acquired AccessExclusiveLock on the sequence, + * which must not be released until end of transaction. Caller is also + * responsible for permissions checking. + */ +void +ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called) +{ + Relation seq_rel; + SeqTable elm; + Form_pg_sequence_data seq; + Buffer buf; + HeapTupleData seqdatatuple; + HeapTuple tuple; + + /* + * Read the old sequence. This does a bit more work than really + * necessary, but it's simple, and we do want to double-check that it's + * indeed a sequence. + */ + init_sequence(seq_relid, &elm, &seq_rel); + (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple); + + /* + * Copy the existing sequence tuple. + */ + tuple = heap_copytuple(&seqdatatuple); + + /* Now we're done with the old page */ + UnlockReleaseBuffer(buf); + + /* + * Overwrite last_value, is_called and log_cnt in the copied tuple with + * the values supplied by the caller + */ + seq = (Form_pg_sequence_data) GETSTRUCT(tuple); + seq->last_value = last_value; + seq->is_called = is_called; + seq->log_cnt = log_cnt; + + /* + * Create a new storage file for the sequence. + */ + RelationSetNewRelfilenode(seq_rel, seq_rel->rd_rel->relpersistence); + + /* + * Ensure sequence's relfrozenxid is at 0, since it won't contain any + * unfrozen XIDs. 
Same with relminmxid, since a sequence will never + * contain multixacts. + */ + Assert(seq_rel->rd_rel->relfrozenxid == InvalidTransactionId); + Assert(seq_rel->rd_rel->relminmxid == InvalidMultiXactId); + + /* + * Insert the modified tuple into the new storage file. + * + * XXX Maybe this should also use created=true, just like the other places + * calling fill_seq_with_data. That's probably needed for correct cascading + * replication. + * + * XXX That'd mean all fill_seq_with_data callers use created=true, making + * the parameter unnecessary. + */ + fill_seq_with_data(seq_rel, tuple, false); /* Clear local cache so that we don't think we have cached numbers */ /* Note that we do not change the currval() state */ @@ -340,7 +419,7 @@ ResetSequence(Oid seq_relid) * Initialize a sequence's relation with the specified tuple as content */ static void -fill_seq_with_data(Relation rel, HeapTuple tuple) +fill_seq_with_data(Relation rel, HeapTuple tuple, bool create) { Buffer buf; Page page; @@ -378,8 +457,21 @@ fill_seq_with_data(Relation rel, HeapTuple tuple) /* check the comment above nextval_internal()'s equivalent call. */ if (RelationNeedsWAL(rel)) + { GetTopTransactionId(); + /* + * Ensure we have a proper XID, which will be included in the XLOG + * record by XLogRecordAssemble. Otherwise the first nextval() in + * a subxact (without any preceding changes) would get XID 0, + * and it'd be impossible to decide which top xact it belongs to. + * It'd also trigger assert in DecodeSequence. + * + * XXX Not sure if this is the best solution. 
+ */ + GetCurrentTransactionId(); + } + START_CRIT_SECTION(); MarkBufferDirty(buf); @@ -399,6 +491,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple) XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); xlrec.node = rel->rd_node; + xlrec.created = create; XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) tuple->t_data, tuple->t_len); @@ -502,7 +595,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) /* * Insert the modified tuple into the new storage file. */ - fill_seq_with_data(seqrel, newdatatuple); + fill_seq_with_data(seqrel, newdatatuple, true); } /* process OWNED BY if given */ @@ -766,8 +859,21 @@ nextval_internal(Oid relid, bool check_permissions) * (Have to do that here, so we're outside the critical section) */ if (logit && RelationNeedsWAL(seqrel)) + { GetTopTransactionId(); + /* + * Ensure we have a proper XID, which will be included in the XLOG + * record by XLogRecordAssemble. Otherwise the first nextval() in + * a subxact (without any preceding changes) would get XID 0, + * and it'd be impossible to decide which top xact it belongs to. + * It'd also trigger assert in DecodeSequence. + * + * XXX Not sure if this is the best solution. + */ + GetCurrentTransactionId(); + } + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); @@ -803,6 +909,7 @@ nextval_internal(Oid relid, bool check_permissions) seq->log_cnt = 0; xlrec.node = seqrel->rd_node; + xlrec.created = false; XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); @@ -977,8 +1084,21 @@ do_setval(Oid relid, int64 next, bool iscalled) /* check the comment above nextval_internal()'s equivalent call. */ if (RelationNeedsWAL(seqrel)) + { GetTopTransactionId(); + /* + * Ensure we have a proper XID, which will be included in the XLOG + * record by XLogRecordAssemble. 
Otherwise the first nextval() in + * a subxact (without any preceding changes) would get XID 0, + * and it'd be impossible to decide which top xact it belongs to. + * It'd also trigger assert in DecodeSequence. + * + * XXX Not sure if this is the best solution. + */ + GetCurrentTransactionId(); + } + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); @@ -999,6 +1119,8 @@ do_setval(Oid relid, int64 next, bool iscalled) XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); xlrec.node = seqrel->rd_node; + xlrec.created = false; + XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 22ae982328..2d03cf011e 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -84,6 +84,7 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -521,6 +522,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, char *err; WalReceiverConn *wrconn; List *tables; + List *sequences; ListCell *lc; char table_state; @@ -559,6 +561,26 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, InvalidXLogRecPtr); } + /* + * Get the sequence list from publisher and build local sequence + * status info. 
+ */ + sequences = fetch_sequence_list(wrconn, publications); + foreach(lc, sequences) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); + } + /* * If requested, create permanent slot for the subscription. We * won't use the initial snapshot for anything, so no need to @@ -731,6 +753,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) { Oid relid = subrel_local_oids[off]; + /* XXX ignore sequences - maybe do this in GetSubscriptionRelations? */ + if (get_rel_relkind(relid) == RELKIND_SEQUENCE) + continue; + if (!bsearch(&relid, pubrel_local_oids, list_length(pubrel_names), sizeof(Oid), oid_cmp)) { @@ -822,6 +848,183 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); } } + + /* + * XXX now do the same thing for sequences, maybe before the preceding + * block, or earlier? + */ + + /* Get the table list from publisher. */ + pubrel_names = fetch_sequence_list(wrconn, sub->publications); + + /* Get local table list. */ + subrel_states = GetSubscriptionRelations(sub->oid); + + /* + * Build qsorted array of local table oids for faster lookup. This can + * potentially contain all tables in the database so speed of lookup + * is important. + */ + subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + off = 0; + foreach(lc, subrel_states) + { + SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); + + subrel_local_oids[off++] = relstate->relid; + } + qsort(subrel_local_oids, list_length(subrel_states), + sizeof(Oid), oid_cmp); + + /* + * Rels that we want to remove from subscription and drop any slots + * and origins corresponding to them. 
+ */ + sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels)); + + /* + * Walk over the remote tables and try to match them to locally known + * tables. If the table is not known locally create a new state for + * it. + * + * Also builds array of local oids of remote tables for the next step. + */ + off = 0; + pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); + + foreach(lc, pubrel_names) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + pubrel_local_oids[off++] = relid; + + if (!bsearch(&relid, subrel_local_oids, + list_length(subrel_states), sizeof(Oid), oid_cmp)) + { + AddSubscriptionRelState(sub->oid, relid, + copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", + rv->schemaname, rv->relname, sub->name))); + } + } + + /* + * Next remove state for tables we should not care about anymore using + * the data we collected above + */ + qsort(pubrel_local_oids, list_length(pubrel_names), + sizeof(Oid), oid_cmp); + + remove_rel_len = 0; + for (off = 0; off < list_length(subrel_states); off++) + { + Oid relid = subrel_local_oids[off]; + + /* XXX ignore non-sequences - maybe do this in GetSubscriptionRelations? */ + if (get_rel_relkind(relid) != RELKIND_SEQUENCE) + continue; + + if (!bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), sizeof(Oid), oid_cmp)) + { + char state; + XLogRecPtr statelsn; + + /* + * Lock pg_subscription_rel with AccessExclusiveLock to + * prevent any race conditions with the apply worker + * re-launching workers at the same time this code is trying + * to remove those tables. 
+ * + * Even if new worker for this particular rel is restarted it + * won't be able to make any progress as we hold exclusive + * lock on subscription_rel till the transaction end. It will + * simply exit as there is no corresponding rel entry. + * + * This locking also ensures that the state of rels won't + * change till we are done with this refresh operation. + */ + if (!rel) + rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); + + /* Last known rel state. */ + state = GetSubscriptionRelState(sub->oid, relid, &statelsn); + + sub_remove_rels[remove_rel_len].relid = relid; + sub_remove_rels[remove_rel_len++].state = state; + + RemoveSubscriptionRel(sub->oid, relid); + + logicalrep_worker_stop(sub->oid, relid); + + /* + * For READY state, we would have already dropped the + * tablesync origin. + */ + if (state != SUBREL_STATE_READY) + { + char originname[NAMEDATALEN]; + + /* + * Drop the tablesync's origin tracking if exists. + * + * It is possible that the origin is not yet created for + * tablesync worker, this can happen for the states before + * SUBREL_STATE_FINISHEDCOPY. The apply worker can also + * concurrently try to drop the origin and by this time + * the origin might be already removed. For these reasons, + * passing missing_ok = true. + */ + ReplicationOriginNameForTablesync(sub->oid, relid, originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); + } + + ereport(DEBUG1, + (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name))); + } + } + + /* + * Drop the tablesync slots associated with removed tables. This has + * to be at the end because otherwise if there is an error while doing + * the database operations we won't be able to rollback dropped slots. 
+ */ + for (off = 0; off < remove_rel_len; off++) + { + if (sub_remove_rels[off].state != SUBREL_STATE_READY && + sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) + { + char syncslotname[NAMEDATALEN] = {0}; + + /* + * For READY/SYNCDONE states we know the tablesync slot has + * already been dropped by the tablesync worker. + * + * For other states, there is no certainty, maybe the slot + * does not exist yet. Also, if we fail after removing some of + * the slots, next time, it will again try to drop already + * dropped slots and fail. For these reasons, we allow + * missing_ok = true for the drop. + */ + ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, + syncslotname, sizeof(syncslotname)); + ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); + } + } + } PG_FINALLY(); { @@ -1635,6 +1838,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } +/* + * Get the list of sequences which belong to specified publications on the + * publisher connection. 
+ */ +static List * +fetch_sequence_list(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + ListCell *lc; + bool first; + List *tablelist = NIL; + + Assert(list_length(publications) > 0); + + initStringInfo(&cmd); + appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n" + " FROM pg_catalog.pg_publication_sequences s\n" + " WHERE s.pubname IN ("); + first = true; + foreach(lc, publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_literal_cstr(pubname)); + } + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* Process tables. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *nspname; + char *relname; + bool isnull; + RangeVar *rv; + + nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + rv = makeRangeVar(nspname, relname, -1); + tablelist = lappend(tablelist, rv); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + return tablelist; +} + /* * This is to report the connection failure while dropping replication slots. 
* Here, we report the WARNING for all tablesync slots so that user can drop diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 1e285e0349..a1c7880be6 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -619,11 +619,11 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, errdetail("\"%s.%s\" is a foreign table.", nspname, relname))); - if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) + if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", nspname, relname), - errdetail("\"%s.%s\" is not a table.", + errdetail("\"%s.%s\" is not a table or a sequence.", nspname, relname))); } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 9d4893c504..b8df89863d 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4834,8 +4834,10 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); COPY_NODE_FIELD(tables); + COPY_NODE_FIELD(sequences); COPY_SCALAR_FIELD(for_all_tables); - COPY_SCALAR_FIELD(tableAction); + COPY_SCALAR_FIELD(for_all_sequences); + COPY_SCALAR_FIELD(action); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index b9cc7b199c..9cce502876 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2308,8 +2308,10 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a, COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); COMPARE_NODE_FIELD(tables); + COMPARE_NODE_FIELD(sequences); COMPARE_SCALAR_FIELD(for_all_tables); - COMPARE_SCALAR_FIELD(tableAction); + COMPARE_SCALAR_FIELD(for_all_sequences); + COMPARE_SCALAR_FIELD(action); return true; } diff --git 
a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 10da5c5c51..3926a7ee9f 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -434,6 +434,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> group_by_item empty_grouping_set rollup_clause cube_clause %type <node> grouping_sets_clause %type <node> opt_publication_for_tables publication_for_tables +/* %type <node> opt_publication_for_sequences publication_for_sequences */ %type <list> opt_fdw_options fdw_options %type <defelt> fdw_option @@ -9588,6 +9589,7 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec *****************************************************************************/ CreatePublicationStmt: + /* CREATE PUBLICATION name opt_publication_for_tables opt_publication_for_sequences opt_definition */ CREATE PUBLICATION name opt_publication_for_tables opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); @@ -9622,6 +9624,12 @@ publication_for_tables: } ; +/* + * FIXME + * + * opt_publication_for_sequences and publication_for_sequences should be + * copies for sequences + */ /***************************************************************************** * @@ -9633,6 +9641,12 @@ publication_for_tables: * * ALTER PUBLICATION name SET TABLE table [, table2] * + * ALTER PUBLICATION name ADD SEQUENCE sequence [, sequence2] + * + * ALTER PUBLICATION name DROP SEQUENCE sequence [, sequence2] + * + * ALTER PUBLICATION name SET SEQUENCE sequence [, sequence2] + * *****************************************************************************/ AlterPublicationStmt: @@ -9648,7 +9662,7 @@ AlterPublicationStmt: AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; n->tables = $6; - n->tableAction = DEFELEM_ADD; + n->action = DEFELEM_ADD; $$ = (Node *)n; } | ALTER PUBLICATION name SET TABLE relation_expr_list @@ -9656,7 +9670,7 @@ AlterPublicationStmt: AlterPublicationStmt *n = 
makeNode(AlterPublicationStmt); n->pubname = $3; n->tables = $6; - n->tableAction = DEFELEM_SET; + n->action = DEFELEM_SET; $$ = (Node *)n; } | ALTER PUBLICATION name DROP TABLE relation_expr_list @@ -9664,7 +9678,31 @@ AlterPublicationStmt: AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; n->tables = $6; - n->tableAction = DEFELEM_DROP; + n->action = DEFELEM_DROP; + $$ = (Node *)n; + } + | ALTER PUBLICATION name ADD_P SEQUENCE relation_expr_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->sequences = $6; + n->action = DEFELEM_ADD; + $$ = (Node *)n; + } + | ALTER PUBLICATION name SET SEQUENCE relation_expr_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->sequences = $6; + n->action = DEFELEM_SET; + $$ = (Node *)n; + } + | ALTER PUBLICATION name DROP SEQUENCE relation_expr_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->sequences = $6; + n->action = DEFELEM_DROP; $$ = (Node *)n; } ; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 2874dc0612..98d0edefb0 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -42,6 +42,7 @@ #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" #include "storage/standby.h" +#include "commands/sequence.h" typedef struct XLogRecordBuffer { @@ -74,10 +75,11 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, bool two_phase); static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_prepare *parsed); - +static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); +static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple); /* helper functions for 
decoding transactions */ static inline bool FilterPrepare(LogicalDecodingContext *ctx, @@ -158,6 +160,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor DecodeLogicalMsgOp(ctx, &buf); break; + case RM_SEQ_ID: + DecodeSequence(ctx, &buf); + break; + /* * Rmgrs irrelevant for logical decoding; they describe stuff not * represented in logical decoding. Add new rmgrs in rmgrlist.h's @@ -173,7 +179,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_HASH_ID: case RM_GIN_ID: case RM_GIST_ID: - case RM_SEQ_ID: case RM_SPGIST_ID: case RM_BRIN_ID: case RM_COMMIT_TS_ID: @@ -1312,3 +1317,125 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || ctx->fast_forward || FilterByOrigin(ctx, origin_id)); } + +/* + * Decode Sequence Tuple + */ +static void +DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) +{ + int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader; + + Assert(datalen >= 0); + + tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;; + + ItemPointerSetInvalid(&tuple->tuple.t_self); + + tuple->tuple.t_tableOid = InvalidOid; + + memcpy(((char *) tuple->tuple.t_data), + data + sizeof(xl_seq_rec), + SizeofHeapTupleHeader); + + memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, + data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader, + datalen); +} + +/* + * Handle sequence decode + * + * Decoding sequences is a bit tricky, because while most sequence actions + * are non-transactional (not subject to rollback), some need to be handled + * as transactional. + * + * By default, a sequence increment is non-transactional - we must not queue + * it in a transaction as other changes, because the transaction might get + * rolled back and we'd discard the increment. The downstream would not be + * notified about the increment, which is wrong. 
+ * + * On the other hand, the sequence may be created in a transaction. In this + * case we *should* queue the change as other changes in the transaction, + * because we don't want to send the increments for unknown sequence to the + * plugin - it might get confused about which sequence it's related to etc. + */ +static void +DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferTupleBuf *tuplebuf; + RelFileNode target_node; + XLogReaderState *r = buf->record; + char *tupledata = NULL; + Size tuplelen; + Size datalen = 0; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; + xl_seq_rec *xlrec; + Snapshot snapshot; + RepOriginId origin_id = XLogRecGetOrigin(r); + bool transactional; + + /* only decode changes flagged with XLOG_SEQ_LOG */ + if (info != XLOG_SEQ_LOG) + elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info); + + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding messages. + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) + return; + + /* only interested in our database */ + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); + if (target_node.dbNode != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + tupledata = XLogRecGetData(r); + datalen = XLogRecGetDataLen(r); + tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec); + + /* extract the WAL record, with "created" flag */ + xlrec = (xl_seq_rec *) XLogRecGetData(r); + + /* XXX how could we have sequence change without data? */ + if(!datalen || !tupledata) + return; + + tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); + DecodeSeqTuple(tupledata, datalen, tuplebuf); + + /* + * Should we handle the sequence increment as transactional or not? 
+ * + * If the sequence was created in a still-running transaction, treat + * it as transactional and queue the increments. Otherwise it needs + * to be treated as non-transactional, in which case we send it to + * the plugin right away. + */ + transactional = ReorderBufferSequenceIsTransactional(ctx->reorder, + target_node, + xlrec->created); + + /* Skip the change if already processed (per the snapshot). */ + if (transactional && + !SnapBuildProcessChange(builder, xid, buf->origptr)) + return; + else if (!transactional && + (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || + SnapBuildXactNeedsSkip(builder, buf->origptr))) + return; + + /* Queue the increment (or send immediately if not transactional). */ + snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr, + origin_id, target_node, transactional, + xlrec->created, tuplebuf); +} diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index d61ef4cfad..5f6e02fba3 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, bool created, + int64 last_value, int64 log_cnt, bool is_called); /* streaming callbacks */ static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -217,6 +221,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + ctx->reorder->sequence = 
sequence_cb_wrapper; /* * To support streaming, we require start/stop/abort/commit/change @@ -1198,6 +1203,43 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, bool created, + int64 last_value, int64 log_cnt, bool is_called) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + if (ctx->callbacks.sequence_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "sequence"; + state.report_location = sequence_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? 
txn->xid : InvalidTransactionId; + ctx->write_location = sequence_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional, + created, last_value, log_cnt, is_called); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index a245252529..abe006817e 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -600,6 +600,58 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, pq_sendbytes(out, message, sz); } +/* + * Write SEQUENCE to stream + */ +void +logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid, + XLogRecPtr lsn, bool transactional, bool created, + int64 last_value, int64 log_cnt, bool is_called) +{ + uint8 flags = 0; + char *relname; + + pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + pq_sendint8(out, flags); + pq_sendint64(out, lsn); + + logicalrep_write_namespace(out, RelationGetNamespace(rel)); + relname = RelationGetRelationName(rel); + pq_sendstring(out, relname); + + pq_sendint8(out, transactional); + pq_sendint8(out, created); + pq_sendint64(out, last_value); + pq_sendint64(out, log_cnt); + pq_sendint8(out, is_called); +} + +/* + * Read SEQUENCE from the stream. 
+ */ +void +logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata) +{ + /* XXX skipping flags and lsn */ + pq_getmsgint(in, 1); + pq_getmsgint64(in); + + /* Read relation name from stream */ + seqdata->nspname = pstrdup(logicalrep_read_namespace(in)); + seqdata->seqname = pstrdup(pq_getmsgstring(in)); + + seqdata->transactional = pq_getmsgint(in, 1); + seqdata->created = pq_getmsgint(in, 1); + seqdata->last_value = pq_getmsgint64(in); + seqdata->log_cnt = pq_getmsgint64(in); + seqdata->is_called = pq_getmsgint(in, 1); +} + /* * Write relation description to the output stream. */ diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7378beb684..f99e45f22d 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -116,6 +116,13 @@ typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXN *txn; } ReorderBufferTXNByIdEnt; +/* entry for hash table we use to track sequences created in running xacts */ +typedef struct ReorderBufferSequenceEnt +{ + RelFileNode rnode; + TransactionId xid; +} ReorderBufferSequenceEnt; + /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */ typedef struct ReorderBufferTupleCidKey { @@ -338,6 +345,14 @@ ReorderBufferAllocate(void) buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + /* hash table of sequences, mapping relfilenode to XID of transaction */ + hash_ctl.keysize = sizeof(RelFileNode); + hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt); + hash_ctl.hcxt = buffer->context; + + buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + buffer->by_txn_last_xid = InvalidTransactionId; buffer->by_txn_last_txn = NULL; @@ -528,6 +543,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; + case REORDER_BUFFER_CHANGE_SEQUENCE: + if (change->data.sequence.tuple) + { + ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple); + change->data.sequence.tuple = NULL; + } + break; } pfree(change); @@ -856,6 +878,212 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } } +/* + * Treat the sequence increment as transactional? + */ +bool +ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, + RelFileNode rnode, bool created) +{ + bool found = false; + + if (created) + return true; + + hash_search(rb->sequences, + (void *) &rnode, + HASH_FIND, + &found); + + return found; +} + + +/* + * A transactional sequence increment is queued to be processed upon commit + * and a non-transactional increment gets processed immediately. + * + * A sequence update may be both transactional and non-transactional. When + * created in a running transaction, treat it as transactional and queue + * the change in it. Otherwise treat it as non-transactional, so that we + * don't forget the increment in case of a rollback. + */ +void +ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, + RelFileNode rnode, bool transactional, bool created, + ReorderBufferTupleBuf *tuplebuf) +{ + /* + * Change needs to be handled as transactional, because the sequence was + * created in a transaction that is still running. In that case all the + * changes need to be queued in that transaction, we must not send them + * to the downstream until the transaction commits. + * + * There's a bit of a trouble with subtransactions - we can't queue it + * into the subxact, because it might be rolled back and we'd lose the + * increment. We need to queue it into the same (sub)xact that created + * the sequence, which is why we track the XID in the hash table. 
+ */ + if (transactional) + { + MemoryContext oldcontext; + ReorderBufferChange *change; + + /* lookup sequence by relfilenode */ + ReorderBufferSequenceEnt *ent; + bool found; + + /* transactional changes require a transaction */ + Assert(xid != InvalidTransactionId); + + /* search the lookup table (the entry tracks the XID that created the sequence) */ + ent = hash_search(rb->sequences, + (void *) &rnode, + created ? HASH_ENTER : HASH_FIND, + &found); + + /* + * If this is the "create" increment, we must not have found any + * pre-existing entry in the hash table (i.e. there must not be + * any conflicting sequence). + */ + Assert(!(created && found)); + + /* But we must have either created or found an existing entry. */ + Assert(created || found); + + /* + * When creating the sequence, remember the XID of the transaction + * that created it. + */ + if (created) + ent->xid = xid; + + /* XXX Maybe check that we're still in the same top-level xact? */ + + /* OK, allocate and queue the change */ + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + + change->action = REORDER_BUFFER_CHANGE_SEQUENCE; + change->origin_id = origin_id; + + memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode)); + + change->data.sequence.created = created; + change->data.sequence.tuple = tuplebuf; + + /* add it to the same subxact that created the sequence */ + ReorderBufferQueueChange(rb, ent->xid, lsn, change, false); + + MemoryContextSwitchTo(oldcontext); + } + else + { + /* + * This increment is for a sequence that was not created in any + * running transaction, so we treat it as non-transactional and + * just send it to the output plugin directly. + */ + ReorderBufferTXN *txn = NULL; + volatile Snapshot snapshot_now = snapshot; + bool using_subtxn; + +#ifdef USE_ASSERT_CHECKING + /* All "creates" have to be handled as transactional. */ + Assert(!created); + + /* Make sure the sequence is not in the hash table. 
*/ + { + bool found; + hash_search(rb->sequences, + (void *) &rnode, + HASH_FIND, &found); + Assert(!found); + } +#endif + + if (xid != InvalidTransactionId) + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* setup snapshot to allow catalog access */ + SetupHistoricSnapshot(snapshot_now, NULL); + + /* + * Decoding needs access to syscaches et al., which in turn use + * heavyweight locks and such. Thus we need to have enough state around to + * keep track of those. The easiest way is to simply use a transaction + * internally. That also allows us to easily enforce that nothing writes + * to the database by checking for xid assignments. + * + * When we're called via the SQL SRF there's already a transaction + * started, so start an explicit subtransaction there. + */ + using_subtxn = IsTransactionOrTransactionBlock(); + + PG_TRY(); + { + Relation relation; + HeapTuple tuple; + bool isnull; + int64 last_value, log_cnt; + bool is_called; + Oid reloid; + + if (using_subtxn) + BeginInternalSubTransaction("sequence"); + else + StartTransactionCommand(); + + reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode); + + if (reloid == InvalidOid) + elog(ERROR, "could not map filenode \"%s\" to relation OID", + relpathperm(rnode, + MAIN_FORKNUM)); + + relation = RelationIdGetRelation(reloid); + tuple = &tuplebuf->tuple; + + /* + * Extract the internal sequence values, describing the state. + * + * XXX Seems a bit strange to access it directly. Maybe there's + * a better / more correct way? 
+ */ + last_value = heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull); + log_cnt = heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull); + is_called = heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull); + + rb->sequence(rb, txn, lsn, relation, transactional, created, + last_value, log_cnt, is_called); + + RelationClose(relation); + + TeardownHistoricSnapshot(false); + + AbortCurrentTransaction(); + + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + } + PG_CATCH(); + { + TeardownHistoricSnapshot(true); + + AbortCurrentTransaction(); + + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + + PG_RE_THROW(); + } + PG_END_TRY(); + } +} + /* * AssertTXNLsnOrder * Verify LSN ordering of transaction lists in the reorderbuffer @@ -1532,6 +1760,31 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) &found); Assert(found); + /* + * Remove sequences created in this transaction (if any). + * + * There's no way to search by XID, so we simply do a seqscan of all + * the entries in the hash table. Hopefully there are only a couple + * entries in most cases - people generally don't create many new + * sequences over and over. + */ + { + HASH_SEQ_STATUS scan_status; + ReorderBufferSequenceEnt *ent; + + hash_seq_init(&scan_status, rb->sequences); + while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) + { + /* skip sequences not from this transaction */ + if (ent->xid != txn->xid) + continue; + + (void) hash_search(rb->sequences, + (void *) &(ent->rnode), + HASH_REMOVE, NULL); + } + } + /* remove entries spilled to disk */ if (rbtxn_is_serialized(txn)) ReorderBufferRestoreCleanup(rb, txn); @@ -1947,6 +2200,39 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message); } +/* + * Helper function for ReorderBufferProcessTXN for applying sequences. 
+ */ +static inline void +ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change, + bool streaming) +{ + HeapTuple tuple; + bool isnull; + int64 last_value, log_cnt; + bool is_called; + + /* FIXME support streaming */ + Assert(!streaming); + + tuple = &change->data.sequence.tuple->tuple; + + /* + * Extract the internal sequence values, describing the state. + * + * XXX Seems a bit strange to access it directly. Maybe there's + * a better / more correct way? + */ + last_value = heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull); + log_cnt = heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull); + is_called = heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull); + + rb->sequence(rb, txn, change->lsn, relation, true, /* gotta be transactional */ + change->data.sequence.created, + last_value, log_cnt, is_called); +} + /* * Function to store the command id and snapshot at the end of the current * stream so that we can reuse the same while sending the next stream. 
@@ -2389,6 +2675,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; + + case REORDER_BUFFER_CHANGE_SEQUENCE: + Assert(snapshot_now); + + reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode, + change->data.sequence.relnode.relNode); + + if (reloid == InvalidOid) + elog(ERROR, "could not map filenode \"%s\" to relation OID", + relpathperm(change->data.sequence.relnode, + MAIN_FORKNUM)); + + relation = RelationIdGetRelation(reloid); + + if (!RelationIsValid(relation)) + elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", + reloid, + relpathperm(change->data.sequence.relnode, + MAIN_FORKNUM)); + + if (RelationIsLogicallyLogged(relation)) + ReorderBufferApplySequence(rb, txn, relation, change, streaming); + + RelationClose(relation); + break; } } @@ -3776,6 +4087,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, memcpy(data, change->data.truncate.relids, size); data += size; + break; + } + case REORDER_BUFFER_CHANGE_SEQUENCE: + { + char *data; + ReorderBufferTupleBuf *tup; + Size len = 0; + + tup = change->data.sequence.tuple; + + if (tup) + { + sz += sizeof(HeapTupleData); + len = tup->tuple.t_len; + sz += len; + } + + /* make sure we have enough space */ + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + if (len) + { + memcpy(data, &tup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, tup->tuple.t_data, len); + data += len; + } + break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -4040,6 +4384,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change) { sz += sizeof(Oid) * change->data.truncate.nrelids; + break; + } + case REORDER_BUFFER_CHANGE_SEQUENCE: + { + ReorderBufferTupleBuf *tup; + Size 
len = 0; + + tup = change->data.sequence.tuple; + + if (tup) + { + sz += sizeof(HeapTupleData); + len = tup->tuple.t_len; + sz += len; + } + break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -4341,6 +4701,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } + + case REORDER_BUFFER_CHANGE_SEQUENCE: + if (change->data.sequence.tuple) + { + uint32 tuplelen = ((HeapTuple) data)->t_len; + + change->data.sequence.tuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); + + /* restore ->tuple */ + memcpy(&change->data.sequence.tuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ + change->data.sequence.tuple->tuple.t_data = + ReorderBufferTupleBufData(change->data.sequence.tuple); + + /* restore tuple data itself */ + memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen); + data += tuplelen; + } + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f07983a43c..24369a1522 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/sequence.h" #include "miscadmin.h" #include "parser/parse_relation.h" #include "pgstat.h" @@ -357,6 +358,12 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * * If the synchronization position is reached (SYNCDONE), then the table can * be marked as READY and is no longer tracked. + * + * XXX This needs to handle sequences too - after AlterSubscription_refresh + * starts caring about sequences, GetSubscriptionNotReadyRelations won't + * return just tables, and we'll have to sync them here. 
Not sure it's worth + * creating a new "sync" worker per sequence, maybe we should just sync them + * in the current process (it's pretty light-weight). */ static void process_syncing_tables_for_apply(XLogRecPtr current_lsn) @@ -871,6 +878,99 @@ copy_table(Relation rel) logicalrep_rel_close(relmapentry, NoLock); } + + +/* + * Fetch last_value, log_cnt and is_called of the given sequence from the publisher. + */ +static void +fetch_sequence_data(char *nspname, char *relname, + int64 *last_value, int64 *log_cnt, bool *is_called) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID}; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n" + " FROM %s", quote_qualified_identifier(nspname, relname)); + + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* Process the sequence. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + + *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + +/* + * Copy existing data of a sequence from publisher. + * + * Caller is responsible for locking the local relation. + */ +static void +copy_sequence(Relation rel) +{ + LogicalRepRelMapEntry *relmapentry; + LogicalRepRelation lrel; + StringInfoData cmd; + int64 last_value = 0, + log_cnt = 0; + bool is_called = 0; + + /* Get the publisher relation info. 
*/ + fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel), &lrel); + + /* Put the relation into relmap. */ + logicalrep_relmap_update(&lrel); + + /* Map the publisher relation to local one. */ + relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); + Assert(rel == relmapentry->localrel); + + /* Start copy on the publisher. */ + initStringInfo(&cmd); + + Assert(lrel.relkind == RELKIND_SEQUENCE); + + fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called); + + elog(WARNING, "sequence %s info last_value %ld log_cnt %ld is_called %d", + quote_qualified_identifier(lrel.nspname, lrel.relname), + last_value, log_cnt, is_called); + + ResetSequence2(RelationGetRelid(rel), last_value, log_cnt, is_called); + + logicalrep_rel_close(relmapentry, NoLock); +} + + + + /* * Determine the tablesync slot name. * @@ -1106,10 +1206,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) originname))); } - /* Now do the initial data copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_table(rel); - PopActiveSnapshot(); + if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE) + { + /* Now do the initial sequence copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_sequence(rel); + PopActiveSnapshot(); + } + else + { + /* Now do the initial data copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_table(rel); + PopActiveSnapshot(); + } res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b9a7a7ffbb..1776b2d5f8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -144,6 +144,7 @@ #include "catalog/pg_tablespace.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" +#include "commands/sequence.h" #include "commands/trigger.h" #include "executor/executor.h" 
#include "executor/execPartition.h" @@ -1060,6 +1061,57 @@ apply_handle_origin(StringInfo s) errmsg_internal("ORIGIN message sent out of order"))); } +/* + * Handle SEQUENCE message. + */ +static void +apply_handle_sequence(StringInfo s) +{ + LogicalRepSequence seq; + Oid relid; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s)) + return; + + logicalrep_read_sequence(s, &seq); + + /* + * Non-transactional sequence updates should not be part of a remote + * transaction. There should not be any running transaction. + */ + Assert((!seq.transactional) || in_remote_transaction); + Assert(!(!seq.transactional && in_remote_transaction)); + Assert(!(!seq.transactional && IsTransactionState())); + + /* + * Make sure we're in a transaction (needed by ResetSequence2). For + * non-transactional updates we're guaranteed to start a new one, + * and we'll commit it at the end. + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + maybe_reread_subscription(); + } + + relid = RangeVarGetRelid(makeRangeVar(seq.nspname, + seq.seqname, -1), + RowExclusiveLock, false); + + elog(WARNING, "applying sequence transactional %d created %d last_value %ld log_cnt %ld is_called %d", + seq.transactional, seq.created, seq.last_value, seq.log_cnt, seq.is_called); + + /* apply the sequence change */ + ResetSequence2(relid, seq.last_value, seq.log_cnt, seq.is_called); + + /* + * Commit the transaction started above (we only do this when not in a + * remote transaction, i.e. for non-transactional sequence updates). + */ + if (!in_remote_transaction) + CommitTransactionCommand(); +} + /* * Handle STREAM START message. 
*/ @@ -2302,6 +2354,10 @@ apply_dispatch(StringInfo s) */ return; + case LOGICAL_REP_MSG_SEQUENCE: + apply_handle_sequence(s); + return; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); return; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index e4314af13a..4366c275dc 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -49,6 +49,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pgoutput_sequence(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, + Relation rel, bool transactional, bool created, + int64 last_value, int64 log_cnt, bool is_called); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, @@ -157,6 +161,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->message_cb = pgoutput_message; + cb->sequence_cb = pgoutput_sequence; cb->commit_cb = pgoutput_commit_txn; cb->begin_prepare_cb = pgoutput_begin_prepare_txn; @@ -186,6 +191,7 @@ parse_output_parameters(List *options, PGOutputData *data) bool publication_names_given = false; bool binary_option_given = false; bool messages_option_given = false; + bool sequences_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; @@ -193,6 +199,7 @@ parse_output_parameters(List *options, PGOutputData *data) data->streaming = false; data->messages = false; data->two_phase = false; + data->sequences = true; foreach(lc, options) { @@ -258,6 +265,16 @@ parse_output_parameters(List *options, PGOutputData *data) data->messages = defGetBoolean(defel); } + else if (strcmp(defel->defname, "sequences") == 0) + { + if 
(sequences_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + sequences_option_given = true; + + data->sequences = defGetBoolean(defel); + } else if (strcmp(defel->defname, "streaming") == 0) { if (streaming_given) @@ -865,6 +882,47 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +static void +pgoutput_sequence(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, + Relation rel, bool transactional, bool created, + int64 last_value, int64 log_cnt, bool is_called) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + TransactionId xid = InvalidTransactionId; + RelationSyncEntry *relentry; + + if (!data->sequences) + return; + + if (!is_publishable_relation(rel)) + return; + + /* XXX handle (in_streaming) here */ + + relentry = get_rel_sync_entry(data, RelationGetRelid(rel)); + + /* + * First check the sequence filter. + * + * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here. + */ + if (!relentry->pubactions.pubsequence) + return; + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_sequence(ctx->out, + rel, + xid, + sequence_lsn, + transactional, + created, + last_value, + log_cnt, + is_called); + OutputPluginWrite(ctx, true); +} + /* * Currently we always forward. 
*/ @@ -1128,7 +1186,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->streamed_txns = NIL; entry->replicate_valid = false; entry->pubactions.pubinsert = entry->pubactions.pubupdate = - entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = + entry->pubactions.pubsequence = false; entry->publish_as_relid = InvalidOid; entry->map = NULL; /* will be set by maybe_send_schema() if * needed */ @@ -1140,6 +1199,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) List *pubids = GetRelationPublications(relid); ListCell *lc; Oid publish_as_relid = relid; + bool is_sequence = (get_rel_relkind(relid) == RELKIND_SEQUENCE); /* Reload publications if needed before use. */ if (!publications_valid) @@ -1163,12 +1223,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Publication *pub = lfirst(lc); bool publish = false; - if (pub->alltables) + if (pub->alltables && (!is_sequence)) { publish = true; if (pub->pubviaroot && am_partition) publish_as_relid = llast_oid(get_partition_ancestors(relid)); } + else if (pub->allsequences && is_sequence) + { + publish = true; + } + + /* if a sequence, just cross-check the list of publications */ + if (!publish && is_sequence) + { + if (list_member_oid(pubids, pub->oid)) + publish = true; + } if (!publish) { @@ -1219,10 +1290,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + entry->pubactions.pubsequence |= pub->pubactions.pubsequence; } if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) + entry->pubactions.pubdelete && entry->pubactions.pubtruncate && + entry->pubactions.pubsequence) break; } @@ -1367,6 +1440,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) 
entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; + entry->pubactions.pubsequence = false; } } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 13d9994af3..3dd39054e6 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5495,6 +5495,7 @@ GetRelationPublicationActions(Relation relation) pubactions->pubupdate |= pubform->pubupdate; pubactions->pubdelete |= pubform->pubdelete; pubactions->pubtruncate |= pubform->pubtruncate; + pubactions->pubsequence |= pubform->pubsequence; ReleaseSysCache(tup); @@ -5503,7 +5504,8 @@ GetRelationPublicationActions(Relation relation) * other publications. */ if (pubactions->pubinsert && pubactions->pubupdate && - pubactions->pubdelete && pubactions->pubtruncate) + pubactions->pubdelete && pubactions->pubtruncate && + pubactions->pubsequence) break; } diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index d6bf725971..6f77354644 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1640,7 +1640,7 @@ psql_completion(const char *text, int start, int end) /* ALTER PUBLICATION <name> */ else if (Matches("ALTER", "PUBLICATION", MatchAny)) - COMPLETE_WITH("ADD TABLE", "DROP TABLE", "OWNER TO", "RENAME TO", "SET"); + COMPLETE_WITH("ADD TABLE", "DROP TABLE", "ADD SEQUENCE", "DROP SEQUENCE", "OWNER TO", "RENAME TO", "SET"); /* ALTER PUBLICATION <name> SET */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET")) COMPLETE_WITH("(", "TABLE"); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 8cd0252082..8700a72dea 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11445,6 +11445,11 @@ provolatile => 's', prorettype => 'oid', proargtypes => 'text', proallargtypes => '{text,oid}', proargmodes => '{i,o}', proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' }, +{ oid => 
'8000', descr => 'get OIDs of sequences in a publication', + proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't', + provolatile => 's', prorettype => 'oid', proargtypes => 'text', + proallargtypes => '{text,oid}', proargmodes => '{i,o}', + proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' }, { oid => '6121', descr => 'returns whether a relation can be part of a publication', proname => 'pg_relation_is_publishable', provolatile => 's', diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index f332bad4d4..150b8922cf 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -40,6 +40,12 @@ CATALOG(pg_publication,6104,PublicationRelationId) */ bool puballtables; + /* + * indicates that this is special publication which should encompass all + * sequences in the database (except for the unlogged and temp ones) + */ + bool puballsequences; + /* true if inserts are published */ bool pubinsert; @@ -52,6 +58,9 @@ CATALOG(pg_publication,6104,PublicationRelationId) /* true if truncates are published */ bool pubtruncate; + /* true if sequences are published */ + bool pubsequence; + /* true if partition changes are published using root schema */ bool pubviaroot; } FormData_pg_publication; @@ -72,6 +81,7 @@ typedef struct PublicationActions bool pubupdate; bool pubdelete; bool pubtruncate; + bool pubsequence; } PublicationActions; typedef struct Publication @@ -79,6 +89,7 @@ typedef struct Publication Oid oid; char *name; bool alltables; + bool allsequences; bool pubviaroot; PublicationActions pubactions; } Publication; @@ -107,6 +118,9 @@ extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern List *GetAllSequencesPublicationRelations(void); +extern List *GetPublicationSequenceRelations(Oid pubid); + extern bool 
is_publishable_relation(Relation rel); extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists); diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 40544dd4c7..c28e8695cb 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -48,6 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data; typedef struct xl_seq_rec { RelFileNode node; + bool created; /* is this a CREATE SEQUENCE */ /* SEQUENCE TUPLE DATA FOLLOWS AT THE END */ } xl_seq_rec; @@ -59,6 +60,7 @@ extern ObjectAddress DefineSequence(ParseState *pstate, CreateSeqStmt *stmt); extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); +extern void ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *rptr); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index def9651b34..aa602deae5 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3643,8 +3643,10 @@ typedef struct AlterPublicationStmt /* parameters used for ALTER PUBLICATION ... 
ADD/DROP TABLE */ List *tables; /* List of tables to add/drop */ + List *sequences; /* List of sequences to add/drop */ bool for_all_tables; /* Special publication for all tables in db */ - DefElemAction tableAction; /* What action to perform with the tables */ + bool for_all_sequences; /* Special publication for all sequences in db */ + DefElemAction action; /* What action to perform with the tables/sequences */ } AlterPublicationStmt; typedef struct CreateSubscriptionStmt diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 63de90d94a..931bc0722a 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -60,6 +60,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', + LOGICAL_REP_MSG_SEQUENCE = 'X', /* FIXME change */ LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', LOGICAL_REP_MSG_PREPARE = 'P', LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', @@ -116,6 +117,19 @@ typedef struct LogicalRepTyp char *typname; /* name of the remote type */ } LogicalRepTyp; +/* Sequence info */ +typedef struct LogicalRepSequence +{ + Oid remoteid; /* unique id of the remote sequence */ + char *nspname; /* schema name of remote sequence */ + char *seqname; /* name of the remote sequence */ + bool transactional; + bool created; + int64 last_value; + int64 log_cnt; /* XXX probably not needed? */ + bool is_called; /* XXX probably not needed? 
*/ +} LogicalRepSequence; + /* Transaction info */ typedef struct LogicalRepBeginData { @@ -223,6 +237,12 @@ extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); +extern void logicalrep_write_sequence(StringInfo out, Relation rel, + TransactionId xid, XLogRecPtr lsn, + bool transactional, bool created, + int64 last_value, int64 log_cnt, + bool is_called); +extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 810495ed0e..e3e1b03f32 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -88,6 +88,19 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, Size message_size, const char *message); +/* + * Called for decoded sequence changes. + */ +typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + bool created, + int64 last_value, + int64 log_cnt, + bool is_called); + /* * Filter changes by origin. 
*/ @@ -219,6 +232,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeSequenceCB sequence_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 0dc460fb70..97eb2a7ef1 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -28,6 +28,7 @@ typedef struct PGOutputData bool streaming; bool messages; bool two_phase; + bool sequences; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5b40ff75f7..4512ed679b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -64,7 +64,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, - REORDER_BUFFER_CHANGE_TRUNCATE + REORDER_BUFFER_CHANGE_TRUNCATE, + REORDER_BUFFER_CHANGE_SEQUENCE }; /* forward declaration */ @@ -158,6 +159,14 @@ typedef struct ReorderBufferChange uint32 ninvalidations; /* Number of messages */ SharedInvalidationMessage *invalidations; /* invalidation message */ } inval; + + /* Context data for Sequence changes */ + struct + { + RelFileNode relnode; + bool created; + ReorderBufferTupleBuf *tuple; + } sequence; } data; /* @@ -430,6 +439,15 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* sequence callback signature */ +typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, bool created, + int64 last_value, int64 log_cnt, + bool is_called); + /* begin prepare callback signature */ typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer 
*rb, ReorderBufferTXN *txn); @@ -511,6 +529,12 @@ struct ReorderBuffer */ HTAB *by_txn; + /* + * relfilenode => XID lookup table for sequences created in a transaction + * (also includes altered sequences, which are assigned a new relfilenode) + */ + HTAB *sequences; + /* * Transactions that could be a toplevel xact, ordered by LSN of the first * record bearing that xid. @@ -541,6 +565,7 @@ struct ReorderBuffer ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + ReorderBufferSequenceCB sequence; /* * Callbacks to be called when streaming a transaction at prepare time. @@ -635,6 +660,10 @@ void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, + RelFileNode rnode, bool transactional, bool created, + ReorderBufferTupleBuf *tuplebuf); void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); @@ -682,4 +711,7 @@ void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); void StartupReorderBuffer(void); +bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, + RelFileNode rnode, bool created); + #endif diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index b5b065a1b6..963ce8d1df 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -160,8 +160,8 @@ DROP TABLE testpub_parted1; DROP PUBLICATION testpub_forparted, testpub_forparted1; -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; -ERROR: "testpub_view" is not a table -DETAIL: Only tables can be added to 
publications. +ERROR: "testpub_view" is not a table or sequence +DETAIL: Only tables and sequences can be added to publications. SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1, pub_test.testpub_nopk; RESET client_min_messages; @@ -182,8 +182,8 @@ Tables: -- fail - view ALTER PUBLICATION testpub_default ADD TABLE testpub_view; -ERROR: "testpub_view" is not a table -DETAIL: Only tables can be added to publications. +ERROR: "testpub_view" is not a table or sequence +DETAIL: Only tables and sequences can be added to publications. ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e5ab11275d..393be871be 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1451,6 +1451,14 @@ pg_prepared_xacts| SELECT p.transaction, FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid) LEFT JOIN pg_authid u ON ((p.ownerid = u.oid))) LEFT JOIN pg_database d ON ((p.dbid = d.oid))); +pg_publication_sequences| SELECT p.pubname, + n.nspname AS schemaname, + c.relname AS sequencename + FROM pg_publication p, + LATERAL pg_get_publication_sequences((p.pubname)::text) gpt(relid), + (pg_class c + JOIN pg_namespace n ON ((n.oid = c.relnamespace))) + WHERE (c.oid = gpt.relid); pg_publication_tables| SELECT p.pubname, n.nspname AS schemaname, c.relname AS tablename -- 2.31.1