Here's a rebased version of the patch, no other changes.

I think the crucial aspect of this that needs discussion/feedback the
most is the transactional vs. non-transactional behavior. All the other
questions are less important / cosmetic.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
>From f61f2cabf6221451af8f427a5d4b4f7f4583e618 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Tue, 20 Jul 2021 23:20:10 +0200
Subject: [PATCH] Logical decoding / replication of sequences

---
 contrib/test_decoding/Makefile                |   3 +-
 contrib/test_decoding/expected/sequence.out   | 327 +++++++++++++++
 contrib/test_decoding/sql/sequence.sql        | 119 ++++++
 contrib/test_decoding/test_decoding.c         |  40 ++
 doc/src/sgml/catalogs.sgml                    |  71 ++++
 doc/src/sgml/ref/alter_publication.sgml       |  28 +-
 doc/src/sgml/ref/alter_subscription.sgml      |   6 +-
 src/backend/catalog/pg_publication.c          | 160 +++++++-
 src/backend/catalog/system_views.sql          |  10 +
 src/backend/commands/publicationcmds.c        | 232 ++++++++++-
 src/backend/commands/sequence.c               | 132 +++++-
 src/backend/commands/subscriptioncmds.c       | 272 +++++++++++++
 src/backend/executor/execReplication.c        |   4 +-
 src/backend/nodes/copyfuncs.c                 |   4 +-
 src/backend/nodes/equalfuncs.c                |   4 +-
 src/backend/parser/gram.y                     |  44 +-
 src/backend/replication/logical/decode.c      | 131 +++++-
 src/backend/replication/logical/logical.c     |  42 ++
 src/backend/replication/logical/proto.c       |  52 +++
 .../replication/logical/reorderbuffer.c       | 384 ++++++++++++++++++
 src/backend/replication/logical/tablesync.c   | 118 +++++-
 src/backend/replication/logical/worker.c      |  56 +++
 src/backend/replication/pgoutput/pgoutput.c   |  80 +++-
 src/backend/utils/cache/relcache.c            |   4 +-
 src/bin/psql/tab-complete.c                   |   2 +-
 src/include/catalog/pg_proc.dat               |   5 +
 src/include/catalog/pg_publication.h          |  14 +
 src/include/commands/sequence.h               |   2 +
 src/include/nodes/parsenodes.h                |   4 +-
 src/include/replication/logicalproto.h        |  20 +
 src/include/replication/output_plugin.h       |  14 +
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/reorderbuffer.h       |  34 +-
 src/test/regress/expected/publication.out     |   8 +-
 src/test/regress/expected/rules.out           |   8 +
 35 files changed, 2389 insertions(+), 46 deletions(-)
 create mode 100644 contrib/test_decoding/expected/sequence.out
 create mode 100644 contrib/test_decoding/sql/sequence.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a31e0b879..56ddc3abae 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
-	spill slot truncate stream stats twophase twophase_stream
+	spill slot truncate stream stats twophase twophase_stream \
+	sequence
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot
diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out
new file mode 100644
index 0000000000..07f3e3e92c
--- /dev/null
+++ b/contrib/test_decoding/expected/sequence.out
@@ -0,0 +1,327 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE SEQUENCE test_sequence;
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       3
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       4
+(1 row)
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+      14
+(1 row)
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    4000
+(1 row)
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, true);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, false);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3500
+(1 row)
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                                 data                                                 
+------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 4, log_cnt: 0 is_called: 1
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 334, log_cnt: 0 is_called: 1
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 14, log_cnt: 32 is_called: 1
+ COMMIT
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 14, log_cnt: 0 is_called: 1
+ COMMIT
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 4000, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 4320, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3830, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3830, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 0
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3820, log_cnt: 0 is_called: 1
+(24 rows)
+
+DROP SEQUENCE test_sequence;
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       3
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3001
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3002
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3003
+(1 row)
+
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    6001
+(1 row)
+
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data 
+------
+(0 rows)
+
+-- rollback on table creation with serial column 
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data 
+------
+(0 rows)
+
+-- rollback on table with serial column 
+CREATE TABLE test_table (a SERIAL, b INT);
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+-- check table and sequence values after rollback 
+SELECT * from test_table_a_seq;
+ last_value | log_cnt | is_called 
+------------+---------+-----------
+       3003 |      30 | t
+(1 row)
+
+SELECT nextval('test_table_a_seq');
+ nextval 
+---------
+    3004
+(1 row)
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                                  data                                                   
+---------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_table_a_seq transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 3000, log_cnt: 0 is_called: 1
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 3033, log_cnt: 0 is_called: 1
+ BEGIN
+ COMMIT
+(8 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                                 data                                                  
+-------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_table_a_seq transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ sequence: public.test_table_a_seq transactional: 1 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ table public.test_table: INSERT: a[integer]:1 b[integer]:100
+ table public.test_table: INSERT: a[integer]:2 b[integer]:200
+ COMMIT
+(6 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3001
+(1 row)
+
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ setval 
+--------
+   5000
+(1 row)
+
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+ last_value | log_cnt | is_called 
+------------+---------+-----------
+       3001 |      32 | t
+(1 row)
+
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                                 data                                                 
+------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 3000, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 3033, log_cnt: 0 is_called: 1
+ COMMIT
+(6 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql
new file mode 100644
index 0000000000..d8a34738f3
--- /dev/null
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -0,0 +1,119 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE test_sequence;
+
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, true);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, false);
+SELECT nextval('test_sequence');
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+DROP SEQUENCE test_sequence;
+
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table creation with serial column 
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table with serial column 
+CREATE TABLE test_table (a SERIAL, b INT);
+
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+
+-- check table and sequence values after rollback 
+SELECT * from test_table_a_seq;
+SELECT nextval('test_table_a_seq');
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e5cd84e85e..54d734cb85 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -35,6 +35,7 @@ typedef struct
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
 	bool		only_local;
+	bool		skip_sequences;
 } TestDecodingData;
 
 /*
@@ -76,6 +77,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
+static void pg_decode_sequence(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+							  Relation rel, bool transactional, bool created,
+							  int64 last_value, int64 log_cnt, bool is_called);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
 									 TransactionId xid,
 									 const char *gid);
@@ -141,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+	cb->sequence_cb = pg_decode_sequence;
 	cb->filter_prepare_cb = pg_decode_filter_prepare;
 	cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
 	cb->prepare_cb = pg_decode_prepare_txn;
@@ -175,6 +181,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->skip_empty_xacts = false;
 	data->only_local = false;
 
+	/* skip sequences by default for backwards compatibility */
+	data->skip_sequences = true;
+
 	ctx->output_plugin_private = data;
 
 	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
@@ -265,6 +274,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "skip-sequences") == 0)
+		{
+
+			if (elem->arg == NULL)
+				continue;	/* true by default */
+			else if (!parse_bool(strVal(elem->arg), &data->skip_sequences))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -744,6 +764,26 @@ pg_decode_message(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				   XLogRecPtr sequence_lsn, Relation rel,
+				   bool transactional, bool created,
+				   int64 last_value, int64 log_cnt, bool is_called)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+	char	   *nspname = get_namespace_name(RelationGetNamespace(rel));
+
+	/* return if requested to skip_sequences */
+	if (data->skip_sequences)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "sequence: %s.%s transactional: %d created: %d last_value: %zu, log_cnt: %zu is_called: %d",
+					 nspname, RelationGetRelationName(rel),
+					 transactional, created, last_value, log_cnt, is_called);
+	OutputPluginWrite(ctx, true);
+}
+
 static void
 pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 2b2c70a26e..a342a54629 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9430,6 +9430,11 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       <entry>prepared transactions</entry>
      </row>
 
+     <row>
+      <entry><link linkend="view-pg-publication-sequences"><structname>pg_publication_sequences</structname></link></entry>
+      <entry>publications and their associated sequences</entry>
+     </row>
+
      <row>
       <entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry>
       <entry>publications and their associated tables</entry>
@@ -11264,6 +11269,72 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
  </sect1>
 
+ <sect1 id="view-pg-publication-sequences">
+  <title><structname>pg_publication_sequences</structname></title>
+
+  <indexterm zone="view-pg-publication-sequences">
+   <primary>pg_publication_sequences</primary>
+  </indexterm>
+
+  <para>
+   The view <structname>pg_publication_sequences</structname> provides
+   information about the mapping between publications and the sequences they
+   contain.  Unlike the underlying catalog
+   <link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>,
+   this view expands
+   publications defined as <literal>FOR ALL SEQUENCES</literal>, so for such
+   publications there will be a row for each eligible sequence.
+  </para>
+
+  <table>
+   <title><structname>pg_publication_sequences</structname> Columns</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>pubname</structfield> <type>name</type>
+       (references <link linkend="catalog-pg-publication"><structname>pg_publication</structname></link>.<structfield>pubname</structfield>)
+      </para>
+      <para>
+       Name of publication
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>schemaname</structfield> <type>name</type>
+       (references <link linkend="catalog-pg-namespace"><structname>pg_namespace</structname></link>.<structfield>nspname</structfield>)
+      </para>
+      <para>
+       Name of schema containing sequence
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>sequencename</structfield> <type>name</type>
+       (references <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>relname</structfield>)
+      </para>
+      <para>
+       Name of sequence
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+ </sect1>
+
  <sect1 id="view-pg-publication-tables">
   <title><structname>pg_publication_tables</structname></title>
 
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index faa114b2c6..c68a1573de 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -24,6 +24,9 @@ PostgreSQL documentation
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@@ -50,7 +53,18 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
   </para>
 
   <para>
-   The fourth variant of this command listed in the synopsis can change
+   The next three variants change which sequences are part of the publication.
+   The <literal>SET SEQUENCE</literal> clause will replace the list of sequences
+   in the publication with the specified one.  The <literal>ADD SEQUENCE</literal>
+   and <literal>DROP SEQUENCE</literal> clauses will add and remove one or more
+   sequences from the publication.  Note that adding sequences to a publication
+   that is already subscribed to will require a <literal>ALTER SUBSCRIPTION
+   ... REFRESH PUBLICATION</literal> action on the subscribing side in order
+   to become effective.
+  </para>
+
+  <para>
+   The seventh variant of this command listed in the synopsis can change
    all of the publication properties specified in
    <xref linkend="sql-createpublication"/>.  Properties not mentioned in the
    command retain their previous settings.
@@ -62,7 +76,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
 
   <para>
    You must own the publication to use <command>ALTER PUBLICATION</command>.
