Hi,

One of the existing limitations of logical decoding / replication is
that it does not care about sequences. The annoying consequence is that
after a failover to a logical replica, all the table data may be
replicated but the sequences are still at their initial values, requiring
some custom solution that moves the sequences forward far enough to
prevent duplicates.

There have been attempts to address this in the past, most recently [1],
but none of them got in due to various issues.

This is an attempt, based on [1] (but with many significant parts added
or reworked), to finally deal with this. The primary purpose of sharing
it is to get feedback and opinions on the design decisions. It's still a
WIP - it works fine AFAICS, but some of the bits may be a bit hackish.

The overall goal is to have the same sequence data on the primary and
logical replica, or something sufficiently close to that, so that the
replica after a failover does not generate duplicate values.

This patch does a couple of basic things:

1) extends logical decoding to handle sequences. It adds a new
   callback, similarly to what we have for messages. There's a bit of
   complexity with transactional and non-transactional behavior, more
   about that later

2) extends test_decoding to support this new callback, printing the
   sequence increments (the decoded WAL records) - a sketch of the
   callback is included right after this list

3) extends the built-in logical replication to support sequences, so
   publications may contain both tables and sequences, sequence data
   gets synced when creating a subscription, etc.


transactional vs. non-transactional
-----------------------------------

The first part (extending logical decoding) is simple in principle. We
simply decode the sequence updates, but then comes a challenge - should
we treat them transactionally and stash them in the reorder buffer, or
just pass them to the output plugin right away?

For messages, this can be specified as a flag when adding the message,
so the user can decide depending on the message purpose. For sequences,
all we do is call nextval(), and the right behavior depends on the
context in which it's used - we can't simply pick one of the two
approaches.

Consider this, for example:

  CREATE SEQUENCE s;
  BEGIN;
    SELECT nextval('s') FROM generate_series(1,1000) s(i);
  ROLLBACK;

If we handled this "transactionally", we'd stash the "nextval"
increments into the transaction and then discard them on rollback, so
the output plugin (and the replica) would never see them, and the
replica's sequence would fall behind. So this is an argument for
non-transactional behavior.

On the other hand, consider this:

  CREATE SEQUENCE s;
  BEGIN;
     ALTER SEQUENCE s RESTART WITH 2000;
     SELECT nextval('s') FROM generate_series(1,1000) s(i);
  ROLLBACK;

In this case the ALTER creates a new relfilenode, and the ROLLBACK
discards it, including the effects of the nextval() calls. So here we
should treat it transactionally, i.e. stash the increment(s) in the
transaction and just discard them all on rollback.

A somewhat similar example is this:

  BEGIN;
     CREATE SEQUENCE s;
     SELECT nextval('s') FROM generate_series(1,1000) s(i);
  COMMIT;

Again - the decoded nextval needs to be handled transactionally, because
otherwise it's going to be very difficult for custom plugins to combine
this with DDL replication.

So the patch does a fairly simple thing:

1) By default, sequences are treated non-transactionally, i.e. sent to
   the output plugin right away.

2) We track sequences created in running (sub)transactions, and those
   are handled transactionally. This includes the ALTER SEQUENCE cases,
   which create a new relfilenode - the relfilenode is what we use as
   the identifier.

It's a bit more complex, because of cases like this:

  BEGIN;
     CREATE SEQUENCE s;
     SAVEPOINT a;
     SELECT nextval('s') FROM generate_series(1,1000) s(i);
     ROLLBACK TO a;
  COMMIT;

because we must not discard the nextval changes on the inner rollback -
this is handled by always stashing the nextval changes in the
(sub)transaction that created the sequence relfilenode.

The tracking is a bit cumbersome - there's a hash table mapping each
relfilenode to the XID of the (sub)transaction that created it. AFAICS
that works, but it might become an issue with many sequences created in
running transactions. Not sure.
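
Roughly, the tracking boils down to something like this (a simplified
sketch - the names here are made up and the actual patch wires this into
the decoding / reorderbuffer code, but the idea is the same):

  /* which (sub)transaction created a given sequence relfilenode */
  typedef struct SeqCreateEntry
  {
      RelFileNode   rnode;        /* hash key */
      TransactionId xid;          /* creating (sub)transaction */
  } SeqCreateEntry;

  static HTAB *seq_created = NULL;

  static void
  remember_sequence_create(RelFileNode rnode, TransactionId xid)
  {
      bool            found;
      SeqCreateEntry *entry;

      if (seq_created == NULL)
      {
          HASHCTL ctl;

          memset(&ctl, 0, sizeof(ctl));
          ctl.keysize = sizeof(RelFileNode);
          ctl.entrysize = sizeof(SeqCreateEntry);
          seq_created = hash_create("sequences created in running xacts",
                                    64, &ctl, HASH_ELEM | HASH_BLOBS);
      }

      entry = hash_search(seq_created, &rnode, HASH_ENTER, &found);
      if (!found)
          entry->xid = xid;
  }

  /* treat the change transactionally if created by a running xact */
  static bool
  sequence_is_transactional(RelFileNode rnode)
  {
      return seq_created != NULL &&
             hash_search(seq_created, &rnode, HASH_FIND, NULL) != NULL;
  }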


detecting sequence creation
---------------------------

Detecting that a sequence (or rather its relfilenode) was created is
done by adding a "created" flag to xl_seq_rec, and setting it to "true"
in the first WAL record written after the creation. There might be some
other way, but this seemed simple enough.
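
That is, the WAL record struct gains one extra flag (the existing field
and the trailing tuple data stay as they are), and fill_seq_with_data()
sets it when writing the initial record for a new relfilenode:

  typedef struct xl_seq_rec
  {
      RelFileNode node;
      bool        created;    /* first record for a new relfilenode? */

      /* SEQUENCE TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_seq_rec;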


applying the sequence (ResetSequence2)
--------------------------------------

The decoding pretty much just extracts last_value, log_cnt and is_called
from the sequence tuple, and passes them to the output plugin. On the
replica we extract those from the message, and write them to the local
sequence using a new ResetSequence2 function.
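
On the subscriber, applying a decoded sequence change then boils down to
a single call - something like this (hand-wavy sketch, the protocol
struct and the parsing helper are made up, ResetSequence2 is the
function added by the patch):

  static void
  apply_handle_sequence(StringInfo s)
  {
      LogicalRepSequence seq;     /* nspname, relname, last_value, ... */
      Oid                relid;

      logicalrep_read_sequence(s, &seq);

      /* ResetSequence2 expects AccessExclusiveLock on the sequence */
      relid = RangeVarGetRelid(makeRangeVar(seq.nspname, seq.relname, -1),
                               AccessExclusiveLock, false);

      /* overwrite the local sequence state with the decoded values */
      ResetSequence2(relid, seq.last_value, seq.log_cnt, seq.is_called);

      CommandCounterIncrement();
  }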

It's possible we don't really need log_cnt and is_called. After all,
log_cnt is zero most of the time anyway, and the worst thing that could
happen if we ignore it is we skip a couple values (which seems fine).


syncing sequences in a subscription
-----------------------------------

After creating a subscription, the sequences get synchronized just like
tables. This part is a bit hacked together, and there's definitely room
for improvement - e.g. a new bgworker is started for each sequence,
because we simply treat both tables and sequences as "relations". But
all we need to do for a sequence is copy the (last_value, log_cnt,
is_called) triplet and call ResetSequence2, so maybe we could sync all
sequences in a single worker, or something like that.
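
The per-sequence sync step is then essentially just running a query on
the publisher and applying the result - a sketch (assuming an already
established walreceiver connection, with error handling trimmed down):

  static void
  copy_sequence(WalReceiverConn *conn, Relation rel)
  {
      StringInfoData  cmd;
      WalRcvExecResult *res;
      Oid             types[3] = {INT8OID, INT8OID, BOOLOID};
      TupleTableSlot *slot;
      bool            isnull;

      initStringInfo(&cmd);
      appendStringInfo(&cmd,
                       "SELECT last_value, log_cnt, is_called FROM %s",
                       quote_qualified_identifier(
                           get_namespace_name(RelationGetNamespace(rel)),
                           RelationGetRelationName(rel)));

      res = walrcv_exec(conn, cmd.data, 3, types);
      if (res->status != WALRCV_OK_TUPLES)
          elog(ERROR, "could not fetch sequence state");

      slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
      if (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
      {
          int64   last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
          int64   log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
          bool    is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));

          ResetSequence2(RelationGetRelid(rel), last_value, log_cnt,
                         is_called);
      }

      ExecDropSingleTupleTableSlot(slot);
      walrcv_clear_result(res);
      pfree(cmd.data);
  }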


new "sequence" publication action
---------------------------------

Publications now have a new "sequence" publication action, which is
enabled by default. This determines whether the publication decodes
sequence changes at all.


FOR ALL SEQUENCES
-----------------

It should be possible to create FOR ALL SEQUENCES publications, just
like we have FOR ALL TABLES. But this produces shift/reduce conflicts
in the grammar, and I didn't bother dealing with that. So for now it's
required to do ALTER PUBLICATION ... [ADD | DROP] SEQUENCE ...


no streaming support yet
------------------------

There's no support for streaming of in-progress transactions yet, but
it should be trivial to add.


GetCurrentTransactionId() in nextval
------------------------------------

There's a somewhat annoying behavior of nextval() - if you do this:

  BEGIN;
    CREATE SEQUENCE s;
    SAVEPOINT a;
    SELECT nextval('s') FROM generate_series(1,100) s(i);
  COMMIT;

then the WAL record for nextval (right after the savepoint) will have
XID 0 (easy to see in pg_waldump). That's kinda strange, and it causes
problems in DecodeSequence() when calling

    SnapBuildProcessChange(builder, xid, buf->origptr)

for transactional changes, because that expects a valid XID. Fixing
this required adding a GetCurrentTransactionId() call to nextval() and
two other functions, which so far were only doing

  if (RelationNeedsWAL(seqrel))
    GetTopTransactionId();

I'm not sure if this has any particularly bad consequences.
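
For reference, this is what the relevant place in nextval_internal()
looks like with the patch applied:

  if (logit && RelationNeedsWAL(seqrel))
  {
      GetTopTransactionId();

      /*
       * Ensure we have a proper XID assigned to the current (sub)xact,
       * so the WAL record carries a valid xid and the decoding can tell
       * which transaction it belongs to.
       */
      GetCurrentTransactionId();
  }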


regards

[1] https://www.postgresql.org/message-id/flat/1710ed7e13b.cd7177461430746.3372264562543607781%40highgo.ca

--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a31e0b879..56ddc3abae 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
-	spill slot truncate stream stats twophase twophase_stream
+	spill slot truncate stream stats twophase twophase_stream \
+	sequence
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot
diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out
new file mode 100644
index 0000000000..07f3e3e92c
--- /dev/null
+++ b/contrib/test_decoding/expected/sequence.out
@@ -0,0 +1,327 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE SEQUENCE test_sequence;
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       3
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       4
+(1 row)
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+      14
+(1 row)
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    4000
+(1 row)
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, true);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, false);
+ setval 
+--------
+   3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3500
+(1 row)
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                                 data                                                 
+------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 4, log_cnt: 0 is_called: 1
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 334, log_cnt: 0 is_called: 1
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 14, log_cnt: 32 is_called: 1
+ COMMIT
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 14, log_cnt: 0 is_called: 1
+ COMMIT
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 4000, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 4320, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3830, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3830, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3500, log_cnt: 0 is_called: 0
+ sequence: public.test_sequence transactional: 0 created: 0 last_value: 3820, log_cnt: 0 is_called: 1
+(24 rows)
+
+DROP SEQUENCE test_sequence;
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       3
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3001
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3002
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3003
+(1 row)
+
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    6001
+(1 row)
+
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data 
+------
+(0 rows)
+
+-- rollback on table creation with serial column 
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data 
+------
+(0 rows)
+
+-- rollback on table with serial column 
+CREATE TABLE test_table (a SERIAL, b INT);
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+-- check table and sequence values after rollback 
+SELECT * from test_table_a_seq;
+ last_value | log_cnt | is_called 
+------------+---------+-----------
+       3003 |      30 | t
+(1 row)
+
+SELECT nextval('test_table_a_seq');
+ nextval 
+---------
+    3004
+(1 row)
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                                  data                                                   
+---------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_table_a_seq transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ COMMIT
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 3000, log_cnt: 0 is_called: 1
+ sequence: public.test_table_a_seq transactional: 0 created: 0 last_value: 3033, log_cnt: 0 is_called: 1
+ BEGIN
+ COMMIT
+(8 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                                 data                                                  
+-------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_table_a_seq transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ sequence: public.test_table_a_seq transactional: 1 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ table public.test_table: INSERT: a[integer]:1 b[integer]:100
+ table public.test_table: INSERT: a[integer]:2 b[integer]:200
+ COMMIT
+(6 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+ nextval 
+---------
+       1
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval 
+--------
+   3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval 
+---------
+    3001
+(1 row)
+
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ setval 
+--------
+   5000
+(1 row)
+
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+ last_value | log_cnt | is_called 
+------------+---------+-----------
+       3001 |      32 | t
+(1 row)
+
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+                                                 data                                                 
+------------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence: public.test_sequence transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 33, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 3000, log_cnt: 0 is_called: 1
+ sequence: public.test_sequence transactional: 1 created: 0 last_value: 3033, log_cnt: 0 is_called: 1
+ COMMIT
+(6 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql
new file mode 100644
index 0000000000..d8a34738f3
--- /dev/null
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -0,0 +1,119 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE test_sequence;
+
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, true);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, false);
+SELECT nextval('test_sequence');
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+DROP SEQUENCE test_sequence;
+
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table creation with serial column 
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table with serial column 
+CREATE TABLE test_table (a SERIAL, b INT);
+
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+
+-- check table and sequence values after rollback 
+SELECT * from test_table_a_seq;
+SELECT nextval('test_table_a_seq');
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index de1b692658..162e3c18f0 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -35,6 +35,7 @@ typedef struct
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
 	bool		only_local;
+	bool		skip_sequences;
 } TestDecodingData;
 
 /*
@@ -76,6 +77,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
+static void pg_decode_sequence(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+							  Relation rel, bool transactional, bool created,
+							  int64 last_value, int64 log_cnt, bool is_called);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
 									 TransactionId xid,
 									 const char *gid);
@@ -141,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+	cb->sequence_cb = pg_decode_sequence;
 	cb->filter_prepare_cb = pg_decode_filter_prepare;
 	cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
 	cb->prepare_cb = pg_decode_prepare_txn;
@@ -175,6 +181,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->skip_empty_xacts = false;
 	data->only_local = false;
 
+	/* skip sequences by default for backwards compatibility */
+	data->skip_sequences = true;
+
 	ctx->output_plugin_private = data;
 
 	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
@@ -265,6 +274,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "skip-sequences") == 0)
+		{
+
+			if (elem->arg == NULL)
+				continue;	/* true by default */
+			else if (!parse_bool(strVal(elem->arg), &data->skip_sequences))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -744,6 +764,26 @@ pg_decode_message(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				   XLogRecPtr sequence_lsn, Relation rel,
+				   bool transactional, bool created,
+				   int64 last_value, int64 log_cnt, bool is_called)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+	char	   *nspname = get_namespace_name(RelationGetNamespace(rel));
+
+	/* return if requested to skip_sequences */
+	if (data->skip_sequences)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "sequence: %s.%s transactional: %d created: %d last_value: " INT64_FORMAT ", log_cnt: " INT64_FORMAT " is_called: %d",
+					 nspname, RelationGetRelationName(rel),
+					 transactional, created, last_value, log_cnt, is_called);
+	OutputPluginWrite(ctx, true);
+}
+
 static void
 pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 16493209c6..ff8a6f102c 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9429,6 +9429,11 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       <entry>prepared transactions</entry>
      </row>
 
+     <row>
+      <entry><link linkend="view-pg-publication-sequences"><structname>pg_publication_sequences</structname></link></entry>
+      <entry>publications and their associated sequences</entry>
+     </row>
+
      <row>
       <entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry>
       <entry>publications and their associated tables</entry>
@@ -11263,6 +11268,72 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
  </sect1>
 
+ <sect1 id="view-pg-publication-sequences">
+  <title><structname>pg_publication_sequences</structname></title>
+
+  <indexterm zone="view-pg-publication-sequences">
+   <primary>pg_publication_sequences</primary>
+  </indexterm>
+
+  <para>
+   The view <structname>pg_publication_sequences</structname> provides
+   information about the mapping between publications and the sequences they
+   contain.  Unlike the underlying catalog
+   <link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>,
+   this view expands
+   publications defined as <literal>FOR ALL SEQUENCES</literal>, so for such
+   publications there will be a row for each eligible sequence.
+  </para>
+
+  <table>
+   <title><structname>pg_publication_sequences</structname> Columns</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>pubname</structfield> <type>name</type>
+       (references <link linkend="catalog-pg-publication"><structname>pg_publication</structname></link>.<structfield>pubname</structfield>)
+      </para>
+      <para>
+       Name of publication
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>schemaname</structfield> <type>name</type>
+       (references <link linkend="catalog-pg-namespace"><structname>pg_namespace</structname></link>.<structfield>nspname</structfield>)
+      </para>
+      <para>
+       Name of schema containing sequence
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>sequencename</structfield> <type>name</type>
+       (references <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>relname</structfield>)
+      </para>
+      <para>
+       Name of sequence
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+ </sect1>
+
  <sect1 id="view-pg-publication-tables">
   <title><structname>pg_publication_tables</structname></title>
 
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index faa114b2c6..c68a1573de 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -24,6 +24,9 @@ PostgreSQL documentation
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@@ -50,7 +53,18 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
   </para>
 
   <para>
-   The fourth variant of this command listed in the synopsis can change
+   The next three variants change which sequences are part of the publication.
+   The <literal>SET SEQUENCE</literal> clause will replace the list of sequences
+   in the publication with the specified one.  The <literal>ADD SEQUENCE</literal>
+   and <literal>DROP SEQUENCE</literal> clauses will add and remove one or more
+   sequences from the publication.  Note that adding sequences to a publication
+   that is already subscribed to will require a <literal>ALTER SUBSCRIPTION
+   ... REFRESH PUBLICATION</literal> action on the subscribing side in order
+   to become effective.
+  </para>
+
+  <para>
+   The seventh variant of this command listed in the synopsis can change
    all of the publication properties specified in
    <xref linkend="sql-createpublication"/>.  Properties not mentioned in the
    command retain their previous settings.
@@ -62,7 +76,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
 
   <para>
    You must own the publication to use <command>ALTER PUBLICATION</command>.
-   Adding a table to a publication additionally requires owning that table.
+   Adding a table to a publication additionally requires owning that table,
+   and the same requirement applies to sequences.
    To alter the owner, you must also be a direct or indirect member of the new
    owning role. The new owner must have <literal>CREATE</literal> privilege on
    the database.  Also, the new owner of a <literal>FOR ALL TABLES</literal>
@@ -97,6 +112,15 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><replaceable class="parameter">sequence_name</replaceable></term>
+    <listitem>
+     <para>
+      Name of an existing sequence.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
     <listitem>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 367ac814f4..3412ca68e9 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -138,8 +138,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     <term><literal>REFRESH PUBLICATION</literal></term>
     <listitem>
      <para>
-      Fetch missing table information from publisher.  This will start
-      replication of tables that were added to the subscribed-to publications
+      Fetch missing table and sequence information from publisher.  This will start
+      replication of tables and sequences that were added to the subscribed-to publications
       since the last invocation of <command>REFRESH PUBLICATION</command> or
       since <command>CREATE SUBSCRIPTION</command>.
      </para>
@@ -156,7 +156,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
           Specifies whether the existing data in the publications that are
           being subscribed to should be copied once the replication starts.
           The default is <literal>true</literal>.  (Previously subscribed
-          tables are not copied.)
+          tables and sequences are not copied.)
          </para>
         </listitem>
        </varlistentry>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 86e415af89..427582f36a 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -51,26 +51,27 @@ check_publication_add_relation(Relation targetrel)
 {
 	/* Must be a regular or partitioned table */
 	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
-		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
+		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE &&
+		RelationGetForm(targetrel)->relkind != RELKIND_SEQUENCE)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("\"%s\" is not a table",
+				 errmsg("\"%s\" is not a table or sequence",
 						RelationGetRelationName(targetrel)),
-				 errdetail("Only tables can be added to publications.")));
+				 errdetail("Only tables and sequences can be added to publications.")));
 
-	/* Can't be system table */
+	/* Can't be system table/sequence */
 	if (IsCatalogRelation(targetrel))
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("\"%s\" is a system table",
+				 errmsg("\"%s\" is a system table or sequence",
 						RelationGetRelationName(targetrel)),
-				 errdetail("System tables cannot be added to publications.")));
+				 errdetail("System tables / sequences cannot be added to publications.")));
 
 	/* UNLOGGED and TEMP relations cannot be part of publication. */
 	if (!RelationIsPermanent(targetrel))
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("table \"%s\" cannot be replicated",
+				 errmsg("table or sequence \"%s\" cannot be replicated",
 						RelationGetRelationName(targetrel)),
 				 errdetail("Temporary and unlogged relations cannot be replicated.")));
 }
@@ -98,7 +99,8 @@ static bool
 is_publishable_class(Oid relid, Form_pg_class reltuple)
 {
 	return (reltuple->relkind == RELKIND_RELATION ||
-			reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
+			reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
+			reltuple->relkind == RELKIND_SEQUENCE) &&
 		!IsCatalogRelationOid(relid) &&
 		reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
 		relid >= FirstNormalObjectId;
@@ -271,6 +273,10 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 
 		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
 
+		/* skip sequences here */
+		if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+			continue;
+
 		if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
 			pub_partopt != PUBLICATION_PART_ROOT)
 		{
@@ -304,6 +310,49 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 	return result;
 }
 
+/*
+ * Gets list of relation oids for a publication (sequences only).
+ *
+ * This should only be used for normal publications; FOR ALL SEQUENCES
+ * publications should use GetAllSequencesPublicationRelations().
+ */
+List *
+GetPublicationSequenceRelations(Oid pubid)
+{
+	List	   *result;
+	Relation	pubrelsrel;
+	ScanKeyData scankey;
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	/* Find all publications associated with the relation. */
+	pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&scankey,
+				Anum_pg_publication_rel_prpubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(pubid));
+
+	scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
+							  true, NULL, 1, &scankey);
+
+	result = NIL;
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_publication_rel pubrel;
+
+		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+		if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+			result = lappend_oid(result, pubrel->prrelid);
+	}
+
+	systable_endscan(scan);
+	table_close(pubrelsrel, AccessShareLock);
+
+	return result;
+}
+
 /*
  * Gets list of publication oids for publications marked as FOR ALL TABLES.
  */
@@ -404,6 +453,46 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 	return result;
 }
 
+/*
+ * Gets list of all sequence relations that may be published, i.e. what
+ * a FOR ALL SEQUENCES publication would include.
+ *
+ * Unlike tables, sequences are not partitioned, so there is no need to
+ * worry about publishing changes via a root relation here.
+ */
+List *
+GetAllSequencesPublicationRelations(void)
+{
+	Relation	classRel;
+	ScanKeyData key[1];
+	TableScanDesc scan;
+	HeapTuple	tuple;
+	List	   *result = NIL;
+
+	classRel = table_open(RelationRelationId, AccessShareLock);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_class_relkind,
+				BTEqualStrategyNumber, F_CHAREQ,
+				CharGetDatum(RELKIND_SEQUENCE));
+
+	scan = table_beginscan_catalog(classRel, 1, key);
+
+	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+	{
+		Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+		Oid			relid = relForm->oid;
+
+		if (is_publishable_class(relid, relForm))
+			result = lappend_oid(result, relid);
+	}
+
+	table_endscan(scan);
+
+	table_close(classRel, AccessShareLock);
+	return result;
+}
+
 /*
  * Get publication using oid
  *
@@ -426,10 +515,12 @@ GetPublication(Oid pubid)
 	pub->oid = pubid;
 	pub->name = pstrdup(NameStr(pubform->pubname));
 	pub->alltables = pubform->puballtables;
+	pub->allsequences = pubform->puballsequences;
 	pub->pubactions.pubinsert = pubform->pubinsert;
 	pub->pubactions.pubupdate = pubform->pubupdate;
 	pub->pubactions.pubdelete = pubform->pubdelete;
 	pub->pubactions.pubtruncate = pubform->pubtruncate;
+	pub->pubactions.pubsequence = pubform->pubsequence;
 	pub->pubviaroot = pubform->pubviaroot;
 
 	ReleaseSysCache(tup);
@@ -555,3 +646,56 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
 	SRF_RETURN_DONE(funcctx);
 }
+
+/*
+ * Returns Oids of sequences in a publication.
+ */
+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	char	   *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	Publication *publication;
+	List	   *sequences;
+
+	/* stuff done only on the first call of the function */
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext oldcontext;
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* switch to memory context appropriate for multiple function calls */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		publication = GetPublicationByName(pubname, false);
+
+		/*
+		 * Get either all sequences (for FOR ALL SEQUENCES publications),
+		 * or just the sequences explicitly added to this particular
+		 * publication.
+		 */
+		if (publication->allsequences)
+			sequences = GetAllSequencesPublicationRelations();
+		else
+			sequences = GetPublicationSequenceRelations(publication->oid);
+
+		funcctx->user_fctx = (void *) sequences;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	/* stuff done on every call of the function */
+	funcctx = SRF_PERCALL_SETUP();
+	sequences = (List *) funcctx->user_fctx;
+
+	if (funcctx->call_cntr < list_length(sequences))
+	{
+		Oid			relid = list_nth_oid(sequences, funcctx->call_cntr);
+
+		SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 999d984068..16f050023a 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -372,6 +372,16 @@ CREATE VIEW pg_publication_tables AS
          pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
     WHERE C.oid = GPT.relid;
 
+CREATE VIEW pg_publication_sequences AS
+    SELECT
+        P.pubname AS pubname,
+        N.nspname AS schemaname,
+        C.relname AS sequencename
+    FROM pg_publication P,
+         LATERAL pg_get_publication_sequences(P.pubname) GPT,
+         pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
+    WHERE C.oid = GPT.relid;
+
 CREATE VIEW pg_locks AS
     SELECT * FROM pg_lock_status() AS L;
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 95c253c8e0..5e5c5feab4 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -16,6 +16,7 @@
 
 #include "access/genam.h"
 #include "access/htup_details.h"
+#include "access/relation.h"
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
@@ -54,6 +55,12 @@ static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 								 AlterPublicationStmt *stmt);
 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
 
+static List *OpenSequenceList(List *sequences);
+static void CloseSequenceList(List *rels);
+static void PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+								 AlterPublicationStmt *stmt);
+static void PublicationDropSequences(Oid pubid, List *rels, bool missing_ok);
+
 static void
 parse_publication_options(List *options,
 						  bool *publish_given,
@@ -71,6 +78,7 @@ parse_publication_options(List *options,
 	pubactions->pubupdate = true;
 	pubactions->pubdelete = true;
 	pubactions->pubtruncate = true;
+	pubactions->pubsequence = true;
 	*publish_via_partition_root = false;
 
 	/* Parse options */
@@ -97,6 +105,7 @@ parse_publication_options(List *options,
 			pubactions->pubupdate = false;
 			pubactions->pubdelete = false;
 			pubactions->pubtruncate = false;
+			pubactions->pubsequence = false;
 
 			*publish_given = true;
 			publish = defGetString(defel);
@@ -119,6 +128,8 @@ parse_publication_options(List *options,
 					pubactions->pubdelete = true;
 				else if (strcmp(publish_opt, "truncate") == 0)
 					pubactions->pubtruncate = true;
+				else if (strcmp(publish_opt, "sequence") == 0)
+					pubactions->pubsequence = true;
 				else
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
@@ -210,6 +221,8 @@ CreatePublication(CreatePublicationStmt *stmt)
 		BoolGetDatum(pubactions.pubdelete);
 	values[Anum_pg_publication_pubtruncate - 1] =
 		BoolGetDatum(pubactions.pubtruncate);
+	values[Anum_pg_publication_pubsequence - 1] =
+		BoolGetDatum(pubactions.pubsequence);
 	values[Anum_pg_publication_pubviaroot - 1] =
 		BoolGetDatum(publish_via_partition_root);
 
@@ -374,9 +387,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	rels = OpenTableList(stmt->tables);
 
-	if (stmt->tableAction == DEFELEM_ADD)
+	if (stmt->action == DEFELEM_ADD)
 		PublicationAddTables(pubid, rels, false, stmt);
-	else if (stmt->tableAction == DEFELEM_DROP)
+	else if (stmt->action == DEFELEM_DROP)
 		PublicationDropTables(pubid, rels, false);
 	else						/* DEFELEM_SET */
 	{
@@ -427,6 +440,82 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 	CloseTableList(rels);
 }
 
+/*
+ * Add or remove sequence to/from publication.
+ */
+static void
+AlterPublicationSequences(AlterPublicationStmt *stmt, Relation rel,
+						  HeapTuple tup)
+{
+	List	   *rels = NIL;
+	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+	Oid			pubid = pubform->oid;
+
+	/* Check that user is allowed to manipulate the publication sequences. */
+	if (pubform->puballsequences)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
+						NameStr(pubform->pubname)),
+				 errdetail("Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications.")));
+
+	Assert(list_length(stmt->sequences) > 0);
+
+	rels = OpenSequenceList(stmt->sequences);
+
+	if (stmt->action == DEFELEM_ADD)
+		PublicationAddSequences(pubid, rels, false, stmt);
+	else if (stmt->action == DEFELEM_DROP)
+		PublicationDropSequences(pubid, rels, false);
+	else						/* DEFELEM_SET */
+	{
+		List	   *oldrelids =
+			GetPublicationSequenceRelations(pubid);
+		List	   *delrels = NIL;
+		ListCell   *oldlc;
+
+		/* Calculate which relations to drop. */
+		foreach(oldlc, oldrelids)
+		{
+			Oid			oldrelid = lfirst_oid(oldlc);
+			ListCell   *newlc;
+			bool		found = false;
+
+			foreach(newlc, rels)
+			{
+				Relation	newrel = (Relation) lfirst(newlc);
+
+				if (RelationGetRelid(newrel) == oldrelid)
+				{
+					found = true;
+					break;
+				}
+			}
+
+			if (!found)
+			{
+				Relation	oldrel = relation_open(oldrelid,
+												ShareUpdateExclusiveLock);
+
+				delrels = lappend(delrels, oldrel);
+			}
+		}
+
+		/* And drop them. */
+		PublicationDropSequences(pubid, delrels, true);
+
+		/*
+		 * Don't bother calculating the difference for adding, we'll catch and
+		 * skip existing ones when doing catalog update.
+		 */
+		PublicationAddSequences(pubid, rels, true, stmt);
+
+		CloseSequenceList(delrels);
+	}
+
+	CloseSequenceList(rels);
+}
+
 /*
  * Alter the existing publication.
  *
@@ -460,8 +549,12 @@ AlterPublication(AlterPublicationStmt *stmt)
 
 	if (stmt->options)
 		AlterPublicationOptions(stmt, rel, tup);
-	else
+	else if (stmt->tables)
 		AlterPublicationTables(stmt, rel, tup);
+	else if (stmt->sequences)
+		AlterPublicationSequences(stmt, rel, tup);
+	else
+		Assert(false);
 
 	/* Cleanup. */
 	heap_freetuple(tup);
@@ -666,6 +759,139 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 	}
 }
 
+/*
+ * Open relations specified by a RangeVar list.
+ * The returned sequences are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
+ */
+static List *
+OpenSequenceList(List *sequences)
+{
+	List	   *relids = NIL;
+	List	   *rels = NIL;
+	ListCell   *lc;
+
+	/*
+	 * Open, share-lock, and check all the explicitly-specified relations
+	 */
+	foreach(lc, sequences)
+	{
+		RangeVar   *rv = castNode(RangeVar, lfirst(lc));
+		Relation	rel;
+		Oid			myrelid;
+
+		/* Allow query cancel in case this takes a long time */
+		CHECK_FOR_INTERRUPTS();
+
+		rel = relation_openrv(rv, ShareUpdateExclusiveLock);
+		myrelid = RelationGetRelid(rel);
+
+		/*
+		 * Filter out duplicates if user specifies "foo, foo".
+		 *
+		 * Note that this algorithm is known to not be very efficient (O(N^2))
+		 * but given that it only works on list of sequences given to us by user
+		 * it's deemed acceptable.
+		 */
+		if (list_member_oid(relids, myrelid))
+		{
+			relation_close(rel, ShareUpdateExclusiveLock);
+			continue;
+		}
+
+		rels = lappend(rels, rel);
+		relids = lappend_oid(relids, myrelid);
+	}
+
+	list_free(relids);
+
+	return rels;
+}
+
+/*
+ * Close all relations in the list.
+ */
+static void
+CloseSequenceList(List *rels)
+{
+	ListCell   *lc;
+
+	foreach(lc, rels)
+	{
+		Relation	rel = (Relation) lfirst(lc);
+
+		relation_close(rel, NoLock);
+	}
+}
+
+/*
+ * Add listed sequences to the publication.
+ */
+static void
+PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+						AlterPublicationStmt *stmt)
+{
+	ListCell   *lc;
+
+	Assert(!stmt || !stmt->for_all_sequences);
+
+	foreach(lc, rels)
+	{
+		Relation	rel = (Relation) lfirst(lc);
+		ObjectAddress obj;
+
+		/* Must be owner of the sequence or superuser. */
+		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
+						   RelationGetRelationName(rel));
+
+		obj = publication_add_relation(pubid, rel, if_not_exists);
+		if (stmt)
+		{
+			EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+											 (Node *) stmt);
+
+			InvokeObjectPostCreateHook(PublicationRelRelationId,
+									   obj.objectId, 0);
+		}
+	}
+}
+
+/*
+ * Remove listed sequences from the publication.
+ */
+static void
+PublicationDropSequences(Oid pubid, List *rels, bool missing_ok)
+{
+	ObjectAddress obj;
+	ListCell   *lc;
+	Oid			prid;
+
+	foreach(lc, rels)
+	{
+		Relation	rel = (Relation) lfirst(lc);
+		Oid			relid = RelationGetRelid(rel);
+
+		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
+							   ObjectIdGetDatum(relid),
+							   ObjectIdGetDatum(pubid));
+		if (!OidIsValid(prid))
+		{
+			if (missing_ok)
+				continue;
+
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("relation \"%s\" is not part of the publication",
+							RelationGetRelationName(rel))));
+		}
+
+		ObjectAddressSet(obj, PublicationRelRelationId, prid);
+		performDeletion(&obj, DROP_CASCADE, 0);
+	}
+}
+
+
 /*
  * Internal workhorse for changing a publication owner
  */
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 0415df9ccb..6898a45365 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -94,7 +94,7 @@ static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */
  */
 static SeqTableData *last_used_seq = NULL;
 
-static void fill_seq_with_data(Relation rel, HeapTuple tuple);
+static void fill_seq_with_data(Relation rel, HeapTuple tuple, bool create);
 static Relation lock_and_open_sequence(SeqTable seq);
 static void create_seq_hashtable(void);
 static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
@@ -222,7 +222,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
 
 	/* now initialize the sequence's data */
 	tuple = heap_form_tuple(tupDesc, value, null);
-	fill_seq_with_data(rel, tuple);
+	fill_seq_with_data(rel, tuple, true);
 
 	/* process OWNED BY if given */
 	if (owned_by)
@@ -327,7 +327,86 @@ ResetSequence(Oid seq_relid)
 	/*
 	 * Insert the modified tuple into the new storage file.
 	 */
-	fill_seq_with_data(seq_rel, tuple);
+	fill_seq_with_data(seq_rel, tuple, true);
+
+	/* Clear local cache so that we don't think we have cached numbers */
+	/* Note that we do not change the currval() state */
+	elm->cached = elm->last;
+
+	relation_close(seq_rel, NoLock);
+}
+
+/*
+ * Reset a sequence to its initial value.
+ *
+ * The change is made transactionally, so that on failure of the current
+ * transaction, the sequence will be restored to its previous state.
+ * We do that by creating a whole new relfilenode for the sequence; so this
+ * works much like the rewriting forms of ALTER TABLE.
+ *
+ * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
+ * which must not be released until end of transaction.  Caller is also
+ * responsible for permissions checking.
+ */
+void
+ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
+{
+	Relation	seq_rel;
+	SeqTable	elm;
+	Form_pg_sequence_data seq;
+	Buffer		buf;
+	HeapTupleData seqdatatuple;
+	HeapTuple	tuple;
+
+	/*
+	 * Read the old sequence.  This does a bit more work than really
+	 * necessary, but it's simple, and we do want to double-check that it's
+	 * indeed a sequence.
+	 */
+	init_sequence(seq_relid, &elm, &seq_rel);
+	(void) read_seq_tuple(seq_rel, &buf, &seqdatatuple);
+
+	/*
+	 * Copy the existing sequence tuple.
+	 */
+	tuple = heap_copytuple(&seqdatatuple);
+
+	/* Now we're done with the old page */
+	UnlockReleaseBuffer(buf);
+
+	/*
+	 * Modify the copied tuple to execute the restart (compare the RESTART
+	 * action in AlterSequence)
+	 */
+	seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+	seq->last_value = last_value;
+	seq->is_called = is_called;
+	seq->log_cnt = log_cnt;
+
+	/*
+	 * Create a new storage file for the sequence.
+	 */
+	RelationSetNewRelfilenode(seq_rel, seq_rel->rd_rel->relpersistence);
+
+	/*
+	 * Ensure sequence's relfrozenxid is at 0, since it won't contain any
+	 * unfrozen XIDs.  Same with relminmxid, since a sequence will never
+	 * contain multixacts.
+	 */
+	Assert(seq_rel->rd_rel->relfrozenxid == InvalidTransactionId);
+	Assert(seq_rel->rd_rel->relminmxid == InvalidMultiXactId);
+
+	/*
+	 * Insert the modified tuple into the new storage file.
+	 *
+	 * XXX Maybe this should also use created=true, just like the other places
+	 * calling fill_seq_with_data. That's probably needed for correct cascading
+	 * replication.
+	 *
+	 * XXX That'd mean all fill_seq_with_data callers use created=true, making
+	 * the parameter unnecessary.
+	 */
+	fill_seq_with_data(seq_rel, tuple, false);
 
 	/* Clear local cache so that we don't think we have cached numbers */
 	/* Note that we do not change the currval() state */
@@ -340,7 +419,7 @@ ResetSequence(Oid seq_relid)
  * Initialize a sequence's relation with the specified tuple as content
  */
 static void
-fill_seq_with_data(Relation rel, HeapTuple tuple)
+fill_seq_with_data(Relation rel, HeapTuple tuple, bool create)
 {
 	Buffer		buf;
 	Page		page;
@@ -378,8 +457,21 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 
 	/* check the comment above nextval_internal()'s equivalent call. */
 	if (RelationNeedsWAL(rel))
+	{
 		GetTopTransactionId();
 
+		/*
+		 * Ensure we have a proper XID, which will be included in the XLOG
+		 * record by XLogRecordAssemble. Otherwise the first nextval() in
+		 * a subxact (without any preceding changes) would get XID 0,
+		 * and it'd be impossible to decide which top xact it belongs to.
+		 * It'd also trigger assert in DecodeSequence.
+		 *
+		 * XXX Not sure if this is the best solution.
+		 */
+		GetCurrentTransactionId();
+	}
+
 	START_CRIT_SECTION();
 
 	MarkBufferDirty(buf);
@@ -399,6 +491,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.node = rel->rd_node;
+		xlrec.created = create;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -502,7 +595,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
 		/*
 		 * Insert the modified tuple into the new storage file.
 		 */
-		fill_seq_with_data(seqrel, newdatatuple);
+		fill_seq_with_data(seqrel, newdatatuple, true);
 	}
 
 	/* process OWNED BY if given */
@@ -766,8 +859,21 @@ nextval_internal(Oid relid, bool check_permissions)
 	 * (Have to do that here, so we're outside the critical section)
 	 */
 	if (logit && RelationNeedsWAL(seqrel))
+	{
 		GetTopTransactionId();
 
+		/*
+		 * Ensure we have a proper XID, which will be included in the XLOG
+		 * record by XLogRecordAssemble. Otherwise the first nextval() in
+		 * a subxact (without any preceding changes) would get XID 0,
+		 * and it'd be impossible to decide which top xact it belongs to.
+		 * It'd also trigger assert in DecodeSequence.
+		 *
+		 * XXX Not sure if this is the best solution.
+		 */
+		GetCurrentTransactionId();
+	}
+
 	/* ready to change the on-disk (or really, in-buffer) tuple */
 	START_CRIT_SECTION();
 
@@ -803,6 +909,7 @@ nextval_internal(Oid relid, bool check_permissions)
 		seq->log_cnt = 0;
 
 		xlrec.node = seqrel->rd_node;
+		xlrec.created = false;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -977,8 +1084,21 @@ do_setval(Oid relid, int64 next, bool iscalled)
 
 	/* check the comment above nextval_internal()'s equivalent call. */
 	if (RelationNeedsWAL(seqrel))
+	{
 		GetTopTransactionId();
 
+		/*
+		 * Ensure we have a proper XID, which will be included in the XLOG
+		 * record by XLogRecordAssemble. Otherwise the first nextval() in
+		 * a subxact (without any preceding changes) would get XID 0,
+		 * and it'd be impossible to decide which top xact it belongs to.
+		 * It'd also trigger an assert in DecodeSequence.
+		 *
+		 * XXX Not sure if this is the best solution.
+		 */
+		GetCurrentTransactionId();
+	}
+
 	/* ready to change the on-disk (or really, in-buffer) tuple */
 	START_CRIT_SECTION();
 
@@ -999,6 +1119,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.node = seqrel->rd_node;
+		xlrec.created = false;
+
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8aa6de1785..2112be663b 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -47,6 +47,7 @@
 #include "utils/syscache.h"
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -461,6 +462,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		char	   *err;
 		WalReceiverConn *wrconn;
 		List	   *tables;
+		List	   *sequences;
 		ListCell   *lc;
 		char		table_state;
 
@@ -498,6 +500,26 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 										InvalidXLogRecPtr);
 			}
 
+			/*
+			 * Get the sequence list from publisher and build local sequence
+			 * status info.
+			 */
+			sequences = fetch_sequence_list(wrconn, publications);
+			foreach(lc, sequences)
+			{
+				RangeVar   *rv = (RangeVar *) lfirst(lc);
+				Oid			relid;
+
+				relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+				/* Check for supported relkind. */
+				CheckSubscriptionRelkind(get_rel_relkind(relid),
+										 rv->schemaname, rv->relname);
+
+				AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
+			}
+
 			/*
 			 * If requested, create permanent slot for the subscription. We
 			 * won't use the initial snapshot for anything, so no need to
@@ -644,6 +666,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		{
 			Oid			relid = subrel_local_oids[off];
 
+			/* XXX ignore sequences - maybe do this in GetSubscriptionRelations? */
+			if (get_rel_relkind(relid) == RELKIND_SEQUENCE)
+				continue;
+
 			if (!bsearch(&relid, pubrel_local_oids,
 						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
 			{
@@ -735,6 +761,185 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
 			}
 		}
+
+		/*
+		 * XXX now do the same thing for sequences, maybe before the preceding
+		 * block, or earlier?
+		 */
+
+		/* Get the sequence list from the publisher. */
+		pubrel_names = fetch_sequence_list(wrconn, sub->publications);
+
+		/* Get the list of locally known relations. */
+		subrel_states = GetSubscriptionRelations(sub->oid);
+
+		/*
+		 * Build qsorted array of local table oids for faster lookup. This can
+		 * potentially contain all tables in the database so speed of lookup
+		 * is important.
+		 */
+		subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+		off = 0;
+		foreach(lc, subrel_states)
+		{
+			SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+			subrel_local_oids[off++] = relstate->relid;
+		}
+		qsort(subrel_local_oids, list_length(subrel_states),
+			  sizeof(Oid), oid_cmp);
+
+		/*
+		 * Rels that we want to remove from subscription and drop any slots
+		 * and origins corresponding to them.
+		 */
+		sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+
+		/*
+		 * Walk over the remote sequences and try to match them to locally
+		 * known sequences. If a sequence is not known locally, create a new
+		 * state for it.
+		 *
+		 * Also builds an array of local oids of remote sequences.
+		 */
+		off = 0;
+		pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+		foreach(lc, pubrel_names)
+		{
+			RangeVar   *rv = (RangeVar *) lfirst(lc);
+			Oid			relid;
+
+			relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+			/* Check for supported relkind. */
+			CheckSubscriptionRelkind(get_rel_relkind(relid),
+									 rv->schemaname, rv->relname);
+
+			pubrel_local_oids[off++] = relid;
+
+			if (!bsearch(&relid, subrel_local_oids,
+						 list_length(subrel_states), sizeof(Oid), oid_cmp))
+			{
+				AddSubscriptionRelState(sub->oid, relid,
+										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+										InvalidXLogRecPtr);
+				ereport(DEBUG1,
+						(errmsg_internal("sequence \"%s.%s\" added to subscription \"%s\"",
+										 rv->schemaname, rv->relname, sub->name)));
+			}
+		}
+
+		/*
+		 * Next, remove state for sequences we should not care about anymore,
+		 * using the data we collected above.
+		 */
+		qsort(pubrel_local_oids, list_length(pubrel_names),
+			  sizeof(Oid), oid_cmp);
+
+		remove_rel_len = 0;
+		for (off = 0; off < list_length(subrel_states); off++)
+		{
+			Oid			relid = subrel_local_oids[off];
+
+			/* XXX ignore non-sequences - maybe do this in GetSubscriptionRelations? */
+			if (get_rel_relkind(relid) != RELKIND_SEQUENCE)
+				continue;
+
+			if (!bsearch(&relid, pubrel_local_oids,
+						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+			{
+				char		state;
+				XLogRecPtr	statelsn;
+
+				/*
+				 * Lock pg_subscription_rel with AccessExclusiveLock to
+				 * prevent any race conditions with the apply worker
+				 * re-launching workers at the same time this code is trying
+				 * to remove those tables.
+				 *
+				 * Even if new worker for this particular rel is restarted it
+				 * won't be able to make any progress as we hold exclusive
+				 * lock on subscription_rel till the transaction end. It will
+				 * simply exit as there is no corresponding rel entry.
+				 *
+				 * This locking also ensures that the state of rels won't
+				 * change till we are done with this refresh operation.
+				 */
+				if (!rel)
+					rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+				/* Last known rel state. */
+				state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+
+				sub_remove_rels[remove_rel_len].relid = relid;
+				sub_remove_rels[remove_rel_len++].state = state;
+
+				elog(WARNING, "B: remove rel %d", relid);
+
+				RemoveSubscriptionRel(sub->oid, relid);
+
+				logicalrep_worker_stop(sub->oid, relid);
+
+				/*
+				 * For READY state, we would have already dropped the
+				 * tablesync origin.
+				 */
+				if (state != SUBREL_STATE_READY)
+				{
+					char		originname[NAMEDATALEN];
+
+					/*
+					 * Drop the tablesync's origin tracking if exists.
+					 *
+					 * It is possible that the origin is not yet created for
+					 * tablesync worker, this can happen for the states before
+					 * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+					 * concurrently try to drop the origin and by this time
+					 * the origin might be already removed. For these reasons,
+					 * passing missing_ok = true.
+					 */
+					ReplicationOriginNameForTablesync(sub->oid, relid, originname,
+													  sizeof(originname));
+					replorigin_drop_by_name(originname, true, false);
+				}
+
+				ereport(DEBUG1,
+						(errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
+										 get_namespace_name(get_rel_namespace(relid)),
+										 get_rel_name(relid),
+										 sub->name)));
+			}
+		}
+
+		/*
+		 * Drop the tablesync slots associated with removed tables. This has
+		 * to be at the end because otherwise if there is an error while doing
+		 * the database operations we won't be able to rollback dropped slots.
+		 */
+		for (off = 0; off < remove_rel_len; off++)
+		{
+			if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
+				sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+			{
+				char		syncslotname[NAMEDATALEN] = {0};
+
+				/*
+				 * For READY/SYNCDONE states we know the tablesync slot has
+				 * already been dropped by the tablesync worker.
+				 *
+				 * For other states, there is no certainty, maybe the slot
+				 * does not exist yet. Also, if we fail after removing some of
+				 * the slots, next time, it will again try to drop already
+				 * dropped slots and fail. For these reasons, we allow
+				 * missing_ok = true for the drop.
+				 */
+				ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
+												syncslotname, sizeof(syncslotname));
+				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+			}
+		}
+
 	}
 	PG_FINALLY();
 	{
@@ -1534,6 +1739,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	return tablelist;
 }
 
+/*
+ * Get the list of sequences which belong to specified publications on the
+ * publisher connection.
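+ *
+ * For example, for publications p1 and p2 this runs a query like
+ *
+ *   SELECT DISTINCT s.schemaname, s.tablename
+ *     FROM pg_catalog.pg_publication_sequences s
+ *    WHERE s.pubname IN ('p1', 'p2')
+ *
+ * over the walreceiver connection.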
+ */
+static List *
+fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	ListCell   *lc;
+	bool		first;
+	List	   *tablelist = NIL;
+
+	Assert(list_length(publications) > 0);
+
+	initStringInfo(&cmd);
+	appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.tablename\n"
+						   "  FROM pg_catalog.pg_publication_sequences s\n"
+						   " WHERE s.pubname IN (");
+	first = true;
+	foreach(lc, publications)
+	{
+		char	   *pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(&cmd, ", ");
+
+		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+	}
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not receive list of replicated sequences from the publisher: %s",
+						res->err)));
+
+	/* Process sequences. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *nspname;
+		char	   *relname;
+		bool		isnull;
+		RangeVar   *rv;
+
+		nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		rv = makeRangeVar(nspname, relname, -1);
+		tablelist = lappend(tablelist, rv);
+
+		ExecClearTuple(slot);
+	}
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+
+	return tablelist;
+}
+
 /*
  * This is to report the connection failure while dropping replication slots.
  * Here, we report the WARNING for all tablesync slots so that user can drop
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 1e285e0349..a1c7880be6 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -619,11 +619,11 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
 				 errdetail("\"%s.%s\" is a foreign table.",
 						   nspname, relname)));
 
-	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
+		relkind != RELKIND_SEQUENCE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
 						nspname, relname),
-				 errdetail("\"%s.%s\" is not a table.",
+				 errdetail("\"%s.%s\" is not a table or a sequence.",
 						   nspname, relname)));
 }
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 621f7ce068..b98c1b2f6e 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4831,8 +4831,10 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from)
 	COPY_STRING_FIELD(pubname);
 	COPY_NODE_FIELD(options);
 	COPY_NODE_FIELD(tables);
+	COPY_NODE_FIELD(sequences);
 	COPY_SCALAR_FIELD(for_all_tables);
-	COPY_SCALAR_FIELD(tableAction);
+	COPY_SCALAR_FIELD(for_all_sequences);
+	COPY_SCALAR_FIELD(action);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3033c1934c..b72a5831e3 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2300,8 +2300,10 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a,
 	COMPARE_STRING_FIELD(pubname);
 	COMPARE_NODE_FIELD(options);
 	COMPARE_NODE_FIELD(tables);
+	COMPARE_NODE_FIELD(sequences);
 	COMPARE_SCALAR_FIELD(for_all_tables);
-	COMPARE_SCALAR_FIELD(tableAction);
+	COMPARE_SCALAR_FIELD(for_all_sequences);
+	COMPARE_SCALAR_FIELD(action);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 9ee90e3f13..880883e3b8 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -434,6 +434,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
 %type <node>	opt_publication_for_tables publication_for_tables
+/* %type <node>	opt_publication_for_sequences publication_for_sequences */
 
 %type <list>	opt_fdw_options fdw_options
 %type <defelt>	fdw_option
@@ -9612,6 +9613,7 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
  *****************************************************************************/
 
 CreatePublicationStmt:
+			/* CREATE PUBLICATION name opt_publication_for_tables opt_publication_for_sequences opt_definition */
 			CREATE PUBLICATION name opt_publication_for_tables opt_definition
 				{
 					CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
@@ -9646,6 +9648,12 @@ publication_for_tables:
 				}
 		;
 
+/*
+ * FIXME
+ *
+ * opt_publication_for_sequences and publication_for_sequences should be
+ * added as sequence counterparts of publication_for_tables above.
+ */
 
 /*****************************************************************************
  *
@@ -9657,6 +9665,12 @@ publication_for_tables:
  *
  * ALTER PUBLICATION name SET TABLE table [, table2]
  *
+ * ALTER PUBLICATION name ADD SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name DROP SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name SET SEQUENCE sequence [, sequence2]
+ *
  *****************************************************************************/
 
 AlterPublicationStmt:
@@ -9672,7 +9686,7 @@ AlterPublicationStmt:
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
 					n->tables = $6;
-					n->tableAction = DEFELEM_ADD;
+					n->action = DEFELEM_ADD;
 					$$ = (Node *)n;
 				}
 			| ALTER PUBLICATION name SET TABLE relation_expr_list
@@ -9680,7 +9694,7 @@ AlterPublicationStmt:
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
 					n->tables = $6;
-					n->tableAction = DEFELEM_SET;
+					n->action = DEFELEM_SET;
 					$$ = (Node *)n;
 				}
 			| ALTER PUBLICATION name DROP TABLE relation_expr_list
@@ -9688,7 +9702,31 @@ AlterPublicationStmt:
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
 					n->tables = $6;
-					n->tableAction = DEFELEM_DROP;
+					n->action = DEFELEM_DROP;
+					$$ = (Node *)n;
+				}
+			| ALTER PUBLICATION name ADD_P SEQUENCE relation_expr_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->sequences = $6;
+					n->action = DEFELEM_ADD;
+					$$ = (Node *)n;
+				}
+			| ALTER PUBLICATION name SET SEQUENCE relation_expr_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->sequences = $6;
+					n->action = DEFELEM_SET;
+					$$ = (Node *)n;
+				}
+			| ALTER PUBLICATION name DROP SEQUENCE relation_expr_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->sequences = $6;
+					n->action = DEFELEM_DROP;
 					$$ = (Node *)n;
 				}
 		;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 70670169ac..f6296941d9 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,6 +42,7 @@
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
+#include "commands/sequence.h"
 
 typedef struct XLogRecordBuffer
 {
@@ -74,10 +75,11 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						bool two_phase);
 static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						  xl_xact_parsed_prepare *parsed);
-
+static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
 
 /* helper functions for decoding transactions */
 static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -158,6 +160,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			DecodeLogicalMsgOp(ctx, &buf);
 			break;
 
+		case RM_SEQ_ID:
+			DecodeSequence(ctx, &buf);
+			break;
+
 			/*
 			 * Rmgrs irrelevant for logical decoding; they describe stuff not
 			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -173,7 +179,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_HASH_ID:
 		case RM_GIN_ID:
 		case RM_GIST_ID:
-		case RM_SEQ_ID:
 		case RM_SPGIST_ID:
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
@@ -1315,3 +1320,125 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
 			ctx->fast_forward || FilterByOrigin(ctx, origin_id));
 }
+
+/*
+ * Decode Sequence Tuple
+ */
+static void
+DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+	int			datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
+
+	Assert(datalen >= 0);
+
+	tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	tuple->tuple.t_tableOid = InvalidOid;
+
+	memcpy(((char *) tuple->tuple.t_data),
+		   data + sizeof(xl_seq_rec),
+		   SizeofHeapTupleHeader);
+
+	memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
+		   data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
+		   datalen);
+}
+
+/*
+ * Handle sequence decode
+ *
+ * Decoding sequences is a bit tricky, because while most sequence actions
+ * are non-transactional (not subject to rollback), some need to be handled
+ * as transactional.
+ *
+ * By default, a sequence increment is non-transactional - we must not queue
+ * it in the transaction like other changes, because the transaction might
+ * get rolled back and we'd discard the increment. The downstream would not
+ * be notified about the increment, which is wrong.
+ *
+ * On the other hand, the sequence may be created in a transaction. In this
+ * case we *should* queue the change like other changes in the transaction,
+ * because we don't want to send increments for an unknown sequence to the
+ * plugin - it might get confused about which sequence they relate to, etc.
+ */
+static void
+DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	ReorderBufferTupleBuf *tuplebuf;
+	RelFileNode target_node;
+	XLogReaderState *r = buf->record;
+	char	   *tupledata = NULL;
+	Size		tuplelen;
+	Size		datalen = 0;
+	TransactionId xid = XLogRecGetXid(r);
+	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+	xl_seq_rec *xlrec;
+	Snapshot	snapshot;
+	RepOriginId origin_id = XLogRecGetOrigin(r);
+	bool		transactional;
+
+	/* only decode changes flagged with XLOG_SEQ_LOG */
+	if (info != XLOG_SEQ_LOG)
+		elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
+
+	/*
+	 * If we don't have a snapshot or we are just fast-forwarding, there is
+	 * no point in decoding sequence changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	tupledata = XLogRecGetData(r);
+	datalen = XLogRecGetDataLen(r);
+	tuplelen = datalen - SizeofHeapTupleHeader - sizeof(xl_seq_rec);
+
+	/* extract the WAL record, with "created" flag */
+	xlrec = (xl_seq_rec *) XLogRecGetData(r);
+
+	/* XXX how could we have sequence change without data? */
+	if (!datalen || !tupledata)
+		return;
+
+	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+	DecodeSeqTuple(tupledata, datalen, tuplebuf);
+
+	/*
+	 * Should we handle the sequence increment as transactional or not?
+	 *
+	 * If the sequence was created in a still-running transaction, treat
+	 * it as transactional and queue the increments. Otherwise it needs
+	 * to be treated as non-transactional, in which case we send it to
+	 * the plugin right away.
+	 */
+	transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
+														 target_node,
+														 xlrec->created);
+
+	/* Skip the change if already processed (per the snapshot). */
+	if (transactional &&
+		!SnapBuildProcessChange(builder, xid, buf->origptr))
+		return;
+	else if (!transactional &&
+			 (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+			  SnapBuildXactNeedsSkip(builder, buf->origptr)))
+		return;
+
+	/* Queue the increment (or send immediately if not transactional). */
+	snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+	ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
+							   origin_id, target_node, transactional,
+							   xlrec->created, tuplebuf);
+}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ffc6160e9f..66a4a4603a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							   XLogRecPtr sequence_lsn, Relation rel,
+							   bool transactional, bool created,
+							   int64 last_value, int64 log_cnt, bool is_called);
 
 /* streaming callbacks */
 static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -217,6 +221,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->sequence = sequence_cb_wrapper;
 
 	/*
 	 * To support streaming, we require start/stop/abort/commit/change
@@ -1179,6 +1184,43 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				    XLogRecPtr sequence_lsn, Relation rel,
+				    bool transactional, bool created,
+				    int64 last_value, int64 log_cnt, bool is_called)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	if (ctx->callbacks.sequence_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "sequence";
+	state.report_location = sequence_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = sequence_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+							   created, last_value, log_cnt, is_called);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						XLogRecPtr first_lsn)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1cf59e0fb0..f9e928f906 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -389,6 +389,58 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 	pq_sendbytes(out, message, sz);
 }
 
+/*
+ * Write SEQUENCE to stream
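+ *
+ * The message layout (after the LOGICAL_REP_MSG_SEQUENCE byte) is: an
+ * optional xid (int32, only sent when streaming a transaction), flags
+ * (int8, currently unused), lsn (int64), namespace and sequence name
+ * (strings), followed by the transactional and created flags (int8),
+ * last_value (int64), log_cnt (int64) and is_called (int8).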
+ */
+void
+logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
+						  XLogRecPtr lsn, bool transactional, bool created,
+						  int64 last_value, int64 log_cnt, bool is_called)
+{
+	uint8		flags = 0;
+	char	   *relname;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
+
+	/* transaction ID (if not valid, we're not streaming) */
+	if (TransactionIdIsValid(xid))
+		pq_sendint32(out, xid);
+
+	pq_sendint8(out, flags);
+	pq_sendint64(out, lsn);
+
+	logicalrep_write_namespace(out, RelationGetNamespace(rel));
+	relname = RelationGetRelationName(rel);
+	pq_sendstring(out, relname);
+
+	pq_sendint8(out, transactional);
+	pq_sendint8(out, created);
+	pq_sendint64(out, last_value);
+	pq_sendint64(out, log_cnt);
+	pq_sendint8(out, is_called);
+}
+
+/*
+ * Read SEQUENCE from the stream.
+ */
+void
+logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
+{
+	/* XXX skipping flags and lsn */
+	pq_getmsgint(in, 1);
+	pq_getmsgint64(in);
+
+	/* Read relation name from stream */
+	seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
+	seqdata->seqname = pstrdup(pq_getmsgstring(in));
+
+	seqdata->transactional = pq_getmsgint(in, 1);
+	seqdata->created = pq_getmsgint(in, 1);
+	seqdata->last_value = pq_getmsgint64(in);
+	seqdata->log_cnt = pq_getmsgint64(in);
+	seqdata->is_called = pq_getmsgint(in, 1);
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 2d9e1279bb..94b0290d78 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -116,6 +116,13 @@ typedef struct ReorderBufferTXNByIdEnt
 	ReorderBufferTXN *txn;
 } ReorderBufferTXNByIdEnt;
 
+/* entry for hash table we use to track sequences created in running xacts */
+typedef struct ReorderBufferSequenceEnt
+{
+	RelFileNode		rnode;
+	TransactionId	xid;
+} ReorderBufferSequenceEnt;
+
 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
 typedef struct ReorderBufferTupleCidKey
 {
@@ -337,6 +344,14 @@ ReorderBufferAllocate(void)
 	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
 								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
+	/* hash table of sequences, mapping relfilenode to XID of transaction */
+	hash_ctl.keysize = sizeof(RelFileNode);
+	hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
+	hash_ctl.hcxt = buffer->context;
+
+	buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
+								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
 	buffer->by_txn_last_xid = InvalidTransactionId;
 	buffer->by_txn_last_txn = NULL;
 
@@ -523,6 +538,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			if (change->data.sequence.tuple)
+			{
+				ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
+				change->data.sequence.tuple = NULL;
+			}
+			break;
 	}
 
 	pfree(change);
@@ -850,6 +872,212 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
+/*
+ * Treat the sequence increment as transactional?
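+ *
+ * Returns true if the WAL record has the "created" flag set (the increment
+ * comes from creating a new relfilenode), or if the relfilenode is tracked
+ * in the sequences hash, i.e. was created by a transaction that is still
+ * being decoded.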
+ */
+bool
+ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+									 RelFileNode rnode, bool created)
+{
+	bool	found = false;
+
+	if (created)
+		return true;
+
+	hash_search(rb->sequences,
+				(void *) &rnode,
+				HASH_FIND,
+				&found);
+
+	return found;
+}
+
+
+/*
+ * A transactional sequence increment is queued to be processed upon commit
+ * and a non-transactional increment gets processed immediately.
+ *
+ * A sequence update may be either transactional or non-transactional. When
+ * the sequence was created in a still-running transaction, treat the update
+ * as transactional and queue the change in that transaction. Otherwise treat
+ * it as non-transactional, so that we don't lose the increment in case of
+ * a rollback.
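+ *
+ * Non-transactional increments are sent to the output plugin right away,
+ * from within a short-lived (sub)transaction that exists only to allow
+ * catalog access (looking up the sequence relation) and is always rolled
+ * back afterwards.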
+ */
+void
+ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+						   RelFileNode rnode, bool transactional, bool created,
+						   ReorderBufferTupleBuf *tuplebuf)
+{
+	/*
+	 * Change needs to be handled as transactional, because the sequence was
+	 * created in a transaction that is still running. In that case all the
+	 * changes need to be queued in that transaction, we must not send them
+	 * to the downstream until the transaction commits.
+	 *
+	 * There's a bit of trouble with subtransactions - we can't simply queue
+	 * the change into the current subxact, because that subxact might get
+	 * rolled back and we'd lose the increment. We need to queue it into the
+	 * same (sub)xact that created the sequence, which is why we track the
+	 * XID in the hash table.
+	 */
+	if (transactional)
+	{
+		MemoryContext oldcontext;
+		ReorderBufferChange *change;
+
+		/* lookup sequence by relfilenode */
+		ReorderBufferSequenceEnt   *ent;
+		bool						found;
+
+		/* transactional changes require a transaction */
+		Assert(xid != InvalidTransactionId);
+
+		/* find the entry, creating it first if the sequence was just created */
+		ent = hash_search(rb->sequences,
+						  (void *) &rnode,
+						  created ? HASH_ENTER : HASH_FIND,
+						  &found);
+
+		/*
+		 * If this is the "create" increment, we must not have found any
+		 * pre-existing entry in the hash table (i.e. there must not be
+		 * any conflicting sequence).
+		 */
+		Assert(!(created && found));
+
+		/* But we must have either created or found an existing entry. */
+		Assert(created || found);
+
+		/*
+		 * When creating the sequence, remember the XID of the transaction
+		 * that created it.
+		 */
+		if (created)
+			ent->xid = xid;
+
+		/* XXX Maybe check that we're still in the same top-level xact? */
+
+		/* OK, allocate and queue the change */
+		oldcontext = MemoryContextSwitchTo(rb->context);
+
+		change = ReorderBufferGetChange(rb);
+
+		change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
+		change->origin_id = origin_id;
+
+		memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
+
+		change->data.sequence.created = created;
+		change->data.sequence.tuple = tuplebuf;
+
+		/* add it to the same subxact that created the sequence */
+		ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+	else
+	{
+		/*
+		 * This increment is for a sequence that was not created in any
+		 * running transaction, so we treat it as non-transactional and
+		 * just send it to the output plugin directly.
+		 */
+		ReorderBufferTXN *txn = NULL;
+		volatile Snapshot snapshot_now = snapshot;
+		bool	using_subtxn;
+
+#ifdef USE_ASSERT_CHECKING
+		/* All "creates" have to be handled as transactional. */
+		Assert(!created);
+
+		/* Make sure the sequence is not in the hash table. */
+		{
+			bool	found;
+			hash_search(rb->sequences,
+					    (void *) &rnode,
+						HASH_FIND, &found);
+			Assert(!found);
+		}
+#endif
+
+		if (xid != InvalidTransactionId)
+			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+		/* setup snapshot to allow catalog access */
+		SetupHistoricSnapshot(snapshot_now, NULL);
+
+		/*
+		 * Decoding needs access to syscaches et al., which in turn use
+		 * heavyweight locks and such. Thus we need to have enough state around to
+		 * keep track of those.  The easiest way is to simply use a transaction
+		 * internally.  That also allows us to easily enforce that nothing writes
+		 * to the database by checking for xid assignments.
+		 *
+		 * When we're called via the SQL SRF there's already a transaction
+		 * started, so start an explicit subtransaction there.
+		 */
+		using_subtxn = IsTransactionOrTransactionBlock();
+
+		PG_TRY();
+		{
+			Relation	relation;
+			HeapTuple	tuple;
+			bool		isnull;
+			int64		last_value, log_cnt;
+			bool		is_called;
+			Oid			reloid;
+
+			if (using_subtxn)
+				BeginInternalSubTransaction("sequence");
+			else
+				StartTransactionCommand();
+
+			reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
+
+			if (reloid == InvalidOid)
+				elog(ERROR, "could not map filenode \"%s\" to relation OID",
+					 relpathperm(rnode,
+								 MAIN_FORKNUM));
+
+			relation = RelationIdGetRelation(reloid);
+			tuple = &tuplebuf->tuple;
+
+			/*
+			 * Extract the internal sequence values, describing the state.
+			 *
+			 * XXX Seems a bit strange to access it directly. Maybe there's
+			 * a better / more correct way?
+			 */
+			last_value = DatumGetInt64(heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull));
+			log_cnt = DatumGetInt64(heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull));
+			is_called = DatumGetBool(heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull));
+
+			rb->sequence(rb, txn, lsn, relation, transactional, created,
+						 last_value, log_cnt, is_called);
+
+			RelationClose(relation);
+
+			TeardownHistoricSnapshot(false);
+
+			AbortCurrentTransaction();
+
+			if (using_subtxn)
+				RollbackAndReleaseCurrentSubTransaction();
+		}
+		PG_CATCH();
+		{
+			TeardownHistoricSnapshot(true);
+
+			AbortCurrentTransaction();
+
+			if (using_subtxn)
+				RollbackAndReleaseCurrentSubTransaction();
+
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+	}
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
@@ -1526,6 +1754,31 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 				&found);
 	Assert(found);
 
+	/*
+	 * Remove sequences created in this transaction (if any).
+	 *
+	 * There's no way to search by XID, so we simply do a seqscan of all
+	 * the entries in the hash table. Hopefully there are only a couple
+	 * entries in most cases - people generally don't create many new
+	 * sequences over and over.
+	 */
+	{
+		HASH_SEQ_STATUS scan_status;
+		ReorderBufferSequenceEnt *ent;
+
+		hash_seq_init(&scan_status, rb->sequences);
+		while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
+		{
+			/* skip sequences not from this transaction */
+			if (ent->xid != txn->xid)
+				continue;
+
+			(void) hash_search(rb->sequences,
+						   (void *) &(ent->rnode),
+						   HASH_REMOVE, NULL);
+		}
+	}
+
 	/* remove entries spilled to disk */
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
@@ -1941,6 +2194,39 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					change->data.msg.message);
 }
 
+/*
+ * Helper function for ReorderBufferProcessTXN for applying sequences.
+ */
+static inline void
+ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
+						   Relation relation, ReorderBufferChange *change,
+						   bool streaming)
+{
+	HeapTuple	tuple;
+	bool		isnull;
+	int64		last_value, log_cnt;
+	bool		is_called;
+
+	/* FIXME support streaming */
+	Assert(!streaming);
+
+	tuple = &change->data.sequence.tuple->tuple;
+
+	/*
+	 * Extract the internal sequence values, describing the state.
+	 *
+	 * XXX Seems a bit strange to access it directly. Maybe there's
+	 * a better / more correct way?
+	 */
+	last_value = DatumGetInt64(heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull));
+	log_cnt = DatumGetInt64(heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull));
+	is_called = DatumGetBool(heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull));
+
+	rb->sequence(rb, txn, change->lsn, relation, true,	/* gotta be transactional */
+				 change->data.sequence.created,
+				 last_value, log_cnt, is_called);
+}
+
 /*
  * Function to store the command id and snapshot at the end of the current
  * stream so that we can reuse the same while sending the next stream.
@@ -2357,6 +2643,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
+
+				case REORDER_BUFFER_CHANGE_SEQUENCE:
+					Assert(snapshot_now);
+
+					reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
+												change->data.sequence.relnode.relNode);
+
+					if (reloid == InvalidOid)
+						elog(ERROR, "could not map filenode \"%s\" to relation OID",
+							 relpathperm(change->data.sequence.relnode,
+										 MAIN_FORKNUM));
+
+					relation = RelationIdGetRelation(reloid);
+
+					if (!RelationIsValid(relation))
+						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+							 reloid,
+							 relpathperm(change->data.sequence.relnode,
+										 MAIN_FORKNUM));
+
+					if (RelationIsLogicallyLogged(relation))
+						ReorderBufferApplySequence(rb, txn, relation, change, streaming);
+
+					RelationClose(relation);
+					break;
 			}
 		}
 
@@ -3751,6 +4062,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				memcpy(data, change->data.truncate.relids, size);
 				data += size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			{
+				char	   *data;
+				ReorderBufferTupleBuf *tup;
+				Size		len = 0;
+
+				tup = change->data.sequence.tuple;
+
+				if (tup)
+				{
+					sz += sizeof(HeapTupleData);
+					len = tup->tuple.t_len;
+					sz += len;
+				}
+
+				/* make sure we have enough space */
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				if (len)
+				{
+					memcpy(data, &tup->tuple, sizeof(HeapTupleData));
+					data += sizeof(HeapTupleData);
+
+					memcpy(data, tup->tuple.t_data, len);
+					data += len;
+				}
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4014,6 +4358,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 			{
 				sz += sizeof(Oid) * change->data.truncate.nrelids;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			{
+				ReorderBufferTupleBuf *tup;
+				Size		len = 0;
+
+				tup = change->data.sequence.tuple;
+
+				if (tup)
+				{
+					sz += sizeof(HeapTupleData);
+					len = tup->tuple.t_len;
+					sz += len;
+				}
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4314,6 +4674,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 				break;
 			}
+
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			if (change->data.sequence.tuple)
+			{
+				uint32		tuplelen = ((HeapTuple) data)->t_len;
+
+				change->data.sequence.tuple =
+					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
+
+				/* restore ->tuple */
+				memcpy(&change->data.sequence.tuple->tuple, data,
+					   sizeof(HeapTupleData));
+				data += sizeof(HeapTupleData);
+
+				/* reset t_data pointer into the new tuplebuf */
+				change->data.sequence.tuple->tuple.t_data =
+					ReorderBufferTupleBufData(change->data.sequence.tuple);
+
+				/* restore tuple data itself */
+				memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
+				data += tuplelen;
+			}
+			break;
+
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 67f907cdd9..d1ba2a9645 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -99,6 +99,7 @@
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
+#include "commands/sequence.h"
 #include "miscadmin.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
@@ -353,6 +354,12 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  *
  * If the synchronization position is reached (SYNCDONE), then the table can
  * be marked as READY and is no longer tracked.
+ *
+ * XXX This needs to handle sequences too - after AlterSubscription_refresh
+ * starts caring about sequences, GetSubscriptionNotReadyRelations won't
+ * return just tables, and we'll have to sync them here. Not sure it's worth
+ * creating a new "sync" worker per sequence, maybe we should just sync them
+ * in the current process (it's pretty light-weight).
  */
 static void
 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
@@ -870,6 +877,99 @@ copy_table(Relation rel)
 	logicalrep_rel_close(relmapentry, NoLock);
 }
 
+
+
+/*
+ * Fetch the current state of a sequence (last_value, log_cnt, is_called)
+ * from the publisher.
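+ *
+ * The values are read over the tablesync worker's walreceiver connection,
+ * using a query of the form
+ *
+ *   SELECT last_value, log_cnt, is_called FROM <schema>.<sequence>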
+ */
+static void
+fetch_sequence_data(char *nspname, char *relname,
+					int64 *last_value, int64 *log_cnt, bool *is_called)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[3] = {INT8OID, INT8OID, BOOLOID};
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
+					   "  FROM %s", quote_qualified_identifier(nspname, relname));
+
+	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not receive sequence state from the publisher: %s",
+						res->err)));
+
+	/* Process the sequence. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		bool		isnull;
+
+		*last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		*log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		*is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		ExecClearTuple(slot);
+	}
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+}
+
+/*
+ * Copy existing data of a sequence from publisher.
+ *
+ * Caller is responsible for locking the local relation.
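+ *
+ * The sequence state (last_value, log_cnt, is_called) is read from the
+ * publisher with fetch_sequence_data() and applied locally by calling
+ * ResetSequence2().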
+ */
+static void
+copy_sequence(Relation rel)
+{
+	LogicalRepRelMapEntry *relmapentry;
+	LogicalRepRelation lrel;
+	int64		last_value = 0,
+				log_cnt = 0;
+	bool		is_called = false;
+
+	/* Get the publisher relation info. */
+	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
+							RelationGetRelationName(rel), &lrel);
+
+	/* Put the relation into relmap. */
+	logicalrep_relmap_update(&lrel);
+
+	/* Map the publisher relation to local one. */
+	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
+	Assert(rel == relmapentry->localrel);
+
+	Assert(lrel.relkind == RELKIND_SEQUENCE);
+
+	fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
+
+	elog(WARNING, "sequence %s info last_value %ld log_cnt %ld is_called %d",
+		 quote_qualified_identifier(lrel.nspname, lrel.relname),
+		 last_value, log_cnt, is_called);
+
+	ResetSequence2(RelationGetRelid(rel), last_value, log_cnt, is_called);
+
+	logicalrep_rel_close(relmapentry, NoLock);
+}
+
+
+
+
 /*
  * Determine the tablesync slot name.
  *
@@ -1102,10 +1202,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 						originname)));
 	}
 
-	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
-	PopActiveSnapshot();
+	if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
+	{
+		/* Now do the initial sequence copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_sequence(rel);
+		PopActiveSnapshot();
+	}
+	else
+	{
+		/* Now do the initial data copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_table(rel);
+		PopActiveSnapshot();
+	}
 
 	res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
 	if (res->status != WALRCV_OK_COMMAND)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6ba447ea97..1e85a65890 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -70,6 +70,7 @@
 #include "catalog/pg_tablespace.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
+#include "commands/sequence.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/execPartition.h"
@@ -813,6 +814,36 @@ apply_handle_origin(StringInfo s)
 				 errmsg("ORIGIN message sent out of order")));
 }
 
+/*
+ * Handle SEQUENCE message.
+ */
+static void
+apply_handle_sequence(StringInfo s)
+{
+	LogicalRepSequence	seq;
+	Oid					relid;
+
+	// FIXME
+	// if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
+	//	return;
+	ensure_transaction();
+
+	logicalrep_read_sequence(s, &seq);
+
+	relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
+										  seq.seqname, -1),
+							 RowExclusiveLock, false);
+
+	elog(WARNING, "applying sequence transactional %d created %d last_value %ld log_cnt %ld is_called %d",
+		 seq.transactional, seq.created, seq.last_value, seq.log_cnt, seq.is_called);
+
+	/* apply the sequence change */
+	ResetSequence2(relid, seq.last_value, seq.log_cnt, seq.is_called);
+
+	/* Commit the transaction. */
+	CommitTransactionCommand();
+}
+
 /*
  * Handle STREAM START message.
  */
@@ -2017,6 +2048,10 @@ apply_dispatch(StringInfo s)
 			 */
 			return;
 
+		case LOGICAL_REP_MSG_SEQUENCE:
+			apply_handle_sequence(s);
+			return;
+
 		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
 			return;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index fe12d08a94..37aeb42d22 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -49,6 +49,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							 bool transactional, const char *prefix,
 							 Size sz, const char *message);
+static void pgoutput_sequence(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+							  Relation rel, bool transactional, bool created,
+							  int64 last_value, int64 log_cnt, bool is_called);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
@@ -144,6 +148,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
 	cb->message_cb = pgoutput_message;
+	cb->sequence_cb = pgoutput_sequence;
 	cb->commit_cb = pgoutput_commit_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
@@ -166,11 +171,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
 	bool		messages_option_given = false;
+	bool		sequences_option_given = false;
 	bool		streaming_given = false;
 
 	data->binary = false;
 	data->streaming = false;
 	data->messages = false;
+	data->sequences = true;
 
 	foreach(lc, options)
 	{
@@ -236,6 +243,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->messages = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "sequences") == 0)
+		{
+			if (sequences_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			sequences_option_given = true;
+
+			data->sequences = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
 			if (streaming_given)
@@ -753,6 +770,47 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pgoutput_sequence(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+				  Relation rel, bool transactional, bool created,
+				  int64 last_value, int64 log_cnt, bool is_called)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	TransactionId xid = InvalidTransactionId;
+	RelationSyncEntry *relentry;
+
+	if (!data->sequences)
+		return;
+
+	if (!is_publishable_relation(rel))
+		return;
+
+	/* XXX handle (in_streaming) here */
+
+	relentry = get_rel_sync_entry(data, RelationGetRelid(rel));
+
+	/*
+	 * First check the sequence filter.
+	 *
+	 * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here.
+	 */
+	if (!relentry->pubactions.pubsequence)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_sequence(ctx->out,
+							  rel,
+							  xid,
+							  sequence_lsn,
+							  transactional,
+							  created,
+							  last_value,
+							  log_cnt,
+							  is_called);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Currently we always forward.
  */
@@ -1026,7 +1084,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->streamed_txns = NIL;
 		entry->replicate_valid = false;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
-			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+			entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+			entry->pubactions.pubsequence = false;
 		entry->publish_as_relid = InvalidOid;
 		entry->map = NULL;	/* will be set by maybe_send_schema() if needed */
 	}
@@ -1037,6 +1096,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		List	   *pubids = GetRelationPublications(relid);
 		ListCell   *lc;
 		Oid			publish_as_relid = relid;
+		bool		is_sequence = (get_rel_relkind(relid) == RELKIND_SEQUENCE);
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -1060,12 +1120,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
 
-			if (pub->alltables)
+			if (pub->alltables && (!is_sequence))
 			{
 				publish = true;
 				if (pub->pubviaroot && am_partition)
 					publish_as_relid = llast_oid(get_partition_ancestors(relid));
 			}
+			else if (pub->allsequences && is_sequence)
+			{
+				publish = true;
+			}
+
+			/* if a sequence, just cross-check the list of publications */
+			if (!publish && is_sequence)
+			{
+				if (list_member_oid(pubids, pub->oid))
+					publish = true;
+			}
 
 			if (!publish)
 			{
@@ -1116,10 +1187,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+				entry->pubactions.pubsequence |= pub->pubactions.pubsequence;
 			}
 
 			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
+				entry->pubactions.pubdelete && entry->pubactions.pubtruncate &&
+				entry->pubactions.pubsequence)
 				break;
 		}
 
@@ -1264,5 +1337,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 		entry->pubactions.pubupdate = false;
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
+		entry->pubactions.pubsequence = false;
 	}
 }
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index fd05615e76..890829bc0c 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5487,6 +5487,7 @@ GetRelationPublicationActions(Relation relation)
 		pubactions->pubupdate |= pubform->pubupdate;
 		pubactions->pubdelete |= pubform->pubdelete;
 		pubactions->pubtruncate |= pubform->pubtruncate;
+		pubactions->pubsequence |= pubform->pubsequence;
 
 		ReleaseSysCache(tup);
 
@@ -5495,7 +5496,8 @@ GetRelationPublicationActions(Relation relation)
 		 * other publications.
 		 */
 		if (pubactions->pubinsert && pubactions->pubupdate &&
-			pubactions->pubdelete && pubactions->pubtruncate)
+			pubactions->pubdelete && pubactions->pubtruncate &&
+			pubactions->pubsequence)
 			break;
 	}
 
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 32c1bdfdca..147745d94c 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1640,7 +1640,7 @@ psql_completion(const char *text, int start, int end)
 
 	/* ALTER PUBLICATION <name> */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny))
-		COMPLETE_WITH("ADD TABLE", "DROP TABLE", "OWNER TO", "RENAME TO", "SET");
+		COMPLETE_WITH("ADD TABLE", "DROP TABLE", "ADD SEQUENCE", "DROP SEQUENCE", "OWNER TO", "RENAME TO", "SET");
 	/* ALTER PUBLICATION <name> SET */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET"))
 		COMPLETE_WITH("(", "TABLE");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index acbcae4607..e3043e0f63 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11442,6 +11442,11 @@
   provolatile => 's', prorettype => 'oid', proargtypes => 'text',
   proallargtypes => '{text,oid}', proargmodes => '{i,o}',
   proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
+{ oid => '8000', descr => 'get OIDs of sequences in a publication',
+  proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't',
+  provolatile => 's', prorettype => 'oid', proargtypes => 'text',
+  proallargtypes => '{text,oid}', proargmodes => '{i,o}',
+  proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
   proname => 'pg_relation_is_publishable', provolatile => 's',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 1b31fee9e3..d1c9a18b05 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -40,6 +40,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 	 */
 	bool		puballtables;
 
+	/*
+	 * indicates that this is special publication which should encompass all
+	 * sequences in the database (except for the unlogged and temp ones)
+	 */
+	bool		puballsequences;
+
 	/* true if inserts are published */
 	bool		pubinsert;
 
@@ -52,6 +58,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 	/* true if truncates are published */
 	bool		pubtruncate;
 
+	/* true if sequences are published */
+	bool		pubsequence;
+
 	/* true if partition changes are published using root schema */
 	bool		pubviaroot;
 } FormData_pg_publication;
@@ -74,6 +83,7 @@ typedef struct PublicationActions
 	bool		pubupdate;
 	bool		pubdelete;
 	bool		pubtruncate;
+	bool		pubsequence;
 } PublicationActions;
 
 typedef struct Publication
@@ -81,6 +91,7 @@ typedef struct Publication
 	Oid			oid;
 	char	   *name;
 	bool		alltables;
+	bool		allsequences;
 	bool		pubviaroot;
 	PublicationActions pubactions;
 } Publication;
@@ -109,6 +120,9 @@ extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 
+extern List *GetAllSequencesPublicationRelations(void);
+extern List *GetPublicationSequenceRelations(Oid pubid);
+
 extern bool is_publishable_relation(Relation rel);
 extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
 											  bool if_not_exists);
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 40544dd4c7..c28e8695cb 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -48,6 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data;
 typedef struct xl_seq_rec
 {
 	RelFileNode node;
+	bool		created;	/* is this a CREATE SEQUENCE */
 	/* SEQUENCE TUPLE DATA FOLLOWS AT THE END */
 } xl_seq_rec;
 
@@ -59,6 +60,7 @@ extern ObjectAddress DefineSequence(ParseState *pstate, CreateSeqStmt *stmt);
 extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
 extern void DeleteSequenceTuple(Oid relid);
 extern void ResetSequence(Oid seq_relid);
+extern void ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called);
 extern void ResetSequenceCaches(void);
 
 extern void seq_redo(XLogReaderState *rptr);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index ef73342019..b45c103b6c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3624,8 +3624,10 @@ typedef struct AlterPublicationStmt
 
 	/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
 	List	   *tables;			/* List of tables to add/drop */
+	List	   *sequences;		/* List of sequences to add/drop */
 	bool		for_all_tables; /* Special publication for all tables in db */
-	DefElemAction tableAction;	/* What action to perform with the tables */
+	bool		for_all_sequences; /* Special publication for all sequences in db */
+	DefElemAction action;		/* What action to perform with the tables/sequences */
 } AlterPublicationStmt;
 
 typedef struct CreateSubscriptionStmt
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 55b90c03ea..f3a7d18fca 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -55,6 +55,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
 	LOGICAL_REP_MSG_MESSAGE = 'M',
+	LOGICAL_REP_MSG_SEQUENCE = 'X',	/* FIXME change */
 	LOGICAL_REP_MSG_STREAM_START = 'S',
 	LOGICAL_REP_MSG_STREAM_END = 'E',
 	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
@@ -107,6 +108,19 @@ typedef struct LogicalRepTyp
 	char	   *typname;		/* name of the remote type */
 } LogicalRepTyp;
 
+/* Sequence info */
+typedef struct LogicalRepSequence
+{
+	Oid			remoteid;		/* unique id of the remote sequence */
+	char	   *nspname;		/* schema name of remote sequence */
+	char	   *seqname;		/* name of the remote sequence */
+	bool		transactional;
+	bool		created;
+	int64		last_value;
+	int64		log_cnt;		/* XXX probably not needed? */
+	bool		is_called;		/* XXX probably not needed? */
+} LogicalRepSequence;
+
 /* Transaction info */
 typedef struct LogicalRepBeginData
 {
@@ -154,6 +168,12 @@ extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
+extern void logicalrep_write_sequence(StringInfo out, Relation rel,
+									  TransactionId xid, XLogRecPtr lsn,
+									  bool transactional, bool created,
+									  int64 last_value, int64 log_cnt,
+									  bool is_called);
+extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 810495ed0e..e3e1b03f32 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -88,6 +88,19 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 										Size message_size,
 										const char *message);
 
+/*
+ * Called for sequence changes during logical decoding.
+ */
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+										 ReorderBufferTXN *txn,
+										 XLogRecPtr sequence_lsn,
+										 Relation rel,
+										 bool transactional,
+										 bool created,
+										 int64 last_value,
+										 int64 log_cnt,
+										 bool is_called);
+
 /*
  * Filter changes by origin.
  */
@@ -219,6 +232,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeSequenceCB sequence_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 51e7c0348d..a5aec2928a 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -27,6 +27,7 @@ typedef struct PGOutputData
 	bool		binary;
 	bool		streaming;
 	bool		messages;
+	bool		sequences;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0c6e9d1cb9..5190cdf196 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -63,7 +63,8 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
-	REORDER_BUFFER_CHANGE_TRUNCATE
+	REORDER_BUFFER_CHANGE_TRUNCATE,
+	REORDER_BUFFER_CHANGE_SEQUENCE,
 };
 
 /* forward declaration */
@@ -157,6 +158,14 @@ typedef struct ReorderBufferChange
 			uint32		ninvalidations; /* Number of messages */
 			SharedInvalidationMessage *invalidations;	/* invalidation message */
 		}			inval;
+
+		/* Context data for Sequence changes */
+		struct
+		{
+			RelFileNode relnode;
+			bool		created;
+			ReorderBufferTupleBuf *tuple;
+		}			sequence;
 	}			data;
 
 	/*
@@ -425,6 +434,15 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* sequence callback signature */
+typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb,
+										 ReorderBufferTXN *txn,
+										 XLogRecPtr sequence_lsn,
+										 Relation rel,
+										 bool transactional, bool created,
+										 int64 last_value, int64 log_cnt,
+										 bool is_called);
+
 /* begin prepare callback signature */
 typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
 											 ReorderBufferTXN *txn);
@@ -506,6 +524,12 @@ struct ReorderBuffer
 	 */
 	HTAB	   *by_txn;
 
+	/*
+	 * relfilenode => XID lookup table for sequences created in a transaction
+	 * (also covers altered sequences, which get a new relfilenode)
+	 */
+	HTAB	   *sequences;
+
 	/*
 	 * Transactions that could be a toplevel xact, ordered by LSN of the first
 	 * record bearing that xid.
@@ -536,6 +560,7 @@ struct ReorderBuffer
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
+	ReorderBufferSequenceCB sequence;
 
 	/*
 	 * Callbacks to be called when streaming a transaction at prepare time.
@@ -630,6 +655,10 @@ void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
 void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
+void		ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+									   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+									   RelFileNode rnode, bool transactional, bool created,
+									   ReorderBufferTupleBuf *tuplebuf);
 void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
@@ -677,4 +706,7 @@ void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 void		StartupReorderBuffer(void);
 
+bool		ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+												 RelFileNode rnode, bool created);
+
 #endif
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 63d6ab7a4e..2cbad49f2e 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -158,8 +158,8 @@ DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
-ERROR:  "testpub_view" is not a table
-DETAIL:  Only tables can be added to publications.
+ERROR:  "testpub_view" is not a table or sequence
+DETAIL:  Only tables and sequences can be added to publications.
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1, pub_test.testpub_nopk;
 RESET client_min_messages;
@@ -180,8 +180,8 @@ Tables:
 
 -- fail - view
 ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
-ERROR:  "testpub_view" is not a table
-DETAIL:  Only tables can be added to publications.
+ERROR:  "testpub_view" is not a table or sequence
+DETAIL:  Only tables and sequences can be added to publications.
 ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index e5ab11275d..393be871be 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,6 +1451,14 @@ pg_prepared_xacts| SELECT p.transaction,
    FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
      LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
      LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_publication_sequences| SELECT p.pubname,
+    n.nspname AS schemaname,
+    c.relname AS sequencename
+   FROM pg_publication p,
+    LATERAL pg_get_publication_sequences((p.pubname)::text) gpt(relid),
+    (pg_class c
+     JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
+  WHERE (c.oid = gpt.relid);
 pg_publication_tables| SELECT p.pubname,
     n.nspname AS schemaname,
     c.relname AS tablename
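
With the patch applied, the new view and function can be used to check which
sequences a publication contains, e.g. (publication name is made up):

  SELECT * FROM pg_publication_sequences WHERE pubname = 'test_pub';
  SELECT relid FROM pg_get_publication_sequences('test_pub');
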