-   Adding a table to a publication additionally requires owning that table.
+   Adding a table to a publication additionally requires owning that table,
+   and the same requirement applies to sequences.
    To alter the owner, you must also be a direct or indirect member of the new
    owning role. The new owner must have <literal>CREATE</literal> privilege on
    the database.  Also, the new owner of a <literal>FOR ALL TABLES</literal>
@@ -97,6 +112,15 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><replaceable class="parameter">sequence_name</replaceable></term>
+    <listitem>
+     <para>
+      Name of an existing sequence.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
     <listitem>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index a6f994450d..ff1dcd3bfd 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -144,8 +144,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     <term><literal>REFRESH PUBLICATION</literal></term>
     <listitem>
      <para>
-      Fetch missing table information from publisher.  This will start
-      replication of tables that were added to the subscribed-to publications
+      Fetch missing table and sequence information from publisher.  This will start
+      replication of tables and sequences that were added to the subscribed-to publications
       since the last invocation of <command>REFRESH PUBLICATION</command> or
       since <command>CREATE SUBSCRIPTION</command>.
      </para>
@@ -162,7 +162,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
           Specifies whether the existing data in the publications that are
           being subscribed to should be copied once the replication starts.
           The default is <literal>true</literal>.  (Previously subscribed
-          tables are not copied.)
+          tables and sequences are not copied.)
          </para>
         </listitem>
        </varlistentry>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 36bfff9706..79f45d892e 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -51,26 +51,27 @@ check_publication_add_relation(Relation targetrel)
 {
 	/* Must be a regular or partitioned table */
 	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
-		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
+		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE &&
+		RelationGetForm(targetrel)->relkind != RELKIND_SEQUENCE)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("\"%s\" is not a table",
+				 errmsg("\"%s\" is not a table or sequence",
 						RelationGetRelationName(targetrel)),
-				 errdetail("Only tables can be added to publications.")));
+				 errdetail("Only tables and sequences can be added to publications.")));
 
-	/* Can't be system table */
+	/* Can't be system table/sequence */
 	if (IsCatalogRelation(targetrel))
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("\"%s\" is a system table",
+				 errmsg("\"%s\" is a system table or sequence",
 						RelationGetRelationName(targetrel)),
-				 errdetail("System tables cannot be added to publications.")));
+				 errdetail("System tables / sequences cannot be added to publications.")));
 
 	/* UNLOGGED and TEMP relations cannot be part of publication. */
 	if (!RelationIsPermanent(targetrel))
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("table \"%s\" cannot be replicated",
+				 errmsg("table or sequence \"%s\" cannot be replicated",
 						RelationGetRelationName(targetrel)),
 				 errdetail("Temporary and unlogged relations cannot be replicated.")));
 }
@@ -98,7 +99,8 @@ static bool
 is_publishable_class(Oid relid, Form_pg_class reltuple)
 {
 	return (reltuple->relkind == RELKIND_RELATION ||
-			reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
+			reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
+			reltuple->relkind == RELKIND_SEQUENCE) &&
 		!IsCatalogRelationOid(relid) &&
 		reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
 		relid >= FirstNormalObjectId;
@@ -271,6 +273,10 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 
 		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
 
+		/* skip sequences here */
+		if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+			continue;
+
 		if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
 			pub_partopt != PUBLICATION_PART_ROOT)
 		{
@@ -304,6 +310,49 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 	return result;
 }
 
+/*
+ * Gets list of relation oids for a publication (sequences only).
+ *
+ * This should only be used for normal publications, the FOR ALL TABLES
+ * should use GetAllSequencesPublicationRelations().
+ */
+List *
+GetPublicationSequenceRelations(Oid pubid)
+{
+	List	   *result;
+	Relation	pubrelsrel;
+	ScanKeyData scankey;
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	/* Find all publications associated with the relation. */
+	pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&scankey,
+				Anum_pg_publication_rel_prpubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(pubid));
+
+	scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
+							  true, NULL, 1, &scankey);
+
+	result = NIL;
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_publication_rel pubrel;
+
+		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+		if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+			result = lappend_oid(result, pubrel->prrelid);
+	}
+
+	systable_endscan(scan);
+	table_close(pubrelsrel, AccessShareLock);
+
+	return result;
+}
+
 /*
  * Gets list of publication oids for publications marked as FOR ALL TABLES.
  */
@@ -404,6 +453,46 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 	return result;
 }
 
+/*
+ * Gets list of all relation published by FOR ALL TABLES publication(s).
+ *
+ * If the publication publishes partition changes via their respective root
+ * partitioned tables, we must exclude partitions in favor of including the
+ * root partitioned tables.
+ */
+List *
+GetAllSequencesPublicationRelations(void)
+{
+	Relation	classRel;
+	ScanKeyData key[1];
+	TableScanDesc scan;
+	HeapTuple	tuple;
+	List	   *result = NIL;
+
+	classRel = table_open(RelationRelationId, AccessShareLock);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_class_relkind,
+				BTEqualStrategyNumber, F_CHAREQ,
+				CharGetDatum(RELKIND_SEQUENCE));
+
+	scan = table_beginscan_catalog(classRel, 1, key);
+
+	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+	{
+		Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+		Oid			relid = relForm->oid;
+
+		if (is_publishable_class(relid, relForm))
+			result = lappend_oid(result, relid);
+	}
+
+	table_endscan(scan);
+
+	table_close(classRel, AccessShareLock);
+	return result;
+}
+
 /*
  * Get publication using oid
  *
@@ -426,10 +515,12 @@ GetPublication(Oid pubid)
 	pub->oid = pubid;
 	pub->name = pstrdup(NameStr(pubform->pubname));
 	pub->alltables = pubform->puballtables;
+	pub->allsequences = pubform->puballsequences;
 	pub->pubactions.pubinsert = pubform->pubinsert;
 	pub->pubactions.pubupdate = pubform->pubupdate;
 	pub->pubactions.pubdelete = pubform->pubdelete;
 	pub->pubactions.pubtruncate = pubform->pubtruncate;
+	pub->pubactions.pubsequence = pubform->pubsequence;
 	pub->pubviaroot = pubform->pubviaroot;
 
 	ReleaseSysCache(tup);
@@ -555,3 +646,56 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
 	SRF_RETURN_DONE(funcctx);
 }
+
+/*
+ * Returns Oids of sequences in a publication.
+ */
+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	char	   *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	Publication *publication;
+	List	   *sequences;
+
+	/* stuff done only on the first call of the function */
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext oldcontext;
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* switch to memory context appropriate for multiple function calls */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		publication = GetPublicationByName(pubname, false);
+
+		/*
+		 * Publications support partitioned tables, although all changes are
+		 * replicated using leaf partition identity and schema, so we only
+		 * need those.
+		 */
+		if (publication->allsequences)
+			sequences = GetAllSequencesPublicationRelations();
+		else
+			sequences = GetPublicationSequenceRelations(publication->oid);
+
+		funcctx->user_fctx = (void *) sequences;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	/* stuff done on every call of the function */
+	funcctx = SRF_PERCALL_SETUP();
+	sequences = (List *) funcctx->user_fctx;
+
+	if (funcctx->call_cntr < list_length(sequences))
+	{
+		Oid			relid = list_nth_oid(sequences, funcctx->call_cntr);
+
+		SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 55f6e3711d..9bec49bc3e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -372,6 +372,16 @@ CREATE VIEW pg_publication_tables AS
          pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
     WHERE C.oid = GPT.relid;
 
+CREATE VIEW pg_publication_sequences AS
+    SELECT
+        P.pubname AS pubname,
+        N.nspname AS schemaname,
+        C.relname AS sequencename
+    FROM pg_publication P,
+         LATERAL pg_get_publication_sequences(P.pubname) GPT,
+         pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
+    WHERE C.oid = GPT.relid;
+
 CREATE VIEW pg_locks AS
     SELECT * FROM pg_lock_status() AS L;
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 8487eeb7e6..90ad83d4a1 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -16,6 +16,7 @@
 
 #include "access/genam.h"
 #include "access/htup_details.h"
+#include "access/relation.h"
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
@@ -54,6 +55,12 @@ static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 								 AlterPublicationStmt *stmt);
 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
 
+static List *OpenSequenceList(List *sequences);
+static void CloseSequenceList(List *rels);
+static void PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+								 AlterPublicationStmt *stmt);
+static void PublicationDropSequences(Oid pubid, List *rels, bool missing_ok);
+
 static void
 parse_publication_options(ParseState *pstate,
 						  List *options,
@@ -72,6 +79,7 @@ parse_publication_options(ParseState *pstate,
 	pubactions->pubupdate = true;
 	pubactions->pubdelete = true;
 	pubactions->pubtruncate = true;
+	pubactions->pubsequence = true;
 	*publish_via_partition_root = false;
 
 	/* Parse options */
@@ -96,6 +104,7 @@ parse_publication_options(ParseState *pstate,
 			pubactions->pubupdate = false;
 			pubactions->pubdelete = false;
 			pubactions->pubtruncate = false;
+			pubactions->pubsequence = false;
 
 			*publish_given = true;
 			publish = defGetString(defel);
@@ -118,6 +127,8 @@ parse_publication_options(ParseState *pstate,
 					pubactions->pubdelete = true;
 				else if (strcmp(publish_opt, "truncate") == 0)
 					pubactions->pubtruncate = true;
+				else if (strcmp(publish_opt, "sequence") == 0)
+					pubactions->pubsequence = true;
 				else
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
@@ -208,6 +219,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 		BoolGetDatum(pubactions.pubdelete);
 	values[Anum_pg_publication_pubtruncate - 1] =
 		BoolGetDatum(pubactions.pubtruncate);
+	values[Anum_pg_publication_pubsequence - 1] =
+		BoolGetDatum(pubactions.pubsequence);
 	values[Anum_pg_publication_pubviaroot - 1] =
 		BoolGetDatum(publish_via_partition_root);
 
@@ -373,9 +386,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	rels = OpenTableList(stmt->tables);
 
-	if (stmt->tableAction == DEFELEM_ADD)
+	if (stmt->action == DEFELEM_ADD)
 		PublicationAddTables(pubid, rels, false, stmt);
-	else if (stmt->tableAction == DEFELEM_DROP)
+	else if (stmt->action == DEFELEM_DROP)
 		PublicationDropTables(pubid, rels, false);
 	else						/* DEFELEM_SET */
 	{
@@ -426,6 +439,82 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 	CloseTableList(rels);
 }
 
+/*
+ * Add or remove sequence to/from publication.
+ */
+static void
+AlterPublicationSequences(AlterPublicationStmt *stmt, Relation rel,
+						  HeapTuple tup)
+{
+	List	   *rels = NIL;
+	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+	Oid			pubid = pubform->oid;
+
+	/* Check that user is allowed to manipulate the publication tables. */
+	if (pubform->puballsequences)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
+						NameStr(pubform->pubname)),
+				 errdetail("Sequences cannot be added to or dropped from FOR ALL TABLES publications.")));
+
+	Assert(list_length(stmt->sequences) > 0);
+
+	rels = OpenSequenceList(stmt->sequences);
+
+	if (stmt->action == DEFELEM_ADD)
+		PublicationAddSequences(pubid, rels, false, stmt);
+	else if (stmt->action == DEFELEM_DROP)
+		PublicationDropSequences(pubid, rels, false);
+	else						/* DEFELEM_SET */
+	{
+		List	   *oldrelids = GetPublicationRelations(pubid,
+														PUBLICATION_PART_ROOT);
+		List	   *delrels = NIL;
+		ListCell   *oldlc;
+
+		/* Calculate which relations to drop. */
+		foreach(oldlc, oldrelids)
+		{
+			Oid			oldrelid = lfirst_oid(oldlc);
+			ListCell   *newlc;
+			bool		found = false;
+
+			foreach(newlc, rels)
+			{
+				Relation	newrel = (Relation) lfirst(newlc);
+
+				if (RelationGetRelid(newrel) == oldrelid)
+				{
+					found = true;
+					break;
+				}
+			}
+
+			if (!found)
+			{
+				Relation	oldrel = relation_open(oldrelid,
+												ShareUpdateExclusiveLock);
+
+				delrels = lappend(delrels, oldrel);
+			}
+		}
+
+		/* And drop them. */
+		PublicationDropSequences(pubid, delrels, true);
+
+		/*
+		 * Don't bother calculating the difference for adding, we'll catch and
+		 * skip existing ones when doing catalog update.
+		 */
+		PublicationAddSequences(pubid, rels, true, stmt);
+
+		CloseSequenceList(delrels);
+	}
+
+	CloseSequenceList(rels);
+}
+
 /*
  * Alter the existing publication.
  *
@@ -459,8 +548,12 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
 
 	if (stmt->options)
 		AlterPublicationOptions(pstate, stmt, rel, tup);
-	else
+	else if (stmt->tables)
 		AlterPublicationTables(stmt, rel, tup);
+	else if (stmt->sequences)
+		AlterPublicationSequences(stmt, rel, tup);
+	else
+		Assert(false);
 
 	/* Cleanup. */
 	heap_freetuple(tup);
@@ -665,6 +758,139 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 	}
 }
 
+/*
+ * Open relations specified by a RangeVar list.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
+ */
+static List *
+OpenSequenceList(List *sequences)
+{
+	List	   *relids = NIL;
+	List	   *rels = NIL;
+	ListCell   *lc;
+
+	/*
+	 * Open, share-lock, and check all the explicitly-specified relations
+	 */
+	foreach(lc, sequences)
+	{
+		RangeVar   *rv = castNode(RangeVar, lfirst(lc));
+		Relation	rel;
+		Oid			myrelid;
+
+		/* Allow query cancel in case this takes a long time */
+		CHECK_FOR_INTERRUPTS();
+
+		rel = relation_openrv(rv, ShareUpdateExclusiveLock);
+		myrelid = RelationGetRelid(rel);
+
+		/*
+		 * Filter out duplicates if user specifies "foo, foo".
+		 *
+		 * Note that this algorithm is known to not be very efficient (O(N^2))
+		 * but given that it only works on list of sequences given to us by user
+		 * it's deemed acceptable.
+		 */
+		if (list_member_oid(relids, myrelid))
+		{
+			relation_close(rel, ShareUpdateExclusiveLock);
+			continue;
+		}
+
+		rels = lappend(rels, rel);
+		relids = lappend_oid(relids, myrelid);
+	}
+
+	list_free(relids);
+
+	return rels;
+}
+
+/*
+ * Close all relations in the list.
+ */
+static void
+CloseSequenceList(List *rels)
+{
+	ListCell   *lc;
+
+	foreach(lc, rels)
+	{
+		Relation	rel = (Relation) lfirst(lc);
+
+		relation_close(rel, NoLock);
+	}
+}
+
+/*
+ * Add listed tables to the publication.
+ */
+static void
+PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+						AlterPublicationStmt *stmt)
+{
+	ListCell   *lc;
+
+	Assert(!stmt || !stmt->for_all_sequences);
+
+	foreach(lc, rels)
+	{
+		Relation	rel = (Relation) lfirst(lc);
+		ObjectAddress obj;
+
+		/* Must be owner of the table or superuser. */
+		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
+						   RelationGetRelationName(rel));
+
+		obj = publication_add_relation(pubid, rel, if_not_exists);
+		if (stmt)
+		{
+			EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+											 (Node *) stmt);
+
+			InvokeObjectPostCreateHook(PublicationRelRelationId,
+									   obj.objectId, 0);
+		}
+	}
+}
+
+/*
+ * Remove listed sequences from the publication.
+ */
+static void
+PublicationDropSequences(Oid pubid, List *rels, bool missing_ok)
+{
+	ObjectAddress obj;
+	ListCell   *lc;
+	Oid			prid;
+
+	foreach(lc, rels)
+	{
+		Relation	rel = (Relation) lfirst(lc);
+		Oid			relid = RelationGetRelid(rel);
+
+		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
+							   ObjectIdGetDatum(relid),
+							   ObjectIdGetDatum(pubid));
+		if (!OidIsValid(prid))
+		{
+			if (missing_ok)
+				continue;
+
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("relation \"%s\" is not part of the publication",
+							RelationGetRelationName(rel))));
+		}
+
+		ObjectAddressSet(obj, PublicationRelRelationId, prid);
+		performDeletion(&obj, DROP_CASCADE, 0);
+	}
+}
+
+
 /*
  * Internal workhorse for changing a publication owner
  */
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 72bfdc07a4..2fc644c74c 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -94,7 +94,7 @@ static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */
  */
 static SeqTableData *last_used_seq = NULL;
 
-static void fill_seq_with_data(Relation rel, HeapTuple tuple);
+static void fill_seq_with_data(Relation rel, HeapTuple tuple, bool create);
 static Relation lock_and_open_sequence(SeqTable seq);
 static void create_seq_hashtable(void);
 static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
@@ -222,7 +222,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
 
 	/* now initialize the sequence's data */
 	tuple = heap_form_tuple(tupDesc, value, null);
-	fill_seq_with_data(rel, tuple);
+	fill_seq_with_data(rel, tuple, true);
 
 	/* process OWNED BY if given */
 	if (owned_by)
@@ -327,7 +327,86 @@ ResetSequence(Oid seq_relid)
 	/*
 	 * Insert the modified tuple into the new storage file.
 	 */
-	fill_seq_with_data(seq_rel, tuple);
+	fill_seq_with_data(seq_rel, tuple, true);
+
+	/* Clear local cache so that we don't think we have cached numbers */
+	/* Note that we do not change the currval() state */
+	elm->cached = elm->last;
+
+	relation_close(seq_rel, NoLock);
+}
+
+/*
+ * Reset a sequence to its initial value.
+ *
+ * The change is made transactionally, so that on failure of the current
+ * transaction, the sequence will be restored to its previous state.
+ * We do that by creating a whole new relfilenode for the sequence; so this
+ * works much like the rewriting forms of ALTER TABLE.
+ *
+ * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
+ * which must not be released until end of transaction.  Caller is also
+ * responsible for permissions checking.
+ */
+void
+ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
+{
+	Relation	seq_rel;
+	SeqTable	elm;
+	Form_pg_sequence_data seq;
+	Buffer		buf;
+	HeapTupleData seqdatatuple;
+	HeapTuple	tuple;
+
+	/*
+	 * Read the old sequence.  This does a bit more work than really
+	 * necessary, but it's simple, and we do want to double-check that it's
+	 * indeed a sequence.
+	 */
+	init_sequence(seq_relid, &elm, &seq_rel);
+	(void) read_seq_tuple(seq_rel, &buf, &seqdatatuple);
+
+	/*
+	 * Copy the existing sequence tuple.
+	 */
+	tuple = heap_copytuple(&seqdatatuple);
+
+	/* Now we're done with the old page */
+	UnlockReleaseBuffer(buf);
+
+	/*
+	 * Modify the copied tuple to execute the restart (compare the RESTART
+	 * action in AlterSequence)
+	 */
+	seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+	seq->last_value = last_value;
+	seq->is_called = is_called;
+	seq->log_cnt = log_cnt;
+
+	/*
+	 * Create a new storage file for the sequence.
+	 */
+	RelationSetNewRelfilenode(seq_rel, seq_rel->rd_rel->relpersistence);
+
+	/*
+	 * Ensure sequence's relfrozenxid is at 0, since it won't contain any
+	 * unfrozen XIDs.  Same with relminmxid, since a sequence will never
+	 * contain multixacts.
+	 */
+	Assert(seq_rel->rd_rel->relfrozenxid == InvalidTransactionId);
+	Assert(seq_rel->rd_rel->relminmxid == InvalidMultiXactId);
+
+	/*
+	 * Insert the modified tuple into the new storage file.
+	 *
+	 * XXX Maybe this should also use created=true, just like the other places
+	 * calling fill_seq_with_data. That's probably needed for correct cascading
+	 * replication.
+	 *
+	 * XXX That'd mean all fill_seq_with_data callers use created=true, making
+	 * the parameter unnecessary.
+	 */
+	fill_seq_with_data(seq_rel, tuple, false);
 
 	/* Clear local cache so that we don't think we have cached numbers */
 	/* Note that we do not change the currval() state */
@@ -340,7 +419,7 @@ ResetSequence(Oid seq_relid)
  * Initialize a sequence's relation with the specified tuple as content
  */
 static void
-fill_seq_with_data(Relation rel, HeapTuple tuple)
+fill_seq_with_data(Relation rel, HeapTuple tuple, bool create)
 {
 	Buffer		buf;
 	Page		page;
@@ -378,8 +457,21 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 
 	/* check the comment above nextval_internal()'s equivalent call. */
 	if (RelationNeedsWAL(rel))
+	{
 		GetTopTransactionId();
 
+		/*
+		 * Ensure we have a proper XID, which will be included in the XLOG
+		 * record by XLogRecordAssemble. Otherwise the first nextval() in
+		 * a subxact (without any preceding changes) would get XID 0,
+		 * and it'd be impossible to decide which top xact it belongs to.
+		 * It'd also trigger assert in DecodeSequence.
+		 *
+		 * XXX Not sure if this is the best solution.
+		 */
+		GetCurrentTransactionId();
+	}
+
 	START_CRIT_SECTION();
 
 	MarkBufferDirty(buf);
@@ -399,6 +491,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.node = rel->rd_node;
+		xlrec.created = create;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -502,7 +595,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
 		/*
 		 * Insert the modified tuple into the new storage file.
 		 */
-		fill_seq_with_data(seqrel, newdatatuple);
+		fill_seq_with_data(seqrel, newdatatuple, true);
 	}
 
 	/* process OWNED BY if given */
@@ -766,8 +859,21 @@ nextval_internal(Oid relid, bool check_permissions)
 	 * (Have to do that here, so we're outside the critical section)
 	 */
 	if (logit && RelationNeedsWAL(seqrel))
+	{
 		GetTopTransactionId();
 
+		/*
+		 * Ensure we have a proper XID, which will be included in the XLOG
+		 * record by XLogRecordAssemble. Otherwise the first nextval() in
+		 * a subxact (without any preceding changes) would get XID 0,
+		 * and it'd be impossible to decide which top xact it belongs to.
+		 * It'd also trigger assert in DecodeSequence.
+		 *
+		 * XXX Not sure if this is the best solution.
+		 */
+		GetCurrentTransactionId();
+	}
+
 	/* ready to change the on-disk (or really, in-buffer) tuple */
 	START_CRIT_SECTION();
 
@@ -803,6 +909,7 @@ nextval_internal(Oid relid, bool check_permissions)
 		seq->log_cnt = 0;
 
 		xlrec.node = seqrel->rd_node;
+		xlrec.created = false;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -977,8 +1084,21 @@ do_setval(Oid relid, int64 next, bool iscalled)
 
 	/* check the comment above nextval_internal()'s equivalent call. */
 	if (RelationNeedsWAL(seqrel))
+	{
 		GetTopTransactionId();
 
+		/*
+		 * Ensure we have a proper XID, which will be included in the XLOG
+		 * record by XLogRecordAssemble. Otherwise the first nextval() in
+		 * a subxact (without any preceding changes) would get XID 0,
+		 * and it'd be impossible to decide which top xact it belongs to.
+		 * It'd also trigger assert in DecodeSequence.
+		 *
+		 * XXX Not sure if this is the best solution.
+		 */
+		GetCurrentTransactionId();
+	}
+
 	/* ready to change the on-disk (or really, in-buffer) tuple */
 	START_CRIT_SECTION();
 
@@ -999,6 +1119,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.node = seqrel->rd_node;
+		xlrec.created = false;
+
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 22ae982328..2d03cf011e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -84,6 +84,7 @@ typedef struct SubOpts
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -521,6 +522,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		char	   *err;
 		WalReceiverConn *wrconn;
 		List	   *tables;
+		List	   *sequences;
 		ListCell   *lc;
 		char		table_state;
 
@@ -559,6 +561,26 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 										InvalidXLogRecPtr);
 			}
 
+			/*
+			 * Get the sequence list from publisher and build local sequence
+			 * status info.
+			 */
+			sequences = fetch_sequence_list(wrconn, publications);
+			foreach(lc, sequences)
+			{
+				RangeVar   *rv = (RangeVar *) lfirst(lc);
+				Oid			relid;
+
+				relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+				/* Check for supported relkind. */
+				CheckSubscriptionRelkind(get_rel_relkind(relid),
+										 rv->schemaname, rv->relname);
+
+				AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
+			}
+
 			/*
 			 * If requested, create permanent slot for the subscription. We
 			 * won't use the initial snapshot for anything, so no need to
@@ -731,6 +753,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		{
 			Oid			relid = subrel_local_oids[off];
 
+			/* XXX ignore sequences - maybe do this in GetSubscriptionRelations? */
+			if (get_rel_relkind(relid) == RELKIND_SEQUENCE)
+				continue;
+
 			if (!bsearch(&relid, pubrel_local_oids,
 						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
 			{
@@ -822,6 +848,183 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
 			}
 		}
+
+		/*
+		 * XXX now do the same thing for sequences, maybe before the preceding
+		 * block, or earlier?
+		 */
+
+		/* Get the table list from publisher. */
+		pubrel_names = fetch_sequence_list(wrconn, sub->publications);
+
+		/* Get local table list. */
+		subrel_states = GetSubscriptionRelations(sub->oid);
+
+		/*
+		 * Build qsorted array of local table oids for faster lookup. This can
+		 * potentially contain all tables in the database so speed of lookup
+		 * is important.
+		 */
+		subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+		off = 0;
+		foreach(lc, subrel_states)
+		{
+			SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+			subrel_local_oids[off++] = relstate->relid;
+		}
+		qsort(subrel_local_oids, list_length(subrel_states),
+			  sizeof(Oid), oid_cmp);
+
+		/*
+		 * Rels that we want to remove from subscription and drop any slots
+		 * and origins corresponding to them.
+		 */
+		sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+
+		/*
+		 * Walk over the remote tables and try to match them to locally known
+		 * tables. If the table is not known locally create a new state for
+		 * it.
+		 *
+		 * Also builds array of local oids of remote tables for the next step.
+		 */
+		off = 0;
+		pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+		foreach(lc, pubrel_names)
+		{
+			RangeVar   *rv = (RangeVar *) lfirst(lc);
+			Oid			relid;
+
+			relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+			/* Check for supported relkind. */
+			CheckSubscriptionRelkind(get_rel_relkind(relid),
+									 rv->schemaname, rv->relname);
+
+			pubrel_local_oids[off++] = relid;
+
+			if (!bsearch(&relid, subrel_local_oids,
+						 list_length(subrel_states), sizeof(Oid), oid_cmp))
+			{
+				AddSubscriptionRelState(sub->oid, relid,
+										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+										InvalidXLogRecPtr);
+				ereport(DEBUG1,
+						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
+										 rv->schemaname, rv->relname, sub->name)));
+			}
+		}
+
+		/*
+		 * Next remove state for tables we should not care about anymore using
+		 * the data we collected above
+		 */
+		qsort(pubrel_local_oids, list_length(pubrel_names),
+			  sizeof(Oid), oid_cmp);
+
+		remove_rel_len = 0;
+		for (off = 0; off < list_length(subrel_states); off++)
+		{
+			Oid			relid = subrel_local_oids[off];
+
+			/* XXX ignore non-sequences - maybe do this in GetSubscriptionRelations? */
+			if (get_rel_relkind(relid) != RELKIND_SEQUENCE)
+				continue;
+
+			if (!bsearch(&relid, pubrel_local_oids,
+						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+			{
+				char		state;
+				XLogRecPtr	statelsn;
+
+				/*
+				 * Lock pg_subscription_rel with AccessExclusiveLock to
+				 * prevent any race conditions with the apply worker
+				 * re-launching workers at the same time this code is trying
+				 * to remove those tables.
+				 *
+				 * Even if new worker for this particular rel is restarted it
+				 * won't be able to make any progress as we hold exclusive
+				 * lock on subscription_rel till the transaction end. It will
+				 * simply exit as there is no corresponding rel entry.
+				 *
+				 * This locking also ensures that the state of rels won't
+				 * change till we are done with this refresh operation.
+				 */
+				if (!rel)
+					rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+				/* Last known rel state. */
+				state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+
+				sub_remove_rels[remove_rel_len].relid = relid;
+				sub_remove_rels[remove_rel_len++].state = state;
+
+				RemoveSubscriptionRel(sub->oid, relid);
+
+				logicalrep_worker_stop(sub->oid, relid);
+
+				/*
+				 * For READY state, we would have already dropped the
+				 * tablesync origin.
+				 */
+				if (state != SUBREL_STATE_READY)
+				{
+					char		originname[NAMEDATALEN];
+
+					/*
+					 * Drop the tablesync's origin tracking if exists.
+					 *
+					 * It is possible that the origin is not yet created for
+					 * tablesync worker, this can happen for the states before
+					 * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+					 * concurrently try to drop the origin and by this time
+					 * the origin might be already removed. For these reasons,
+					 * passing missing_ok = true.
+					 */
+					ReplicationOriginNameForTablesync(sub->oid, relid, originname,
+													  sizeof(originname));
+					replorigin_drop_by_name(originname, true, false);
+				}
+
+				ereport(DEBUG1,
+						(errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+										 get_namespace_name(get_rel_namespace(relid)),
+										 get_rel_name(relid),
+										 sub->name)));
+			}
+		}
+
+		/*
+		 * Drop the tablesync slots associated with removed tables. This has
+		 * to be at the end because otherwise if there is an error while doing
+		 * the database operations we won't be able to rollback dropped slots.
+		 */
+		for (off = 0; off < remove_rel_len; off++)
+		{
+			if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
+				sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+			{
+				char		syncslotname[NAMEDATALEN] = {0};
+
+				/*
+				 * For READY/SYNCDONE states we know the tablesync slot has
+				 * already been dropped by the tablesync worker.
+				 *
+				 * For other states, there is no certainty, maybe the slot
+				 * does not exist yet. Also, if we fail after removing some of
+				 * the slots, next time, it will again try to drop already
+				 * dropped slots and fail. For these reasons, we allow
+				 * missing_ok = true for the drop.
+				 */
+				ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
+												syncslotname, sizeof(syncslotname));
+				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+			}
+		}
+
 	}
 	PG_FINALLY();
 	{
@@ -1635,6 +1838,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	return tablelist;
 }
 
+/*
+ * Get the list of sequences which belong to specified publications on the
+ * publisher connection.
+ */
+static List *
+fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	ListCell   *lc;
+	bool		first;
+	List	   *tablelist = NIL;
+
+	Assert(list_length(publications) > 0);
+
+	initStringInfo(&cmd);
+	appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n"
+						   "  FROM pg_catalog.pg_publication_sequences s\n"
+						   " WHERE s.pubname IN (");
+	first = true;
+	foreach(lc, publications)
+	{
+		char	   *pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(&cmd, ", ");
+
+		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+	}
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not receive list of replicated tables from the publisher: %s",
+						res->err)));
+
+	/* Process tables. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *nspname;
+		char	   *relname;
+		bool		isnull;
+		RangeVar   *rv;
+
+		nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		rv = makeRangeVar(nspname, relname, -1);
+		tablelist = lappend(tablelist, rv);
+
+		ExecClearTuple(slot);
+	}
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+
+	return tablelist;
+}
+
 /*
  * This is to report the connection failure while dropping replication slots.
  * Here, we report the WARNING for all tablesync slots so that user can drop
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 1e285e0349..a1c7880be6 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -619,11 +619,11 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
 				 errdetail("\"%s.%s\" is a foreign table.",
 						   nspname, relname)));
 
-	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && relkind != RELKIND_SEQUENCE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
 						nspname, relname),
-				 errdetail("\"%s.%s\" is not a table.",
+				 errdetail("\"%s.%s\" is not a table or a sequence.",
 						   nspname, relname)));
 }
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 9d4893c504..b8df89863d 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4834,8 +4834,10 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from)
 	COPY_STRING_FIELD(pubname);
 	COPY_NODE_FIELD(options);
 	COPY_NODE_FIELD(tables);
+	COPY_NODE_FIELD(sequences);
 	COPY_SCALAR_FIELD(for_all_tables);
-	COPY_SCALAR_FIELD(tableAction);
+	COPY_SCALAR_FIELD(for_all_sequences);
+	COPY_SCALAR_FIELD(action);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index b9cc7b199c..9cce502876 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2308,8 +2308,10 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a,
 	COMPARE_STRING_FIELD(pubname);
 	COMPARE_NODE_FIELD(options);
 	COMPARE_NODE_FIELD(tables);
+	COMPARE_NODE_FIELD(sequences);
 	COMPARE_SCALAR_FIELD(for_all_tables);
-	COMPARE_SCALAR_FIELD(tableAction);
+	COMPARE_SCALAR_FIELD(for_all_sequences);
+	COMPARE_SCALAR_FIELD(action);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 10da5c5c51..3926a7ee9f 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -434,6 +434,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
 %type <node>	opt_publication_for_tables publication_for_tables
+/* %type <node>	opt_publication_for_sequences publication_for_sequences */
 
 %type <list>	opt_fdw_options fdw_options
 %type <defelt>	fdw_option
@@ -9588,6 +9589,7 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
  *****************************************************************************/
 
 CreatePublicationStmt:
+			/* CREATE PUBLICATION name opt_publication_for_tables opt_publication_for_sequences opt_definition */
 			CREATE PUBLICATION name opt_publication_for_tables opt_definition
 				{
 					CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
@@ -9622,6 +9624,12 @@ publication_for_tables:
 				}
 		;
 
+/*
+ * FIXME
+ *
+ * opt_publication_for_sequences and publication_for_sequences should be
+ * copies for sequences
+ */
 
 /*****************************************************************************
  *
@@ -9633,6 +9641,12 @@ publication_for_tables:
  *
  * ALTER PUBLICATION name SET TABLE table [, table2]
  *
+ * ALTER PUBLICATION name ADD SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name DROP SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name SET SEQUENCE sequence [, sequence2]
+ *
  *****************************************************************************/
 
 AlterPublicationStmt:
@@ -9648,7 +9662,7 @@ AlterPublicationStmt:
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
 					n->tables = $6;
-					n->tableAction = DEFELEM_ADD;
+					n->action = DEFELEM_ADD;
 					$$ = (Node *)n;
 				}
 			| ALTER PUBLICATION name SET TABLE relation_expr_list
@@ -9656,7 +9670,7 @@ AlterPublicationStmt:
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
 					n->tables = $6;
-					n->tableAction = DEFELEM_SET;
+					n->action = DEFELEM_SET;
 					$$ = (Node *)n;
 				}
 			| ALTER PUBLICATION name DROP TABLE relation_expr_list
@@ -9664,7 +9678,31 @@ AlterPublicationStmt:
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
 					n->tables = $6;
-					n->tableAction = DEFELEM_DROP;
+					n->action = DEFELEM_DROP;
+					$$ = (Node *)n;
+				}
+			| ALTER PUBLICATION name ADD_P SEQUENCE relation_expr_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->sequences = $6;
+					n->action = DEFELEM_ADD;
+					$$ = (Node *)n;
+				}
+			| ALTER PUBLICATION name SET SEQUENCE relation_expr_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->sequences = $6;
+					n->action = DEFELEM_SET;
+					$$ = (Node *)n;
+				}
+			| ALTER PUBLICATION name DROP SEQUENCE relation_expr_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->sequences = $6;
+					n->action = DEFELEM_DROP;
 					$$ = (Node *)n;
 				}
 		;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 2874dc0612..98d0edefb0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,6 +42,7 @@
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
+#include "commands/sequence.h"
 
 typedef struct XLogRecordBuffer
 {
@@ -74,10 +75,11 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						bool two_phase);
 static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						  xl_xact_parsed_prepare *parsed);
-
+static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
 
 /* helper functions for decoding transactions */
 static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -158,6 +160,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			DecodeLogicalMsgOp(ctx, &buf);
 			break;
 
+		case RM_SEQ_ID:
+			DecodeSequence(ctx, &buf);
+			break;
+
 			/*
 			 * Rmgrs irrelevant for logical decoding; they describe stuff not
 			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -173,7 +179,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_HASH_ID:
 		case RM_GIN_ID:
 		case RM_GIST_ID:
-		case RM_SEQ_ID:
 		case RM_SPGIST_ID:
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
@@ -1312,3 +1317,125 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
 			ctx->fast_forward || FilterByOrigin(ctx, origin_id));
 }
+
+/*
+ * Decode Sequence Tuple
+ */
+static void
+DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+	int			datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
+
+	Assert(datalen >= 0);
+
+	tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;;
+
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	tuple->tuple.t_tableOid = InvalidOid;
+
+	memcpy(((char *) tuple->tuple.t_data),
+		   data + sizeof(xl_seq_rec),
+		   SizeofHeapTupleHeader);
+
+	memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
+		   data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
+		   datalen);
+}
+
+/*
+ * Handle sequence decode
+ *
+ * Decoding sequences is a bit tricky, because while most sequence actions
+ * are non-transactional (not subject to rollback), some need to be handled
+ * as transactional.
+ *
+ * By default, a sequence increment is non-transactional - we must not queue
+ * it in a transaction as other changes, because the transaction might get
+ * rolled back and we'd discard the increment. The downstream would not be
+ * notified about the increment, which is wrong.
+ *
+ * On the other hand, the sequence may be created in a transaction. In this
+ * case we *should* queue the change as other changes in the transaction,
+ * because we don't want to send the increments for unknown sequence to the
+ * plugin - it might get confused about which sequence it's related to etc. 
+ */
+static void
+DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	ReorderBufferTupleBuf *tuplebuf;
+	RelFileNode target_node;
+	XLogReaderState *r = buf->record;
+	char	   *tupledata = NULL;
+	Size		tuplelen;
+	Size		datalen = 0;
+	TransactionId xid = XLogRecGetXid(r);
+	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+	xl_seq_rec *xlrec;
+	Snapshot	snapshot;
+	RepOriginId origin_id = XLogRecGetOrigin(r);
+	bool		transactional;
+
+	/* only decode changes flagged with XLOG_SEQ_LOG */
+	if (info != XLOG_SEQ_LOG)
+		elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding messages.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	tupledata = XLogRecGetData(r);
+	datalen = XLogRecGetDataLen(r);
+	tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
+
+	/* extract the WAL record, with "created" flag */
+	xlrec = (xl_seq_rec *) XLogRecGetData(r);
+
+	/* XXX how could we have sequence change without data? */
+	if(!datalen || !tupledata)
+		return;
+
+	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+	DecodeSeqTuple(tupledata, datalen, tuplebuf);
+
+	/*
+	 * Should we handle the sequence increment as transactional or not?
+	 *
+	 * If the sequence was created in a still-running transaction, treat
+	 * it as transactional and queue the increments. Otherwise it needs
+	 * to be treated as non-transactional, in which case we send it to
+	 * the plugin right away.
+	 */
+	transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
+														 target_node,
+														 xlrec->created);
+
+	/* Skip the change if already processed (per the snapshot). */
+	if (transactional &&
+		!SnapBuildProcessChange(builder, xid, buf->origptr))
+		return;
+	else if (!transactional &&
+			 (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+			  SnapBuildXactNeedsSkip(builder, buf->origptr)))
+		return;
+
+	/* Queue the increment (or send immediately if not transactional). */
+	snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+	ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
+							   origin_id, target_node, transactional,
+							   xlrec->created, tuplebuf);
+}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d61ef4cfad..5f6e02fba3 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							   XLogRecPtr sequence_lsn, Relation rel,
+							   bool transactional, bool created,
+							   int64 last_value, int64 log_cnt, bool is_called);
 
 /* streaming callbacks */
 static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -217,6 +221,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->sequence = sequence_cb_wrapper;
 
 	/*
 	 * To support streaming, we require start/stop/abort/commit/change
@@ -1198,6 +1203,43 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				    XLogRecPtr sequence_lsn, Relation rel,
+				    bool transactional, bool created,
+				    int64 last_value, int64 log_cnt, bool is_called)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	if (ctx->callbacks.sequence_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "sequence";
+	state.report_location = sequence_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = sequence_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+							   created, last_value, log_cnt, is_called);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						XLogRecPtr first_lsn)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index a245252529..abe006817e 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -600,6 +600,58 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 	pq_sendbytes(out, message, sz);
 }
 
+/*
+ * Write SEQUENCE to stream
+ */
+void
+logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
+						  XLogRecPtr lsn, bool transactional, bool created,
+						  int64 last_value, int64 log_cnt, bool is_called)
+{
+	uint8		flags = 0;
+	char	   *relname;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
+
+	/* transaction ID (if not valid, we're not streaming) */
+	if (TransactionIdIsValid(xid))
+		pq_sendint32(out, xid);
+
+	pq_sendint8(out, flags);
+	pq_sendint64(out, lsn);
+
+	logicalrep_write_namespace(out, RelationGetNamespace(rel));
+	relname = RelationGetRelationName(rel);
+	pq_sendstring(out, relname);
+
+	pq_sendint8(out, transactional);
+	pq_sendint8(out, created);
+	pq_sendint64(out, last_value);
+	pq_sendint64(out, log_cnt);
+	pq_sendint8(out, is_called);
+}
+
+/*
+ * Read SEQUENCE from the stream.
+ */
+void
+logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
+{
+	/* XXX skipping flags and lsn */
+	pq_getmsgint(in, 1);
+	pq_getmsgint64(in);
+
+	/* Read relation name from stream */
+	seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
+	seqdata->seqname = pstrdup(pq_getmsgstring(in));
+
+	seqdata->transactional = pq_getmsgint(in, 1);
+	seqdata->created = pq_getmsgint(in, 1);
+	seqdata->last_value = pq_getmsgint64(in);
+	seqdata->log_cnt = pq_getmsgint64(in);
+	seqdata->is_called = pq_getmsgint(in, 1);
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7378beb684..f99e45f22d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -116,6 +116,13 @@ typedef struct ReorderBufferTXNByIdEnt
 	ReorderBufferTXN *txn;
 } ReorderBufferTXNByIdEnt;
 
+/* entry for hash table we use to track sequences created in running xacts */
+typedef struct ReorderBufferSequenceEnt
+{
+	RelFileNode		rnode;
+	TransactionId	xid;
+} ReorderBufferSequenceEnt;
+
 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
 typedef struct ReorderBufferTupleCidKey
 {
@@ -338,6 +345,14 @@ ReorderBufferAllocate(void)
 	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
 								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
+	/* hash table of sequences, mapping relfilenode to XID of transaction */
+	hash_ctl.keysize = sizeof(RelFileNode);
+	hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
+	hash_ctl.hcxt = buffer->context;
+
+	buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
+								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
 	buffer->by_txn_last_xid = InvalidTransactionId;
 	buffer->by_txn_last_txn = NULL;
 
@@ -528,6 +543,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			if (change->data.sequence.tuple)
+			{
+				ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
+				change->data.sequence.tuple = NULL;
+			}
+			break;
 	}
 
 	pfree(change);
@@ -856,6 +878,212 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
+/*
+ * Treat the sequence increment as transactional?
+ */
+bool
+ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+									 RelFileNode rnode, bool created)
+{
+	bool	found = false;
+
+	if (created)
+		return true;
+
+	hash_search(rb->sequences,
+				(void *) &rnode,
+				HASH_FIND,
+				&found);
+
+	return found;
+}
+
+
+/*
+ * A transactional sequence increment is queued to be processed upon commit
+ * and a non-transactional increment gets processed immediately.
+ *
+ * A sequence update may be both transactional and non-transactional. When
+ * created in a running transaction, treat it as transactional and queue
+ * the change in it. Otherwise treat it as non-transactional, so that we
+ * don't forget the increment in case of a rollback.
+ */
+void
+ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+						   RelFileNode rnode, bool transactional, bool created,
+						   ReorderBufferTupleBuf *tuplebuf)
+{
+	/*
+	 * Change needs to be handled as transactional, because the sequence was
+	 * created in a transaction that is still running. In that case all the
+	 * changes need to be queued in that transaction, we must not send them
+	 * to the downstream until the transaction commits.
+	 *
+	 * There's a bit of a trouble with subtransactions - we can't queue it
+	 * into the subxact, because it might be rolled back and we'd lose the
+	 * increment. We need to queue it into the same (sub)xact that created
+	 * the sequence, which is why we track the XID in the hash table.
+	 */
+	if (transactional)
+	{
+		MemoryContext oldcontext;
+		ReorderBufferChange *change;
+
+		/* lookup sequence by relfilenode */
+		ReorderBufferSequenceEnt   *ent;
+		bool						found;
+
+		/* transactional changes require a transaction */
+		Assert(xid != InvalidTransactionId);
+
+		/* search the lookup table (we ignore the return value, found is enough) */
+		ent = hash_search(rb->sequences,
+						  (void *) &rnode,
+						  created ? HASH_ENTER : HASH_FIND,
+						  &found);
+
+		/*
+		 * If this is the "create" increment, we must not have found any
+		 * pre-existing entry in the hash table (i.e. there must not be
+		 * any conflicting sequence).
+		 */
+		Assert(!(created && found));
+
+		/* But we must have either created or found an existing entry. */
+		Assert(created || found);
+
+		/*
+		 * When creating the sequence, remember the XID of the transaction
+		 * that created id.
+		 */
+		if (created)
+			ent->xid = xid;
+
+		/* XXX Maybe check that we're still in the same top-level xact? */
+
+		/* OK, allocate and queue the change */
+		oldcontext = MemoryContextSwitchTo(rb->context);
+
+		change = ReorderBufferGetChange(rb);
+
+		change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
+		change->origin_id = origin_id;
+
+		memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
+
+		change->data.sequence.created = created;
+		change->data.sequence.tuple = tuplebuf;
+
+		/* add it to the same subxact that created the sequence */
+		ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+	else
+	{
+		/*
+		 * This increment is for a sequence that was not created in any
+		 * running transaction, so we treat it as non-transactional and
+		 * just send it to the output plugin directly.
+		 */
+		ReorderBufferTXN *txn = NULL;
+		volatile Snapshot snapshot_now = snapshot;
+		bool	using_subtxn;
+
+#ifdef USE_ASSERT_CHECKING
+		/* All "creates" have to be handled as transactional. */
+		Assert(!created);
+
+		/* Make sure the sequence is not in the hash table. */
+		{
+			bool	found;
+			hash_search(rb->sequences,
+					    (void *) &rnode,
+						HASH_FIND, &found);
+			Assert(!found);
+		}
+#endif
+
+		if (xid != InvalidTransactionId)
+			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+		/* setup snapshot to allow catalog access */
+		SetupHistoricSnapshot(snapshot_now, NULL);
+
+		/*
+		 * Decoding needs access to syscaches et al., which in turn use
+		 * heavyweight locks and such. Thus we need to have enough state around to
+		 * keep track of those.  The easiest way is to simply use a transaction
+		 * internally.  That also allows us to easily enforce that nothing writes
+		 * to the database by checking for xid assignments.
+		 *
+		 * When we're called via the SQL SRF there's already a transaction
+		 * started, so start an explicit subtransaction there.
+		 */
+		using_subtxn = IsTransactionOrTransactionBlock();
+
+		PG_TRY();
+		{
+			Relation	relation;
+			HeapTuple	tuple;
+			bool		isnull;
+			int64		last_value, log_cnt;
+			bool		is_called;
+			Oid			reloid;
+
+			if (using_subtxn)
+				BeginInternalSubTransaction("sequence");
+			else
+				StartTransactionCommand();
+
+			reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
+
+			if (reloid == InvalidOid)
+				elog(ERROR, "could not map filenode \"%s\" to relation OID",
+					 relpathperm(rnode,
+								 MAIN_FORKNUM));
+
+			relation = RelationIdGetRelation(reloid);
+			tuple = &tuplebuf->tuple;
+
+			/*
+			 * Extract the internal sequence values, describing the state.
+			 *
+			 * XXX Seems a bit strange to access it directly. Maybe there's
+			 * a better / more correct way?
+			 */
+			last_value = heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull);
+			log_cnt = heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull);
+			is_called = heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull);
+
+			rb->sequence(rb, txn, lsn, relation, transactional, created,
+						 last_value, log_cnt, is_called);
+
+			RelationClose(relation);
+
+			TeardownHistoricSnapshot(false);
+
+			AbortCurrentTransaction();
+
+			if (using_subtxn)
+				RollbackAndReleaseCurrentSubTransaction();
+		}
+		PG_CATCH();
+		{
+			TeardownHistoricSnapshot(true);
+
+			AbortCurrentTransaction();
+
+			if (using_subtxn)
+				RollbackAndReleaseCurrentSubTransaction();
+
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+	}
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
@@ -1532,6 +1760,31 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 				&found);
 	Assert(found);
 
+	/*
+	 * Remove sequences created in this transaction (if any).
+	 *
+	 * There's no way to search by XID, so we simply do a seqscan of all
+	 * the entries in the hash table. Hopefully there are only a couple
+	 * entries in most cases - people generally don't create many new
+	 * sequences over and over.
+	 */
+	{
+		HASH_SEQ_STATUS scan_status;
+		ReorderBufferSequenceEnt *ent;
+
+		hash_seq_init(&scan_status, rb->sequences);
+		while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
+		{
+			/* skip sequences not from this transaction */
+			if (ent->xid != txn->xid)
+				continue;
+
+			(void) hash_search(rb->sequences,
+						   (void *) &(ent->rnode),
+						   HASH_REMOVE, NULL);
+		}
+	}
+
 	/* remove entries spilled to disk */
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
@@ -1947,6 +2200,39 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					change->data.msg.message);
 }
 
+/*
+ * Helper function for ReorderBufferProcessTXN for applying sequences.
+ */
+static inline void
+ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
+						   Relation relation, ReorderBufferChange *change,
+						   bool streaming)
+{
+	HeapTuple	tuple;
+	bool		isnull;
+	int64		last_value, log_cnt;
+	bool		is_called;
+
+	/* FIXME support streaming */
+	Assert(!streaming);
+
+	tuple = &change->data.sequence.tuple->tuple;
+
+	/*
+	 * Extract the internal sequence values, describing the state.
+	 *
+	 * XXX Seems a bit strange to access it directly. Maybe there's
+	 * a better / more correct way?
+	 */
+	last_value = heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull);
+	log_cnt = heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull);
+	is_called = heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull);
+
+	rb->sequence(rb, txn, change->lsn, relation, true,	/* gotta be transactional */
+				 change->data.sequence.created,
+				 last_value, log_cnt, is_called);
+}
+
 /*
  * Function to store the command id and snapshot at the end of the current
  * stream so that we can reuse the same while sending the next stream.
@@ -2389,6 +2675,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
+
+				case REORDER_BUFFER_CHANGE_SEQUENCE:
+					Assert(snapshot_now);
+
+					reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
+												change->data.sequence.relnode.relNode);
+
+					if (reloid == InvalidOid)
+						elog(ERROR, "could not map filenode \"%s\" to relation OID",
+							 relpathperm(change->data.sequence.relnode,
+										 MAIN_FORKNUM));
+
+					relation = RelationIdGetRelation(reloid);
+
+					if (!RelationIsValid(relation))
+						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+							 reloid,
+							 relpathperm(change->data.sequence.relnode,
+										 MAIN_FORKNUM));
+
+					if (RelationIsLogicallyLogged(relation))
+						ReorderBufferApplySequence(rb, txn, relation, change, streaming);
+
+					RelationClose(relation);
+					break;
 			}
 		}
 
@@ -3776,6 +4087,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				memcpy(data, change->data.truncate.relids, size);
 				data += size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			{
+				char	   *data;
+				ReorderBufferTupleBuf *tup;
+				Size		len = 0;
+
+				tup = change->data.sequence.tuple;
+
+				if (tup)
+				{
+					sz += sizeof(HeapTupleData);
+					len = tup->tuple.t_len;
+					sz += len;
+				}
+
+				/* make sure we have enough space */
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				if (len)
+				{
+					memcpy(data, &tup->tuple, sizeof(HeapTupleData));
+					data += sizeof(HeapTupleData);
+
+					memcpy(data, tup->tuple.t_data, len);
+					data += len;
+				}
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4040,6 +4384,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 			{
 				sz += sizeof(Oid) * change->data.truncate.nrelids;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			{
+				ReorderBufferTupleBuf *tup;
+				Size		len = 0;
+
+				tup = change->data.sequence.tuple;
+
+				if (tup)
+				{
+					sz += sizeof(HeapTupleData);
+					len = tup->tuple.t_len;
+					sz += len;
+				}
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4341,6 +4701,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 				break;
 			}
+
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			if (change->data.sequence.tuple)
+			{
+				uint32		tuplelen = ((HeapTuple) data)->t_len;
+
+				change->data.sequence.tuple =
+					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
+
+				/* restore ->tuple */
+				memcpy(&change->data.sequence.tuple->tuple, data,
+					   sizeof(HeapTupleData));
+				data += sizeof(HeapTupleData);
+
+				/* reset t_data pointer into the new tuplebuf */
+				change->data.sequence.tuple->tuple.t_data =
+					ReorderBufferTupleBufData(change->data.sequence.tuple);
+
+				/* restore tuple data itself */
+				memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
+				data += tuplelen;
+			}
+			break;
+
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..24369a1522 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
+#include "commands/sequence.h"
 #include "miscadmin.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
@@ -357,6 +358,12 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  *
  * If the synchronization position is reached (SYNCDONE), then the table can
  * be marked as READY and is no longer tracked.
+ *
+ * XXX This needs to handle sequences too - after AlterSubscription_refresh
+ * starts caring about sequences, GetSubscriptionNotReadyRelations won't
+ * return just tables, and we'll have to sync them here. Not sure it's worth
+ * creating a new "sync" worker per sequence, maybe we should just sync them
+ * in the current process (it's pretty light-weight).
  */
 static void
 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
@@ -871,6 +878,99 @@ copy_table(Relation rel)
 	logicalrep_rel_close(relmapentry, NoLock);
 }
 
+
+
+/*
+ * FIXME add comment
+ */
+static void
+fetch_sequence_data(char *nspname, char *relname,
+					int64 *last_value, int64 *log_cnt, bool *is_called)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[3] = {INT8OID, INT8OID, BOOLOID};
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
+					   "  FROM %s", quote_qualified_identifier(nspname, relname));
+
+	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not receive list of replicated tables from the publisher: %s",
+						res->err)));
+
+	/* Process the sequence. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		bool		isnull;
+
+		*last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		*log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		*is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		ExecClearTuple(slot);
+	}
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+}
+
+/*
+ * Copy existing data of a sequence from publisher.
+ *
+ * Caller is responsible for locking the local relation.
+ */
+static void
+copy_sequence(Relation rel)
+{
+	LogicalRepRelMapEntry *relmapentry;
+	LogicalRepRelation lrel;
+	StringInfoData cmd;
+	int64		last_value = 0,
+				log_cnt = 0;
+	bool		is_called = 0;
+
+	/* Get the publisher relation info. */
+	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
+							RelationGetRelationName(rel), &lrel);
+
+	/* Put the relation into relmap. */
+	logicalrep_relmap_update(&lrel);
+
+	/* Map the publisher relation to local one. */
+	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
+	Assert(rel == relmapentry->localrel);
+
+	/* Start copy on the publisher. */
+	initStringInfo(&cmd);
+
+	Assert(lrel.relkind == RELKIND_SEQUENCE);
+
+	fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
+
+	elog(WARNING, "sequence %s info last_value %ld log_cnt %ld is_called %d",
+		 quote_qualified_identifier(lrel.nspname, lrel.relname),
+		 last_value, log_cnt, is_called);
+
+	ResetSequence2(RelationGetRelid(rel), last_value, log_cnt, is_called);
+
+	logicalrep_rel_close(relmapentry, NoLock);
+}
+
+
+
+
 /*
  * Determine the tablesync slot name.
  *
@@ -1106,10 +1206,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 						originname)));
 	}
 
-	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
-	PopActiveSnapshot();
+	if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
+	{
+		/* Now do the initial sequence copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_sequence(rel);
+		PopActiveSnapshot();
+	}
+	else
+	{
+		/* Now do the initial data copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_table(rel);
+		PopActiveSnapshot();
+	}
 
 	res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
 	if (res->status != WALRCV_OK_COMMAND)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b9a7a7ffbb..1776b2d5f8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -144,6 +144,7 @@
 #include "catalog/pg_tablespace.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
+#include "commands/sequence.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/execPartition.h"
@@ -1060,6 +1061,57 @@ apply_handle_origin(StringInfo s)
 				 errmsg_internal("ORIGIN message sent out of order")));
 }
 
+/*
+ * Handle SEQUENCE message.
+ */
+static void
+apply_handle_sequence(StringInfo s)
+{
+	LogicalRepSequence	seq;
+	Oid					relid;
+
+	if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
+		return;
+
+	logicalrep_read_sequence(s, &seq);
+
+	/*
+	 * Non-transactional sequence updates should not be part of a remote
+	 * transaction. There should not be any running transaction.
+	 */
+	Assert((!seq.transactional) || in_remote_transaction);
+	Assert(!(!seq.transactional && in_remote_transaction));
+	Assert(!(!seq.transactional && IsTransactionState()));
+
+	/*
+	 * Make sure we're in a transaction (needed by ResetSequence2). For
+	 * non-transactional updates we're guaranteed to start a new one,
+	 * and we'll commit it at the end.
+	 */
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		maybe_reread_subscription();
+	}
+
+	relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
+										  seq.seqname, -1),
+							 RowExclusiveLock, false);
+
+	elog(WARNING, "applying sequence transactional %d created %d last_value %ld log_cnt %ld is_called %d",
+		 seq.transactional, seq.created, seq.last_value, seq.log_cnt, seq.is_called);
+
+	/* apply the sequence change */
+	ResetSequence2(relid, seq.last_value, seq.log_cnt, seq.is_called);
+
+	/*
+	 * Commit the per-stream transaction (we only do this when not in
+	 * remote transaction, i.e. for non-transactional sequence updates.
+	 */
+	if (!in_remote_transaction)
+		CommitTransactionCommand();
+}
+
 /*
  * Handle STREAM START message.
  */
@@ -2302,6 +2354,10 @@ apply_dispatch(StringInfo s)
 			 */
 			return;
 
+		case LOGICAL_REP_MSG_SEQUENCE:
+			apply_handle_sequence(s);
+			return;
+
 		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
 			return;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e4314af13a..4366c275dc 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -49,6 +49,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							 bool transactional, const char *prefix,
 							 Size sz, const char *message);
+static void pgoutput_sequence(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+							  Relation rel, bool transactional, bool created,
+							  int64 last_value, int64 log_cnt, bool is_called);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -157,6 +161,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
 	cb->message_cb = pgoutput_message;
+	cb->sequence_cb = pgoutput_sequence;
 	cb->commit_cb = pgoutput_commit_txn;
 
 	cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -186,6 +191,7 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
 	bool		messages_option_given = false;
+	bool		sequences_option_given = false;
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 
@@ -193,6 +199,7 @@ parse_output_parameters(List *options, PGOutputData *data)
 	data->streaming = false;
 	data->messages = false;
 	data->two_phase = false;
+	data->sequences = true;
 
 	foreach(lc, options)
 	{
@@ -258,6 +265,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->messages = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "sequences") == 0)
+		{
+			if (sequences_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			sequences_option_given = true;
+
+			data->sequences = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
 			if (streaming_given)
@@ -865,6 +882,47 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pgoutput_sequence(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+				  Relation rel, bool transactional, bool created,
+				  int64 last_value, int64 log_cnt, bool is_called)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	TransactionId xid = InvalidTransactionId;
+	RelationSyncEntry *relentry;
+
+	if (!data->sequences)
+		return;
+
+	if (!is_publishable_relation(rel))
+		return;
+
+	/* XXX handle (in_streaming) here */
+
+	relentry = get_rel_sync_entry(data, RelationGetRelid(rel));
+
+	/*
+	 * First check the sequence filter.
+	 *
+	 * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here.
+	 */
+	if (!relentry->pubactions.pubsequence)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_sequence(ctx->out,
+							  rel,
+							  xid,
+							  sequence_lsn,
+							  transactional,
+							  created,
+							  last_value,
+							  log_cnt,
+							  is_called);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Currently we always forward.
  */
@@ -1128,7 +1186,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->streamed_txns = NIL;
 		entry->replicate_valid = false;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
-			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+			entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+			entry->pubactions.pubsequence = false;
 		entry->publish_as_relid = InvalidOid;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
@@ -1140,6 +1199,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		List	   *pubids = GetRelationPublications(relid);
 		ListCell   *lc;
 		Oid			publish_as_relid = relid;
+		bool		is_sequence = (get_rel_relkind(relid) == RELKIND_SEQUENCE);
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -1163,12 +1223,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
 
-			if (pub->alltables)
+			if (pub->alltables && (!is_sequence))
 			{
 				publish = true;
 				if (pub->pubviaroot && am_partition)
 					publish_as_relid = llast_oid(get_partition_ancestors(relid));
 			}
+			else if (pub->allsequences && is_sequence)
+			{
+				publish = true;
+			}
+
+			/* if a sequence, just cross-check the list of publications */
+			if (!publish && is_sequence)
+			{
+				if (list_member_oid(pubids, pub->oid))
+					publish = true;
+			}
 
 			if (!publish)
 			{
@@ -1219,10 +1290,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+				entry->pubactions.pubsequence |= pub->pubactions.pubsequence;
 			}
 
 			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
+				entry->pubactions.pubdelete && entry->pubactions.pubtruncate &&
+				entry->pubactions.pubsequence)
 				break;
 		}
 
@@ -1367,6 +1440,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 		entry->pubactions.pubupdate = false;
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
+		entry->pubactions.pubsequence = false;
 	}
 }
 
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 13d9994af3..3dd39054e6 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5495,6 +5495,7 @@ GetRelationPublicationActions(Relation relation)
 		pubactions->pubupdate |= pubform->pubupdate;
 		pubactions->pubdelete |= pubform->pubdelete;
 		pubactions->pubtruncate |= pubform->pubtruncate;
+		pubactions->pubsequence |= pubform->pubsequence;
 
 		ReleaseSysCache(tup);
 
@@ -5503,7 +5504,8 @@ GetRelationPublicationActions(Relation relation)
 		 * other publications.
 		 */
 		if (pubactions->pubinsert && pubactions->pubupdate &&
-			pubactions->pubdelete && pubactions->pubtruncate)
+			pubactions->pubdelete && pubactions->pubtruncate &&
+			pubactions->pubsequence)
 			break;
 	}
 
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index d6bf725971..6f77354644 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1640,7 +1640,7 @@ psql_completion(const char *text, int start, int end)
 
 	/* ALTER PUBLICATION <name> */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny))
-		COMPLETE_WITH("ADD TABLE", "DROP TABLE", "OWNER TO", "RENAME TO", "SET");
+		COMPLETE_WITH("ADD TABLE", "DROP TABLE", "ADD SEQUENCE", "DROP SEQUENCE", "OWNER TO", "RENAME TO", "SET");
 	/* ALTER PUBLICATION <name> SET */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET"))
 		COMPLETE_WITH("(", "TABLE");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 8cd0252082..8700a72dea 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11445,6 +11445,11 @@
   provolatile => 's', prorettype => 'oid', proargtypes => 'text',
   proallargtypes => '{text,oid}', proargmodes => '{i,o}',
   proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
+{ oid => '8000', descr => 'get OIDs of sequences in a publication',
+  proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't',
+  provolatile => 's', prorettype => 'oid', proargtypes => 'text',
+  proallargtypes => '{text,oid}', proargmodes => '{i,o}',
+  proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
   proname => 'pg_relation_is_publishable', provolatile => 's',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index f332bad4d4..150b8922cf 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -40,6 +40,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 	 */
 	bool		puballtables;
 
+	/*
+	 * indicates that this is special publication which should encompass all
+	 * sequences in the database (except for the unlogged and temp ones)
+	 */
+	bool		puballsequences;
+
 	/* true if inserts are published */
 	bool		pubinsert;
 
@@ -52,6 +58,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 	/* true if truncates are published */
 	bool		pubtruncate;
 
+	/* true if sequences are published */
+	bool		pubsequence;
+
 	/* true if partition changes are published using root schema */
 	bool		pubviaroot;
 } FormData_pg_publication;
@@ -72,6 +81,7 @@ typedef struct PublicationActions
 	bool		pubupdate;
 	bool		pubdelete;
 	bool		pubtruncate;
+	bool		pubsequence;
 } PublicationActions;
 
 typedef struct Publication
@@ -79,6 +89,7 @@ typedef struct Publication
 	Oid			oid;
 	char	   *name;
 	bool		alltables;
+	bool		allsequences;
 	bool		pubviaroot;
 	PublicationActions pubactions;
 } Publication;
@@ -107,6 +118,9 @@ extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 
+extern List *GetAllSequencesPublicationRelations(void);
+extern List *GetPublicationSequenceRelations(Oid pubid);
+
 extern bool is_publishable_relation(Relation rel);
 extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
 											  bool if_not_exists);
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 40544dd4c7..c28e8695cb 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -48,6 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data;
 typedef struct xl_seq_rec
 {
 	RelFileNode node;
+	bool		created;	/* is this a CREATE SEQUENCE */
 	/* SEQUENCE TUPLE DATA FOLLOWS AT THE END */
 } xl_seq_rec;
 
@@ -59,6 +60,7 @@ extern ObjectAddress DefineSequence(ParseState *pstate, CreateSeqStmt *stmt);
 extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
 extern void DeleteSequenceTuple(Oid relid);
 extern void ResetSequence(Oid seq_relid);
+extern void ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called);
 extern void ResetSequenceCaches(void);
 
 extern void seq_redo(XLogReaderState *rptr);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index def9651b34..aa602deae5 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3643,8 +3643,10 @@ typedef struct AlterPublicationStmt
 
 	/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
 	List	   *tables;			/* List of tables to add/drop */
+	List	   *sequences;		/* List of sequences to add/drop */
 	bool		for_all_tables; /* Special publication for all tables in db */
-	DefElemAction tableAction;	/* What action to perform with the tables */
+	bool		for_all_sequences; /* Special publication for all tables in db */
+	DefElemAction action;		/* What action to perform with the tables/sequences */
 } AlterPublicationStmt;
 
 typedef struct CreateSubscriptionStmt
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 63de90d94a..931bc0722a 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -60,6 +60,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
 	LOGICAL_REP_MSG_MESSAGE = 'M',
+	LOGICAL_REP_MSG_SEQUENCE = 'X',	/* FIXME change */
 	LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
 	LOGICAL_REP_MSG_PREPARE = 'P',
 	LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
@@ -116,6 +117,19 @@ typedef struct LogicalRepTyp
 	char	   *typname;		/* name of the remote type */
 } LogicalRepTyp;
 
+/* Sequence info */
+typedef struct LogicalRepSequence
+{
+	Oid			remoteid;		/* unique id of the remote sequence */
+	char	   *nspname;		/* schema name of remote sequence */
+	char	   *seqname;		/* name of the remote sequence */
+	bool		transactional;
+	bool		created;
+	int64		last_value;
+	int64		log_cnt;		/* XXX probably not needed? */
+	bool		is_called;		/* XXX probably not needed? */
+} LogicalRepSequence;
+
 /* Transaction info */
 typedef struct LogicalRepBeginData
 {
@@ -223,6 +237,12 @@ extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
+extern void logicalrep_write_sequence(StringInfo out, Relation rel,
+									  TransactionId xid, XLogRecPtr lsn, 
+									  bool transactional, bool created,
+									  int64 last_value, int64 log_cnt,
+									  bool is_called);
+extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 810495ed0e..e3e1b03f32 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -88,6 +88,19 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 										Size message_size,
 										const char *message);
 
+/*
+ * Called for the generic logical decoding sequences.
+ */
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+										 ReorderBufferTXN *txn,
+										 XLogRecPtr sequence_lsn,
+										 Relation rel,
+										 bool transactional,
+										 bool created,
+										 int64 last_value,
+										 int64 log_cnt,
+										 bool is_called);
+
 /*
  * Filter changes by origin.
  */
@@ -219,6 +232,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeSequenceCB sequence_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 0dc460fb70..97eb2a7ef1 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -28,6 +28,7 @@ typedef struct PGOutputData
 	bool		streaming;
 	bool		messages;
 	bool		two_phase;
+	bool		sequences;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5b40ff75f7..4512ed679b 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -64,7 +64,8 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
-	REORDER_BUFFER_CHANGE_TRUNCATE
+	REORDER_BUFFER_CHANGE_TRUNCATE,
+	REORDER_BUFFER_CHANGE_SEQUENCE
 };
 
 /* forward declaration */
@@ -158,6 +159,14 @@ typedef struct ReorderBufferChange
 			uint32		ninvalidations; /* Number of messages */
 			SharedInvalidationMessage *invalidations;	/* invalidation message */
 		}			inval;
+
+		/* Context data for Sequence changes */
+		struct
+		{
+			RelFileNode relnode;
+			bool		created;
+			ReorderBufferTupleBuf *tuple;
+		}			sequence;
 	}			data;
 
 	/*
@@ -430,6 +439,15 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* sequence callback signature */
+typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb,
+										 ReorderBufferTXN *txn,
+										 XLogRecPtr sequence_lsn,
+										 Relation rel,
+										 bool transactional, bool created,
+										 int64 last_value, int64 log_cnt,
+										 bool is_called);
+
 /* begin prepare callback signature */
 typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
 											 ReorderBufferTXN *txn);
@@ -511,6 +529,12 @@ struct ReorderBuffer
 	 */
 	HTAB	   *by_txn;
 
+	/*
+	 * relfilenode => XID lookup table for sequences created in a transaction
+	 * (also includes altered sequences, which assigns new relfilenode)
+	 */
+	HTAB	   *sequences;
+
 	/*
 	 * Transactions that could be a toplevel xact, ordered by LSN of the first
 	 * record bearing that xid.
@@ -541,6 +565,7 @@ struct ReorderBuffer
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
+	ReorderBufferSequenceCB sequence;
 
 	/*
 	 * Callbacks to be called when streaming a transaction at prepare time.
@@ -635,6 +660,10 @@ void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
 void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
+void		ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+									   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+									   RelFileNode rnode, bool transactional, bool created,
+									   ReorderBufferTupleBuf *tuplebuf);
 void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
@@ -682,4 +711,7 @@ void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 void		StartupReorderBuffer(void);
 
+bool		ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+												 RelFileNode rnode, bool created);
+
 #endif
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index b5b065a1b6..963ce8d1df 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -160,8 +160,8 @@ DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
-ERROR:  "testpub_view" is not a table
-DETAIL:  Only tables can be added to publications.
+ERROR:  "testpub_view" is not a table or sequence
+DETAIL:  Only tables and sequences can be added to publications.
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1, pub_test.testpub_nopk;
 RESET client_min_messages;
@@ -182,8 +182,8 @@ Tables:
 
 -- fail - view
 ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
-ERROR:  "testpub_view" is not a table
-DETAIL:  Only tables can be added to publications.
+ERROR:  "testpub_view" is not a table or sequence
+DETAIL:  Only tables and sequences can be added to publications.
 ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index e5ab11275d..393be871be 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,6 +1451,14 @@ pg_prepared_xacts| SELECT p.transaction,
    FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
      LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
      LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_publication_sequences| SELECT p.pubname,
+    n.nspname AS schemaname,
+    c.relname AS sequencename
+   FROM pg_publication p,
+    LATERAL pg_get_publication_sequences((p.pubname)::text) gpt(relid),
+    (pg_class c
+     JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
+  WHERE (c.oid = gpt.relid);
 pg_publication_tables| SELECT p.pubname,
     n.nspname AS schemaname,
     c.relname AS tablename
-- 
2.31.1

Reply via email to