On 4/5/22 12:06, Amit Kapila wrote:
> On Mon, Apr 4, 2022 at 3:10 AM Tomas Vondra
> <tomas.von...@enterprisedb.com> wrote:
>>
>> I did some experiments over the weekend, exploring how to rework the
>> sequence decoding in various ways. Let me share some WIP patches,
>> hopefully that can be useful for trying more stuff and moving this
>> discussion forward.
>>
>> I tried two things - (1) accumulating sequence increments in global
>> array and then doing something with it, and (2) treating all sequence
>> increments as regular changes (in a TXN) and then doing something
>> special during the replay. Attached are two patchsets, one for each
>> approach.
>>
>> Note: It's important to remember decoding of sequences is not the only
>> code affected by this. The logical messages have the same issue,
>> certainly when it comes to transactional vs. non-transactional stuff and
>> handling of snapshots. Even if the sequence decoding ends up being
>> reverted, we still need to fix that, somehow. And my feeling is the
>> solutions ought to be pretty similar in both cases.
>>
>> Now, regarding the two approaches:
>>
>> (1) accumulating sequences in global hash table
>>
>> The main problem with regular sequence increments is that those need to
>> be non-transactional - a transaction may use a sequence without any
>> WAL-logging, if the WAL was written by an earlier transaction. The
>> problem is the earlier transaction might have been rolled back, and thus
>> simply discarded by the logical decoding. But we still need to apply
>> that, in order not to lose the sequence increment.
>>
>> The current code just applies those non-transactional increments right
>> after decoding the increment, but that does not work because we may not
>> have a snapshot at that point. And we only have the snapshot when within
>> a transaction (AFAICS) so this queues all changes and then applies the
>> changes later.
>>
>> The changes need to be shared by all transactions, so queueing them in a
>> global hash table works fairly well - otherwise we'd have to walk all transactions,
>> in order to see if there are relevant sequence increments.
>>
>> But some increments may be transactional, e.g. when the sequence is
>> created or altered in a transaction. To allow tracking this, this uses a
>> hash table, with relfilenode as a key.
>>
>> There are a couple of issues with this, though. Firstly, because the changes
>> are stashed outside transactions, they are not included in memory accounting,
>> not spilled to disk or streamed, etc. I guess fixing this is possible, but
>> it's certainly not straightforward, because we mix increments from many
>> different transactions.
>>
>> A bigger issue is that I'm not sure this actually handles the snapshots
>> correctly either.
>>
>> The non-transactional increments affect all transactions, so when
>> ReorderBufferProcessSequences gets executed, it processes all of them,
>> no matter the source transaction. Can we be sure the snapshot in the
>> applying transaction is the same (or "compatible") as the snapshot in
>> the source transaction?
>>
> 
> I don't think we can assume that. I think it is possible that some
> other transaction's WAL can be in-between start/end lsn of txn (which
> we decide to send) which may not finally reach a consistent state.
> Consider a case similar to shown in one of my previous emails:
> Session-2:
> Begin;
> SELECT pg_current_xact_id();
> 
> Session-1:
> SELECT 'init' FROM pg_create_logical_replication_slot('test_slot',
> 'test_decoding', false, true);
> 
> Session-3:
> Begin;
> SELECT pg_current_xact_id();
> 
> Session-2:
> Commit;
> Begin;
> INSERT INTO t1_seq SELECT nextval('seq1') FROM generate_series(1,100);
> 
> Session-3:
> Commit;
> 
> Session-2:
> Commit;
> 
> Here, we send changes (say, the insert from txn 700) from session-2 because
> session-3's commit happens before it. Now, consider another
> transaction running in parallel to txn 700 which generates some WAL related to
> sequences but commits before session-3's commit. So although its
> changes will be in between the start/end LSN of txn 700, those
> shouldn't be sent.
> 
> I have not tried this and also this may be solvable in some way but I
> think processing changes from other TXNs sounds risky to me in terms
> of snapshot handling.
> 

Yes, I know this can happen. I was only really thinking about what might
happen to the relfilenode of the sequence itself - and I don't think any
concurrent transaction could swoop in and change the relfilenode in any
meaningful way, due to locking.

But of course, if we expect/require to have a perfect snapshot for that
exact position in the transaction, this won't work. IMO the whole idea
that we can have non-transactional bits in naturally transactional
decoding seems a bit suspicious (at least in hindsight).

No matter what we do for sequences, though, this still affects logical
messages too. Not sure what to do there :-(

>>
>>
>> (2) treating sequence change as regular changes
>>
>> This adopts a different approach - instead of accumulating the sequence
>> increments in a global hash table, it treats them as regular changes.
>> Which solves the snapshot issue, and issues with spilling to disk,
>> streaming and so on.
>>
>> But it has various other issues with handling concurrent transactions,
>> unfortunately, which probably make this approach infeasible:
>>
>> * The non-transactional stuff has to be applied in the first transaction
>> that commits, not in the transaction that generated the WAL. That does
>> not work too well with this approach, because we have to walk changes in
>> all other transactions.
>>
> 
> Why do you want to traverse other TXNs in this approach? Is it because
> the current TXN might be using some value of sequence which has been
> actually WAL logged in the other transaction but that other
> transaction has not been sent yet? I think if we don't send that, then
> the sequence columns (in some tables) on the replica probably have values
> that the sequence itself hasn't reached yet, which
> sounds problematic. Is that correct?
> 

Well, how else would you get to sequence changes in the other TXNs?

Consider this:

T1: begin
T2: begin

T2: nextval('s') -> writes WAL for 32 values
T1: nextval('s') -> gets value without WAL

T1: commit
T2: commit

Now, if we commit T1 without "applying" the sequence change from T2, we
lose the sequence state. But we still write/replicate the value
generated from the sequence.
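A toy model of the T1/T2 interleaving above, as a sketch only: it assumes a simplified sequence that pre-logs 32 values per WAL record (SIM_SEQ_LOG_VALS mirrors the "32 values" in the example), and a decoder that applies sequence records only at the commit of the transaction that wrote them. All names (SimSequence, sim_nextval, sim_apply_at_commit) are hypothetical and not part of the patch.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical model: values pre-logged per WAL record, as in the example */
#define SIM_SEQ_LOG_VALS 32

typedef struct SimSequence
{
	int64_t		last_value;		/* last value returned by nextval() */
	int64_t		logged_upto;	/* highest value already covered by WAL */
} SimSequence;

typedef struct SimWalRecord
{
	int			xid;			/* transaction whose WAL contains the record */
	int64_t		logged_upto;	/* sequence state carried by the record */
} SimWalRecord;

/*
 * nextval(): hand out the next value, writing a WAL record (attributed to
 * xid) only when the pre-logged range has been used up.
 */
static int64_t
sim_nextval(SimSequence *seq, int xid, SimWalRecord *wal, int *nwal, int cap)
{
	seq->last_value++;
	if (seq->last_value > seq->logged_upto)
	{
		assert(*nwal < cap);
		seq->logged_upto = seq->last_value + SIM_SEQ_LOG_VALS - 1;
		wal[*nwal].xid = xid;
		wal[*nwal].logged_upto = seq->logged_upto;
		(*nwal)++;
	}
	return seq->last_value;
}

/*
 * Replica side: at commit of xid, apply only the sequence records written
 * by xid itself, i.e. treat them as regular transactional changes.
 */
static int64_t
sim_apply_at_commit(int64_t replica_state, int xid,
					const SimWalRecord *wal, int nwal)
{
	for (int i = 0; i < nwal; i++)
		if (wal[i].xid == xid && wal[i].logged_upto > replica_state)
			replica_state = wal[i].logged_upto;
	return replica_state;
}
```

With T2 calling nextval() first (writing the WAL record) and T1 second (no WAL), applying T1's commit leaves the replica's sequence state at 0 while T1 has already replicated the value 2; the state only catches up when T2 commits, and never if T2 aborts and is discarded.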

>> * Another serious issue seems to be streaming - if we already streamed
>> some of the changes, we can't iterate through them anymore.
>>
>> Also, having to walk the transactions over and over for each change, to
>> apply relevant sequence increments, that's mighty expensive. The other
>> approach needs to do that too, but walking the global hash table seems
>> much cheaper.
>>
>> The other issue is handling of aborted transactions - we need to apply
>> sequence increments even from those transactions, of course. The other
>> approach has this issue too, though.
>>
>>
>> (3) tracking sequences touched by transaction
>>
>> This is the approach proposed by Hannu Krosing. I haven't explored this
>> again yet, but I recall I wrote a PoC patch a couple of months back.
>>
>> It seems to me most of the problems stem from trying to derive sequence
>> state from decoded WAL changes, which is problematic because of the
>> non-transactional nature of sequences (i.e. WAL for one transaction
>> affects other transactions in non-obvious ways). And this approach
>> simply works around that entirely - instead of trying to deduce the
>> sequence state from WAL, we'd make sure to write the current sequence
>> state (or maybe just ID of the sequence) at commit time. Which should
>> eliminate most of the complexity / problems, I think.
>>
> 
> That sounds promising but I haven't thought in detail about that approach.
> 

So, here's a patch doing that. It's a reworked/improved version of the
patch [1] shared in November.

It seems to be working pretty nicely. The behavior is a little bit
different, of course, because we only replicate "committed" changes, so
if you do nextval() in an aborted transaction, that is not replicated.
Which I think is fine, because we make no durability guarantees for
aborted transactions in general.

But there are a couple of issues too:

1) locking

We have to read the sequence state before the commit, but we must not allow
reordering (because then the state might go backwards again). I'm not
sure how serious an impact this could have on performance.

2) dropped sequences

I'm not sure what to do about sequences dropped in the transaction. The
patch simply attempts to read the current sequence state before the
commit, but if the sequence was dropped (in that transaction), that
can't happen. I'm not sure if that's OK or not.

3) WAL record

To replicate the sequence state, the patch uses a LogicalMessage, but I guess a
separate WAL record would be better. But that's a technical detail.
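To make the reordering concern in 1) concrete, here is a small sketch that replays the interleaving from the XXX comment in AtEOXact_Sequences(): T1 logs state 10, T2 increments and logs 42, T2 commits first, then T1 commits. Applying the logged states in commit order moves the replica's sequence backwards. The "serialized" case approximates what holding the lock until commit enforces (log order equals commit order). LoggedState and goes_backwards are hypothetical names; this models the ordering only, not the actual locking.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct LoggedState
{
	int			commit_seq;		/* position of the writer in commit order */
	int64_t		value;			/* sequence state written before commit */
} LoggedState;

static int
cmp_commit(const void *a, const void *b)
{
	const LoggedState *x = a;
	const LoggedState *y = b;

	return (x->commit_seq > y->commit_seq) - (x->commit_seq < y->commit_seq);
}

/*
 * Apply the logged states in commit order (as the decoder would) and report
 * whether the replica's value ever decreases. Sorts states in place.
 */
static bool
goes_backwards(LoggedState *states, size_t n)
{
	assert(n > 0);
	qsort(states, n, sizeof(LoggedState), cmp_commit);

	for (size_t i = 1; i < n; i++)
		if (states[i].value < states[i - 1].value)
			return true;
	return false;
}
```

Usage: `{ {1, 10}, {0, 42} }` (T1 logs 10 but commits second, T2 logs 42 and commits first) goes backwards, while `{ {0, 10}, {1, 42} }` does not. Forcing the second case is exactly what costs commit throughput.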


regards

[1]
https://www.postgresql.org/message-id/2cd38bab-c874-8e0b-98e7-d9abaaf98...@enterprisedb.com

>>
>> I'm not really sure what to do about this. All of those reworks seem
>> like an extensive redesign of the patch, and considering the last CF is
>> already over ... not great.
>>
> 
> Yeah, I share the same feeling that even if we devise solutions to all
> the known problems it requires quite some time to ensure everything is
> correct.
> 

True. Let's keep working on this for a bit more time and then we can
decide what to do.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 0ba154eef933a9b39bb7eaa0e339620a2f98a37b Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 5 Apr 2022 00:25:02 +0200
Subject: [PATCH] rework sequence decoding

---
 contrib/test_decoding/test_decoding.c         |  24 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c  |  13 +
 src/backend/access/transam/xact.c             |  25 ++
 src/backend/commands/sequence.c               | 263 +++++++-----
 src/backend/replication/logical/decode.c      | 112 ++---
 src/backend/replication/logical/logical.c     |   9 +-
 src/backend/replication/logical/message.c     |   3 +-
 src/backend/replication/logical/proto.c       |   4 +-
 .../replication/logical/reorderbuffer.c       | 387 ++----------------
 src/backend/replication/logical/tablesync.c   |   3 +-
 src/backend/replication/logical/worker.c      |  16 +-
 src/backend/replication/pgoutput/pgoutput.c   |  21 +-
 src/include/access/rmgrlist.h                 |   2 +-
 src/include/commands/sequence.h               |   5 +-
 src/include/replication/logicalproto.h        |   2 -
 src/include/replication/message.h             |  15 +
 src/include/replication/output_plugin.h       |   2 -
 src/include/replication/reorderbuffer.h       |  16 +-
 18 files changed, 330 insertions(+), 592 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index c7a87f5fe5b..851117b4155 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -79,7 +79,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  Size sz, const char *message);
 static void pg_decode_sequence(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
-							  Relation rel, bool transactional,
+							  Relation rel,
 							  int64 last_value, int64 log_cnt, bool is_called);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
 									 TransactionId xid,
@@ -123,7 +123,7 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
 									 Size sz, const char *message);
 static void pg_decode_stream_sequence(LogicalDecodingContext *ctx,
 									  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
-									  Relation rel, bool transactional,
+									  Relation rel,
 									  int64 last_value, int64 log_cnt, bool is_called);
 static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
 									  ReorderBufferTXN *txn,
@@ -782,7 +782,6 @@ pg_decode_message(LogicalDecodingContext *ctx,
 static void
 pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				   XLogRecPtr sequence_lsn, Relation rel,
-				   bool transactional,
 				   int64 last_value, int64 log_cnt, bool is_called)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
@@ -792,22 +791,19 @@ pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		return;
 
 	/* output BEGIN if we haven't yet, but only for the transactional case */
-	if (transactional)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
-		if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
-		{
-			pg_output_begin(ctx, data, txn, false);
-		}
-		txndata->xact_wrote_changes = true;
+		pg_output_begin(ctx, data, txn, false);
 	}
+	txndata->xact_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	appendStringInfoString(ctx->out, "sequence ");
 	appendStringInfoString(ctx->out,
 						   quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
 													  RelationGetRelationName(rel)));
-	appendStringInfo(ctx->out, 	": transactional:%d last_value: " INT64_FORMAT " log_cnt: " INT64_FORMAT " is_called:%d",
-					 transactional, last_value, log_cnt, is_called);
+	appendStringInfo(ctx->out, 	": last_value: " INT64_FORMAT " log_cnt: " INT64_FORMAT " is_called:%d",
+					 last_value, log_cnt, is_called);
 	OutputPluginWrite(ctx, true);
 }
 
@@ -1013,7 +1009,6 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
 static void
 pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 						  XLogRecPtr sequence_lsn, Relation rel,
-						  bool transactional,
 						  int64 last_value, int64 log_cnt, bool is_called)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
@@ -1023,7 +1018,6 @@ pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		return;
 
 	/* output BEGIN if we haven't yet, but only for the transactional case */
-	if (transactional)
 	{
 		if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 		{
@@ -1037,8 +1031,8 @@ pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	appendStringInfoString(ctx->out,
 						   quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
 													  RelationGetRelationName(rel)));
-	appendStringInfo(ctx->out, 	": transactional:%d last_value: " INT64_FORMAT " log_cnt: " INT64_FORMAT " is_called:%d",
-					 transactional, last_value, log_cnt, is_called);
+	appendStringInfo(ctx->out, 	": last_value: " INT64_FORMAT " log_cnt: " INT64_FORMAT " is_called:%d",
+					 last_value, log_cnt, is_called);
 	OutputPluginWrite(ctx, true);
 }
 
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicalmsgdesc.c
index 099e11a84e7..de77dcb0dc2 100644
--- a/src/backend/access/rmgrdesc/logicalmsgdesc.c
+++ b/src/backend/access/rmgrdesc/logicalmsgdesc.c
@@ -40,6 +40,16 @@ logicalmsg_desc(StringInfo buf, XLogReaderState *record)
 			sep = " ";
 		}
 	}
+	else if (info == XLOG_LOGICAL_SEQUENCE)
+	{
+		xl_logical_sequence *xlrec = (xl_logical_sequence *) rec;
+
+		appendStringInfo(buf, "rel %u/%u/%u last: " INT64_FORMAT " log_cnt: " INT64_FORMAT " is_called: %d",
+						 xlrec->node.spcNode,
+						 xlrec->node.dbNode,
+						 xlrec->node.relNode,
+						 xlrec->last, xlrec->log_cnt, xlrec->is_called);
+	}
 }
 
 const char *
@@ -48,5 +58,8 @@ logicalmsg_identify(uint8 info)
 	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
 		return "MESSAGE";
 
+	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_SEQUENCE)
+		return "SEQUENCE";
+
 	return NULL;
 }
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 3596a7d7345..5b6a07ca892 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -37,6 +37,7 @@
 #include "catalog/storage.h"
 #include "commands/async.h"
 #include "commands/tablecmds.h"
+#include "commands/sequence.h"
 #include "commands/trigger.h"
 #include "common/pg_prng.h"
 #include "executor/spi.h"
@@ -2223,6 +2224,15 @@ CommitTransaction(void)
 	/* Prevent cancel/die interrupt while cleaning up */
 	HOLD_INTERRUPTS();
 
+	/*
+	 * Write state of sequences to WAL. We need to do this early enough so
+	 * that we can still write stuff to WAL, but probably before changing
+	 * the transaction state.
+	 *
+	 * XXX Should this happen before/after holding the interrupts?
+	 */
+	AtEOXact_Sequences(true);
+
 	/* Commit updates to the relation map --- do this as late as possible */
 	AtEOXact_RelationMap(true, is_parallel_worker);
 
@@ -2499,6 +2509,15 @@ PrepareTransaction(void)
 	/* Prevent cancel/die interrupt while cleaning up */
 	HOLD_INTERRUPTS();
 
+	/*
+	 * Write state of sequences to WAL. We need to do this early enough so
+	 * that we can still write stuff to WAL, but probably before changing
+	 * the transaction state.
+	 *
+	 * XXX Should this happen before/after holding the interrupts?
+	 */
+	AtEOXact_Sequences(true);
+
 	/*
 	 * set the current transaction state information appropriately during
 	 * prepare processing
@@ -2734,6 +2753,12 @@ AbortTransaction(void)
 			 TransStateAsString(s->state));
 	Assert(s->parent == NULL);
 
+	/*
+	 * For transaction abort we don't need to write anything to WAL, we just
+	 * do cleanup of the hash table etc.
+	 */
+	AtEOXact_Sequences(false);
+
 	/*
 	 * set the current transaction state information appropriately during the
 	 * abort processing
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 717bb0b2aa9..50c03b71dac 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -37,6 +37,7 @@
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "parser/parse_type.h"
+#include "replication/message.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/smgr.h"
@@ -75,6 +76,7 @@ typedef struct SeqTableData
 {
 	Oid			relid;			/* pg_class OID of this sequence (hash key) */
 	Oid			filenode;		/* last seen relfilenode of this sequence */
+	Oid			tablespace;		/* last seen tablespace of this sequence */
 	LocalTransactionId lxid;	/* xact in which we last did a seq op */
 	bool		last_valid;		/* do we have a valid "last" value? */
 	int64		last;			/* value last returned by nextval */
@@ -82,6 +84,7 @@ typedef struct SeqTableData
 	/* if last != cached, we have not used up all the cached values */
 	int64		increment;		/* copy of sequence's increment field */
 	/* note that increment is zero until we first do nextval_internal() */
+	bool		need_log;		/* should be written to WAL at commit? */
 } SeqTableData;
 
 typedef SeqTableData *SeqTable;
@@ -336,83 +339,13 @@ ResetSequence(Oid seq_relid)
 	relation_close(seq_rel, NoLock);
 }
 
-/*
- * Update the sequence state by modifying the existing sequence data row.
- *
- * This keeps the same relfilenode, so the behavior is non-transactional.
- */
-static void
-SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called)
-{
-	SeqTable	elm;
-	Relation	seqrel;
-	Buffer		buf;
-	HeapTupleData seqdatatuple;
-	Form_pg_sequence_data seq;
-
-	/* open and lock sequence */
-	init_sequence(seqrelid, &elm, &seqrel);
-
-	/* lock page' buffer and read tuple */
-	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
-
-	/* check the comment above nextval_internal()'s equivalent call. */
-	if (RelationNeedsWAL(seqrel))
-	{
-		GetTopTransactionId();
-
-		if (XLogLogicalInfoActive())
-			GetCurrentTransactionId();
-	}
-
-	/* ready to change the on-disk (or really, in-buffer) tuple */
-	START_CRIT_SECTION();
-
-	seq->last_value = last_value;
-	seq->is_called = is_called;
-	seq->log_cnt = log_cnt;
-
-	MarkBufferDirty(buf);
-
-	/* XLOG stuff */
-	if (RelationNeedsWAL(seqrel))
-	{
-		xl_seq_rec	xlrec;
-		XLogRecPtr	recptr;
-		Page		page = BufferGetPage(buf);
-
-		XLogBeginInsert();
-		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
-
-		xlrec.node = seqrel->rd_node;
-		xlrec.created = false;
-
-		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
-		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
-
-		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
-
-		PageSetLSN(page, recptr);
-	}
-
-	END_CRIT_SECTION();
-
-	UnlockReleaseBuffer(buf);
-
-	/* Clear local cache so that we don't think we have cached numbers */
-	/* Note that we do not change the currval() state */
-	elm->cached = elm->last;
-
-	relation_close(seqrel, NoLock);
-}
-
 /*
  * Update the sequence state by creating a new relfilenode.
  *
  * This creates a new relfilenode, to allow transactional behavior.
  */
-static void
-SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
+void
+SetSequence(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
 {
 	SeqTable	elm;
 	Relation	seqrel;
@@ -469,27 +402,6 @@ SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool i
 	relation_close(seqrel, NoLock);
 }
 
-/*
- * Set a sequence to a specified internal state.
- *
- * The change is made transactionally, so that on failure of the current
- * transaction, the sequence will be restored to its previous state.
- * We do that by creating a whole new relfilenode for the sequence; so this
- * works much like the rewriting forms of ALTER TABLE.
- *
- * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
- * which must not be released until end of transaction.  Caller is also
- * responsible for permissions checking.
- */
-void
-SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called)
-{
-	if (transactional)
-		SetSequence_transactional(seq_relid, last_value, log_cnt, is_called);
-	else
-		SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called);
-}
-
 /*
  * Initialize a sequence's relation with the specified tuple as content
  */
@@ -530,7 +442,13 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 	tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
 	ItemPointerSet(&tuple->t_data->t_ctid, 0, FirstOffsetNumber);
 
-	/* check the comment above nextval_internal()'s equivalent call. */
+	/*
+	 * Remember we need to write this sequence to WAL at commit, and make sure
+	 * we have XID if we may need to decode this sequence.
+	 *
+	 * XXX Maybe we should not be doing this except when actually reading a
+	 * value from the sequence?
+	 */
 	if (RelationNeedsWAL(rel))
 	{
 		GetTopTransactionId();
@@ -558,7 +476,6 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.node = rel->rd_node;
-		xlrec.created = true;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -762,6 +679,23 @@ nextval_internal(Oid relid, bool check_permissions)
 	/* open and lock sequence */
 	init_sequence(relid, &elm, &seqrel);
 
+	/*
+	 * Remember we need to write this sequence to WAL at commit, and make sure
+	 * we have XID if we may need to decode this sequence.
+	 *
+	 * XXX Maybe we should not be doing this except when actually reading a
+	 * value from the sequence?
+	 */
+	if (RelationNeedsWAL(seqrel))
+	{
+		elm->need_log = true;
+
+		GetTopTransactionId();
+
+		if (XLogLogicalInfoActive())
+			GetCurrentTransactionId();
+	}
+
 	if (check_permissions &&
 		pg_class_aclcheck(elm->relid, GetUserId(),
 						  ACL_USAGE | ACL_UPDATE) != ACLCHECK_OK)
@@ -973,7 +907,6 @@ nextval_internal(Oid relid, bool check_permissions)
 		seq->log_cnt = 0;
 
 		xlrec.node = seqrel->rd_node;
-		xlrec.created = false;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -1092,6 +1025,23 @@ do_setval(Oid relid, int64 next, bool iscalled)
 	/* open and lock sequence */
 	init_sequence(relid, &elm, &seqrel);
 
+	/*
+	 * Remember we need to write this sequence to WAL at commit, and make sure
+	 * we have XID if we may need to decode this sequence.
+	 *
+	 * XXX Maybe we should not be doing this except when actually reading a
+	 * value from the sequence?
+	 */
+	if (RelationNeedsWAL(seqrel))
+	{
+		elm->need_log = true;
+
+		GetTopTransactionId();
+
+		if (XLogLogicalInfoActive())
+			GetCurrentTransactionId();
+	}
+
 	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
 		ereport(ERROR,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
@@ -1166,8 +1116,6 @@ do_setval(Oid relid, int64 next, bool iscalled)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.node = seqrel->rd_node;
-		xlrec.created = false;
-
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
 
@@ -1316,6 +1264,7 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
 	if (seqrel->rd_rel->relfilenode != elm->filenode)
 	{
 		elm->filenode = seqrel->rd_rel->relfilenode;
+		elm->tablespace = seqrel->rd_rel->reltablespace;
 		elm->cached = elm->last;
 	}
 
@@ -2051,3 +2000,121 @@ seq_mask(char *page, BlockNumber blkno)
 
 	mask_unused_space(page);
 }
+
+
+static void
+read_sequence_info(Relation seqrel, int64 *last_value, int64 *log_cnt, bool *is_called)
+{
+	Buffer		buf;
+	Form_pg_sequence_data seq;
+	HeapTupleData	seqdatatuple;
+
+	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
+
+	*last_value = seq->last_value;
+	*is_called = seq->is_called;
+	*log_cnt = seq->log_cnt;
+
+	UnlockReleaseBuffer(buf);
+}
+
+/* XXX Do this only for wal_level = logical, probably? */
+void
+AtEOXact_Sequences(bool isCommit)
+{
+	SeqTable		entry;
+	HASH_SEQ_STATUS	scan;
+
+	if (!seqhashtab)
+		return;
+
+	/* only do this with wal_level=logical */
+	if (!XLogLogicalInfoActive())
+		return;
+
+	/*
+	 * XXX Maybe we could enforce having XID here? Or is it too late?
+	 */
+	/* Assert(GetTopTransactionId() != InvalidTransactionId); */
+	/* Assert(GetCurrentTransactionId() != InvalidTransactionId); */
+
+	hash_seq_init(&scan, seqhashtab);
+
+	while ((entry = (SeqTable) hash_seq_search(&scan)))
+	{
+		Relation			rel;
+		RelFileNode			rnode;
+		xl_logical_sequence	xlrec;
+
+		/* not commit, we don't guarantee any data to be durable */
+		if (!isCommit)
+			entry->need_log = false;
+
+		/*
+		 * If not touched in the current transaction, don't log anything.
+		 * The flag gets set again whenever a future transaction touches
+		 * the sequence, so it will be logged properly then.
+		 */
+		if (!entry->need_log)
+			continue;
+
+		/* this is a commit, so log the sequence state below */
+		entry->need_log = false;
+
+		/*
+		 * does the relation still exist?
+		 *
+		 * XXX We need to make sure another transaction can't jump ahead of
+		 * this one, otherwise the ordering of sequence changes could change.
+		 * Imagine T1 and T2, where T1 writes sequence state first, but then
+		 * T2 does it too and commits first:
+		 *
+		 * T1: log sequence state
+		 * T2: increment sequence
+		 * T2: log sequence state
+		 * T2: commit
+		 * T1: commit
+		 *
+		 * If we apply the sequences in this order, it'd be broken as the
+		 * value might go backwards. ShareUpdateExclusive protects against
+		 * that, but it's also restricting commit throughput, probably.
+		 *
+		 * XXX This might be an issue for deadlocks, too, if two xacts try
+		 * to write sequences in different ordering. We may need to sort
+		 * the OIDs first, to enforce the same lock ordering.
+		 */
+		rel = try_relation_open(entry->relid, ShareUpdateExclusiveLock);
+
+		/* XXX relation might have been dropped, maybe we should have logged
+		 * the last change? */
+		if (!rel)
+			continue;
+
+		/* tablespace */
+		if (OidIsValid(entry->tablespace))
+			rnode.spcNode = entry->tablespace;
+		else
+			rnode.spcNode = MyDatabaseTableSpace;
+
+		rnode.dbNode = MyDatabaseId;			/* database */
+		rnode.relNode = entry->filenode;		/* relation */
+
+		xlrec.node = rnode;
+		xlrec.reloid = entry->relid;
+
+		/* XXX is it good enough to log values we have in cache? seems
+		 * wrong and we may need to re-read that. */
+		read_sequence_info(rel, &xlrec.last, &xlrec.log_cnt, &xlrec.is_called);
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfLogicalSequence);
+
+		/* allow origin filtering */
+		XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+		(void) XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_SEQUENCE);
+
+		/* hold the sequence lock until the end of the transaction */
+		relation_close(rel, NoLock);
+	}
+}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77bc7aea7a0..a8548f9ac64 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -44,6 +44,9 @@
 #include "storage/standby.h"
 #include "commands/sequence.h"
 
+static void DecodeLogicalMessage(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -64,7 +67,6 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
-static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
 
 /* helper functions for decoding transactions */
 static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -560,6 +562,27 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
  */
 void
 logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_LOGICAL_MESSAGE:
+			DecodeLogicalMessage(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_SEQUENCE:
+			DecodeLogicalSequence(ctx, buf);
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
+	}
+}
+
+static void
+DecodeLogicalMessage(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
@@ -1253,36 +1276,6 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			ctx->fast_forward || FilterByOrigin(ctx, origin_id));
 }
 
-/*
- * DecodeSeqTuple
- *		decode tuple describing the sequence increment
- *
- * Sequences are represented as a table with a single row, which gets updated
- * by nextval(). The tuple is stored in WAL right after the xl_seq_rec, so we
- * simply copy it into the tuplebuf (similar to seq_redo).
- */
-static void
-DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
-{
-	int			datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
-
-	Assert(datalen >= 0);
-
-	tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
-
-	ItemPointerSetInvalid(&tuple->tuple.t_self);
-
-	tuple->tuple.t_tableOid = InvalidOid;
-
-	memcpy(((char *) tuple->tuple.t_data),
-		   data + sizeof(xl_seq_rec),
-		   SizeofHeapTupleHeader);
-
-	memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
-		   data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
-		   datalen);
-}
-
 /*
  * Handle sequence decode
  *
@@ -1301,24 +1294,18 @@ DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
  * plugin - it might get confused about which sequence it's related to etc.
  */
 void
-sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeLogicalSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
-	ReorderBufferTupleBuf *tuplebuf;
 	RelFileNode target_node;
 	XLogReaderState *r = buf->record;
-	char	   *tupledata = NULL;
-	Size		tuplelen;
-	Size		datalen = 0;
 	TransactionId xid = XLogRecGetXid(r);
 	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
-	xl_seq_rec *xlrec;
-	Snapshot	snapshot;
+	xl_logical_sequence *xlrec;
 	RepOriginId origin_id = XLogRecGetOrigin(r);
-	bool		transactional;
 
 	/* only decode changes flagged with XLOG_SEQ_LOG */
-	if (info != XLOG_SEQ_LOG)
+	if (info != XLOG_LOGICAL_SEQUENCE)
 		elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
 
 	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
@@ -1331,8 +1318,11 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		ctx->fast_forward)
 		return;
 
+	/* extract the WAL record (a fixed-size xl_logical_sequence) */
+	xlrec = (xl_logical_sequence *) XLogRecGetData(r);
+
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	target_node = xlrec->node;
 	if (target_node.dbNode != ctx->slot->data.database)
 		return;
 
@@ -1340,44 +1330,12 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
-	tupledata = XLogRecGetData(r);
-	datalen = XLogRecGetDataLen(r);
-	tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
-
-	/* extract the WAL record, with "created" flag */
-	xlrec = (xl_seq_rec *) XLogRecGetData(r);
-
-	/* XXX how could we have sequence change without data? */
-	if(!datalen || !tupledata)
-		return;
-
-	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
-	DecodeSeqTuple(tupledata, datalen, tuplebuf);
-
-	/*
-	 * Should we handle the sequence increment as transactional or not?
-	 *
-	 * If the sequence was created in a still-running transaction, treat
-	 * it as transactional and queue the increments. Otherwise it needs
-	 * to be treated as non-transactional, in which case we send it to
-	 * the plugin right away.
-	 */
-	transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
-														 target_node,
-														 xlrec->created);
-
 	/* Skip the change if already processed (per the snapshot). */
-	if (transactional &&
-		!SnapBuildProcessChange(builder, xid, buf->origptr))
-		return;
-	else if (!transactional &&
-			 (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
-			  SnapBuildXactNeedsSkip(builder, buf->origptr)))
+	if (!SnapBuildProcessChange(builder, xid, buf->origptr))
 		return;
 
 	/* Queue the increment (or send immediately if not transactional). */
-	snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
-	ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
-							   origin_id, target_node, transactional,
-							   xlrec->created, tuplebuf);
+	ReorderBufferQueueSequence(ctx->reorder, xid, buf->endptr,
+							   origin_id, xlrec->reloid, target_node,
+							   xlrec->last, xlrec->log_cnt, xlrec->is_called);
 }
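With this patch the decoder no longer rebuilds a heap tuple from the record. It reads a fixed-size struct out of the record's main data and filters by database. Below is a minimal standalone sketch of that step. It assumes the stand-in types `demo_relfilenode` and `demo_logical_sequence`, which only mirror the shape of `RelFileNode` and the new `xl_logical_sequence` and are not PostgreSQL types. It copies with `memcpy` so it does not depend on buffer alignment, whereas decode.c itself simply casts the result of `XLogRecGetData()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for RelFileNode (tablespace, database, relation). */
typedef struct demo_relfilenode
{
	uint32_t	spcNode;
	uint32_t	dbNode;
	uint32_t	relNode;
} demo_relfilenode;

/* Hypothetical mirror of the xl_logical_sequence record added by the patch. */
typedef struct demo_logical_sequence
{
	demo_relfilenode node;
	uint32_t	reloid;
	int64_t		last;
	int64_t		log_cnt;
	bool		is_called;
} demo_logical_sequence;

/*
 * Copy the record payload into *out and report whether the change belongs
 * to the database being decoded. Returns false for a truncated payload or
 * for a sequence from another database (the caller just skips the record).
 */
static bool
demo_decode_sequence(const char *data, size_t len, uint32_t my_db,
					 demo_logical_sequence *out)
{
	assert(data != NULL && out != NULL);

	if (len < sizeof(demo_logical_sequence))
		return false;

	/* memcpy avoids relying on the alignment of the source buffer */
	memcpy(out, data, sizeof(demo_logical_sequence));

	return out->node.dbNode == my_db;
}
```

Because every field now travels in the record itself, there is nothing left to look up by relfilenode at decode time.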
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index e1f14aeecb5..d00cc08fb3a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -75,7 +75,6 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   const char *prefix, Size message_size, const char *message);
 static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 								XLogRecPtr sequence_lsn, Relation rel,
-								bool transactional,
 								int64 last_value, int64 log_cnt, bool is_called);
 
 /* streaming callbacks */
@@ -96,7 +95,6 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
 									  const char *prefix, Size message_size, const char *message);
 static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   XLogRecPtr sequence_lsn, Relation rel,
-									   bool transactional,
 									   int64 last_value, int64 log_cnt, bool is_called);
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
@@ -1218,7 +1216,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 static void
 sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
-					XLogRecPtr sequence_lsn, Relation rel, bool transactional,
+					XLogRecPtr sequence_lsn, Relation rel,
 					int64 last_value, int64 log_cnt, bool is_called)
 {
 	LogicalDecodingContext *ctx = cache->private_data;
@@ -1245,7 +1243,7 @@ sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_location = sequence_lsn;
 
 	/* do the actual work: call callback */
-	ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+	ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel,
 							   last_value, log_cnt, is_called);
 
 	/* Pop the error context stack */
@@ -1560,7 +1558,6 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void
 stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						   XLogRecPtr sequence_lsn, Relation rel,
-						   bool transactional,
 						   int64 last_value, int64 log_cnt, bool is_called)
 {
 	LogicalDecodingContext *ctx = cache->private_data;
@@ -1591,7 +1588,7 @@ stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_location = sequence_lsn;
 
 	/* do the actual work: call callback */
-	ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+	ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel,
 							   last_value, log_cnt, is_called);
 
 	/* Pop the error context stack */
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
index 1c34912610e..69178dc1cfb 100644
--- a/src/backend/replication/logical/message.c
+++ b/src/backend/replication/logical/message.c
@@ -82,7 +82,8 @@ logicalmsg_redo(XLogReaderState *record)
 {
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
-	if (info != XLOG_LOGICAL_MESSAGE)
+	if (info != XLOG_LOGICAL_MESSAGE &&
+		info != XLOG_LOGICAL_SEQUENCE)
 		elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
 
 	/* This is only interesting for logical decoding, see decode.c. */
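Both `logicalmsg_redo()` and the `RM_LOGICALMSG_ID` branch in decode.c now have to dispatch on two opcodes. The low four bits of `xl_info` belong to xlog.c (`XLR_INFO_MASK`, 0x0F), so they have to be masked off before comparing. The following is a standalone sketch using `DEMO_`-prefixed copies of those constants. The 0x10 value is the one this patch assigns to `XLOG_LOGICAL_SEQUENCE`, and the enum and function names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Values as in xlogrecord.h and the patched replication/message.h. */
#define DEMO_XLR_INFO_MASK			0x0F	/* low bits reserved for xlog.c */
#define DEMO_XLOG_LOGICAL_MESSAGE	0x00
#define DEMO_XLOG_LOGICAL_SEQUENCE	0x10

typedef enum demo_logicalmsg_kind
{
	DEMO_KIND_MESSAGE,
	DEMO_KIND_SEQUENCE,
	DEMO_KIND_UNKNOWN
} demo_logicalmsg_kind;

/*
 * Classify an RM_LOGICALMSG_ID record by its info byte: mask off the bits
 * owned by xlog.c, then compare against the rmgr-specific opcodes.
 */
static demo_logicalmsg_kind
demo_classify(uint8_t xl_info)
{
	uint8_t		info = xl_info & ~DEMO_XLR_INFO_MASK;

	switch (info)
	{
		case DEMO_XLOG_LOGICAL_MESSAGE:
			return DEMO_KIND_MESSAGE;
		case DEMO_XLOG_LOGICAL_SEQUENCE:
			return DEMO_KIND_SEQUENCE;
		default:
			return DEMO_KIND_UNKNOWN;	/* logicalmsg_redo() PANICs here */
	}
}
```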
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 18d3cbb9248..ae078eed048 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -667,7 +667,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
  */
 void
 logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
-						  XLogRecPtr lsn, bool transactional,
+						  XLogRecPtr lsn,
 						  int64 last_value, int64 log_cnt, bool is_called)
 {
 	uint8		flags = 0;
@@ -686,7 +686,6 @@ logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
 	relname = RelationGetRelationName(rel);
 	pq_sendstring(out, relname);
 
-	pq_sendint8(out, transactional);
 	pq_sendint64(out, last_value);
 	pq_sendint64(out, log_cnt);
 	pq_sendint8(out, is_called);
@@ -706,7 +705,6 @@ logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
 	seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
 	seqdata->seqname = pstrdup(pq_getmsgstring(in));
 
-	seqdata->transactional = pq_getmsgint(in, 1);
 	seqdata->last_value = pq_getmsgint64(in);
 	seqdata->log_cnt = pq_getmsgint64(in);
 	seqdata->is_called = pq_getmsgint(in, 1);
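Dropping the `transactional` byte changes the wire format. After the relation name, the sequence message now carries only three fields:

- `last_value` (8 bytes, in network byte order, as `pq_sendint64` writes it)
- `log_cnt` (8 bytes, network byte order)
- `is_called` (1 byte)

Writer and reader therefore have to agree on exactly this layout. The sketch below covers only those trailing fields. It uses hypothetical `demo_` helpers rather than the real `pq_*` functions, and leaves out the flags, the XID and the relation identity.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Bytes occupied by the sequence state once `transactional` is gone. */
#define DEMO_SEQ_STATE_LEN	(8 + 8 + 1)

/* Store v at buf[off] in network (big-endian) order; return new offset. */
static size_t
demo_put_int64(unsigned char *buf, size_t off, int64_t v)
{
	uint64_t	u = (uint64_t) v;

	for (int i = 7; i >= 0; i--)
		buf[off++] = (unsigned char) (u >> (i * 8));
	return off;
}

/* Read a big-endian int64 at buf[*off] and advance *off past it. */
static int64_t
demo_get_int64(const unsigned char *buf, size_t *off)
{
	uint64_t	u = 0;

	for (int i = 0; i < 8; i++)
		u = (u << 8) | buf[(*off)++];
	return (int64_t) u;
}

/* Writer side: last_value, log_cnt, is_called (no transactional byte). */
static size_t
demo_write_seq_state(unsigned char *buf, int64_t last_value,
					 int64_t log_cnt, bool is_called)
{
	size_t		off = 0;

	off = demo_put_int64(buf, off, last_value);
	off = demo_put_int64(buf, off, log_cnt);
	buf[off++] = is_called ? 1 : 0;
	return off;
}

/* Reader side: consume exactly the same fields in the same order. */
static void
demo_read_seq_state(const unsigned char *buf, int64_t *last_value,
					int64_t *log_cnt, bool *is_called)
{
	size_t		off = 0;

	*last_value = demo_get_int64(buf, &off);
	*log_cnt = demo_get_int64(buf, &off);
	*is_called = buf[off] != 0;
}
```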
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4702750a2e7..0cc7c8d3bc4 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -77,39 +77,7 @@
  *	  a bit more memory to the oldest subtransactions, because it's likely
  *	  they are the source for the next sequence of changes.
  *
- *	  When decoding sequences, we differentiate between a sequences created
- *	  in a (running) transaction, and sequences created in other (already
- *	  committed) transactions. Changes for sequences created in the same
- *	  top-level transaction are treated as "transactional" i.e. just like
- *	  any other change from that transaction (and discarded in case of a
- *	  rollback). Changes for sequences created earlier are treated as not
- *	  transactional - are processed immediately, as if performed outside
- *	  any transaction (and thus not rolled back).
- *
- *	  This mixed behavior is necessary - sequences are non-transactional
- *	  (e.g. ROLLBACK does not undo the sequence increments). But for new
- *	  sequences, we need to handle them in a transactional way, because if
- *	  we ever get some DDL support, the sequence won't exist until the
- *	  transaction gets applied. So we need to ensure the increments don't
- *	  happen until the sequence gets created.
- *
- *	  To differentiate which sequences are "old" and which were created
- *	  in a still-running transaction, we track sequences created in running
- *	  transactions in a hash table. Sequences are identified by relfilenode,
- *	  and we track XID of the (sub)transaction that created it. This means
- *	  that if a transaction does something that changes the relfilenode
- *	  (like an alter / reset of a sequence), the new relfilenode will be
- *	  treated as if created in the transaction. The list of sequences gets
- *	  discarded when the transaction completes (commit/rollback).
- *
- *	  We don't use the XID to check if it's the same top-level transaction.
- *	  It's enough to know it was created in an in-progress transaction,
- *	  and we know it must be the current one because otherwise it wouldn't
- *	  see the sequence object.
- *
- *	  The XID may be valid even for non-transactional sequences - we simply
- *	  keep the XID logged to WAL, it's up to the reorderbuffer to decide if
- *	  the increment is transactional.
+ *	  FIXME
  *
  * -------------------------------------------------------------------------
  */
@@ -151,13 +119,6 @@ typedef struct ReorderBufferTXNByIdEnt
 	ReorderBufferTXN *txn;
 } ReorderBufferTXNByIdEnt;
 
-/* entry for hash table we use to track sequences created in running xacts */
-typedef struct ReorderBufferSequenceEnt
-{
-	RelFileNode		rnode;
-	TransactionId	xid;
-} ReorderBufferSequenceEnt;
-
 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
 typedef struct ReorderBufferTupleCidKey
 {
@@ -388,14 +349,6 @@ ReorderBufferAllocate(void)
 	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
 								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
-	/* hash table of sequences, mapping relfilenode to XID of transaction */
-	hash_ctl.keysize = sizeof(RelFileNode);
-	hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
-	hash_ctl.hcxt = buffer->context;
-
-	buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
-								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
 	buffer->by_txn_last_xid = InvalidTransactionId;
 	buffer->by_txn_last_txn = NULL;
 
@@ -582,17 +535,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 				change->data.truncate.relids = NULL;
 			}
 			break;
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			if (change->data.sequence.tuple)
-			{
-				ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
-				change->data.sequence.tuple = NULL;
-			}
-			break;
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
 			break;
 	}
 
@@ -923,57 +870,6 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
-/*
- * Treat the sequence increment as transactional?
- *
- * The hash table tracks all sequences created in in-progress transactions,
- * so we simply do a lookup (the sequence is identified by relfilende). If
- * we find a match, the increment should be handled as transactional.
- */
-bool
-ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
-									 RelFileNode rnode, bool created)
-{
-	bool	found = false;
-
-	if (created)
-		return true;
-
-	hash_search(rb->sequences,
-				(void *) &rnode,
-				HASH_FIND,
-				&found);
-
-	return found;
-}
-
-/*
- * Cleanup sequences created in in-progress transactions.
- *
- * There's no way to search by XID, so we simply do a seqscan of all
- * the entries in the hash table. Hopefully there are only a couple
- * entries in most cases - people generally don't create many new
- * sequences over and over.
- */
-static void
-ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
-{
-	HASH_SEQ_STATUS scan_status;
-	ReorderBufferSequenceEnt *ent;
-
-	hash_seq_init(&scan_status, rb->sequences);
-	while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
-	{
-		/* skip sequences not from this transaction */
-		if (ent->xid != xid)
-			continue;
-
-		(void) hash_search(rb->sequences,
-					   (void *) &(ent->rnode),
-					   HASH_REMOVE, NULL);
-	}
-}
-
 /*
  * A transactional sequence increment is queued to be processed upon commit
  * and a non-transactional increment gets processed immediately.
@@ -985,166 +881,32 @@ ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
  */
 void
 ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
-						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
-						   RelFileNode rnode, bool transactional, bool created,
-						   ReorderBufferTupleBuf *tuplebuf)
+						   XLogRecPtr lsn, RepOriginId origin_id,
+						   Oid reloid, RelFileNode rnode,
+						   int64 last, int64 log_cnt, bool is_called)
 {
-	/*
-	 * Change needs to be handled as transactional, because the sequence was
-	 * created in a transaction that is still running. In that case all the
-	 * changes need to be queued in that transaction, we must not send them
-	 * to the downstream until the transaction commits.
-	 *
-	 * There's a bit of a trouble with subtransactions - we can't queue it
-	 * into the subxact, because it might be rolled back and we'd lose the
-	 * increment. We need to queue it into the same (sub)xact that created
-	 * the sequence, which is why we track the XID in the hash table.
-	 */
-	if (transactional)
-	{
-		MemoryContext oldcontext;
-		ReorderBufferChange *change;
-
-		/* lookup sequence by relfilenode */
-		ReorderBufferSequenceEnt   *ent;
-		bool						found;
-
-		/* transactional changes require a transaction */
-		Assert(xid != InvalidTransactionId);
-
-		/* search the lookup table (we ignore the return value, found is enough) */
-		ent = hash_search(rb->sequences,
-						  (void *) &rnode,
-						  created ? HASH_ENTER : HASH_FIND,
-						  &found);
-
-		/*
-		 * If this is the "create" increment, we must not have found any
-		 * pre-existing entry in the hash table (i.e. there must not be
-		 * any conflicting sequence).
-		 */
-		Assert(!(created && found));
-
-		/* But we must have either created or found an existing entry. */
-		Assert(created || found);
-
-		/*
-		 * When creating the sequence, remember the XID of the transaction
-		 * that created id.
-		 */
-		if (created)
-			ent->xid = xid;
-
-		/* XXX Maybe check that we're still in the same top-level xact? */
-
-		/* OK, allocate and queue the change */
-		oldcontext = MemoryContextSwitchTo(rb->context);
-
-		change = ReorderBufferGetChange(rb);
-
-		change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
-		change->origin_id = origin_id;
-
-		memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
-
-		change->data.sequence.tuple = tuplebuf;
-
-		/* add it to the same subxact that created the sequence */
-		ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
-
-		MemoryContextSwitchTo(oldcontext);
-	}
-	else
-	{
-		/*
-		 * This increment is for a sequence that was not created in any
-		 * running transaction, so we treat it as non-transactional and
-		 * just send it to the output plugin directly.
-		 */
-		ReorderBufferTXN *txn = NULL;
-		volatile Snapshot snapshot_now = snapshot;
-		bool	using_subtxn;
-
-#ifdef USE_ASSERT_CHECKING
-		/* All "creates" have to be handled as transactional. */
-		Assert(!created);
-
-		/* Make sure the sequence is not in the hash table. */
-		{
-			bool	found;
-			hash_search(rb->sequences,
-						(void *) &rnode,
-						HASH_FIND, &found);
-			Assert(!found);
-		}
-#endif
-
-		if (xid != InvalidTransactionId)
-			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
-
-		/* setup snapshot to allow catalog access */
-		SetupHistoricSnapshot(snapshot_now, NULL);
-
-		/*
-		 * Decoding needs access to syscaches et al., which in turn use
-		 * heavyweight locks and such. Thus we need to have enough state around to
-		 * keep track of those.  The easiest way is to simply use a transaction
-		 * internally.  That also allows us to easily enforce that nothing writes
-		 * to the database by checking for xid assignments.
-		 *
-		 * When we're called via the SQL SRF there's already a transaction
-		 * started, so start an explicit subtransaction there.
-		 */
-		using_subtxn = IsTransactionOrTransactionBlock();
-
-		PG_TRY();
-		{
-			Relation	relation;
-			HeapTuple	tuple;
-			Form_pg_sequence_data seq;
-			Oid			reloid;
-
-			if (using_subtxn)
-				BeginInternalSubTransaction("sequence");
-			else
-				StartTransactionCommand();
-
-			reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
-
-			if (reloid == InvalidOid)
-				elog(ERROR, "could not map filenode \"%s\" to relation OID",
-					 relpathperm(rnode,
-								 MAIN_FORKNUM));
-
-			relation = RelationIdGetRelation(reloid);
-			tuple = &tuplebuf->tuple;
-			seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+	MemoryContext oldcontext;
+	ReorderBufferChange *change;
 
-			rb->sequence(rb, txn, lsn, relation, transactional,
-						 seq->last_value, seq->log_cnt, seq->is_called);
+	/* OK, allocate and queue the change */
+	oldcontext = MemoryContextSwitchTo(rb->context);
 
-			RelationClose(relation);
+	change = ReorderBufferGetChange(rb);
 
-			TeardownHistoricSnapshot(false);
+	change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
+	change->origin_id = origin_id;
 
-			AbortCurrentTransaction();
+	memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
 
-			if (using_subtxn)
-				RollbackAndReleaseCurrentSubTransaction();
-		}
-		PG_CATCH();
-		{
-			TeardownHistoricSnapshot(true);
+	change->data.sequence.reloid = reloid;
+	change->data.sequence.last = last;
+	change->data.sequence.log_cnt = log_cnt;
+	change->data.sequence.is_called = is_called;
 
-			AbortCurrentTransaction();
-
-			if (using_subtxn)
-				RollbackAndReleaseCurrentSubTransaction();
+	/* queue it in the (sub)transaction that wrote the WAL record */
+	ReorderBufferQueueChange(rb, xid, lsn, change, false);
 
-			PG_RE_THROW();
-		}
-		PG_END_TRY();
-	}
+	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
@@ -1823,9 +1585,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 				&found);
 	Assert(found);
 
-	/* Remove sequences created in this transaction (if any). */
-	ReorderBufferSequenceCleanup(rb, txn->xid);
-
 	/* remove entries spilled to disk */
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
@@ -2249,19 +2008,23 @@ ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
 						   Relation relation, ReorderBufferChange *change,
 						   bool streaming)
 {
-	HeapTuple	tuple;
-	Form_pg_sequence_data seq;
-
-	tuple = &change->data.sequence.tuple->tuple;
-	seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+	int64		last_value, log_cnt;
+	bool		is_called;
+	/*
+	 * FIXME Is it possible that we write the sequence state for T1 before
+	 * T2, but then end up committing T2 first? If the sequence could be
+	 * incremented in between, that might result in data corruption.
+	 */
+	last_value = change->data.sequence.last;
+	log_cnt = change->data.sequence.log_cnt;
+	is_called = change->data.sequence.is_called;
 
-	/* Only ever called from ReorderBufferApplySequence, so transational. */
 	if (streaming)
-		rb->stream_sequence(rb, txn, change->lsn, relation, true,
-							seq->last_value, seq->log_cnt, seq->is_called);
+		rb->stream_sequence(rb, txn, change->lsn, relation,
+							last_value, log_cnt, is_called);
 	else
-		rb->sequence(rb, txn, change->lsn, relation, true,
-					 seq->last_value, seq->log_cnt, seq->is_called);
+		rb->sequence(rb, txn, change->lsn, relation,
+					 last_value, log_cnt, is_called);
 }
 
 /*
@@ -2710,15 +2473,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_SEQUENCE:
 					Assert(snapshot_now);
 
+					/*
 					reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
 												change->data.sequence.relnode.relNode);
 
 					if (reloid == InvalidOid)
 						elog(ERROR, "could not map filenode \"%s\" to relation OID",
 							 relpathperm(change->data.sequence.relnode,
 										 MAIN_FORKNUM));
 
-					relation = RelationIdGetRelation(reloid);
+					*/
+					relation = RelationIdGetRelation(change->data.sequence.reloid);
 
 					if (!RelationIsValid(relation))
 						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
@@ -4115,45 +3880,13 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				memcpy(data, change->data.truncate.relids, size);
 				data += size;
 
-				break;
-			}
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			{
-				char	   *data;
-				ReorderBufferTupleBuf *tup;
-				Size		len = 0;
-
-				tup = change->data.sequence.tuple;
-
-				if (tup)
-				{
-					sz += sizeof(HeapTupleData);
-					len = tup->tuple.t_len;
-					sz += len;
-				}
-
-				/* make sure we have enough space */
-				ReorderBufferSerializeReserve(rb, sz);
-
-				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
-				/* might have been reallocated above */
-				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-
-				if (len)
-				{
-					memcpy(data, &tup->tuple, sizeof(HeapTupleData));
-					data += sizeof(HeapTupleData);
-
-					memcpy(data, tup->tuple.t_data, len);
-					data += len;
-				}
-
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
 			/* ReorderBufferChange contains everything important */
 			break;
 	}
@@ -4412,28 +4145,13 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 			{
 				sz += sizeof(Oid) * change->data.truncate.nrelids;
 
-				break;
-			}
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			{
-				ReorderBufferTupleBuf *tup;
-				Size		len = 0;
-
-				tup = change->data.sequence.tuple;
-
-				if (tup)
-				{
-					sz += sizeof(HeapTupleData);
-					len = tup->tuple.t_len;
-					sz += len;
-				}
-
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
 			/* ReorderBufferChange contains everything important */
 			break;
 	}
@@ -4729,34 +4447,11 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 				break;
 			}
-
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			if (change->data.sequence.tuple)
-			{
-				uint32		tuplelen = ((HeapTuple) data)->t_len;
-
-				change->data.sequence.tuple =
-					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
-
-				/* restore ->tuple */
-				memcpy(&change->data.sequence.tuple->tuple, data,
-					   sizeof(HeapTupleData));
-				data += sizeof(HeapTupleData);
-
-				/* reset t_data pointer into the new tuplebuf */
-				change->data.sequence.tuple->tuple.t_data =
-					ReorderBufferTupleBufData(change->data.sequence.tuple);
-
-				/* restore tuple data itself */
-				memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
-				data += tuplelen;
-			}
-			break;
-
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
 			break;
 	}
 
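Because the increment is now logged as a fixed-size record, the reorder buffer handles it like an ordinary change. `ReorderBufferQueueSequence()` queues it, `ReorderBufferApplySequence()` replays it when the transaction is processed, and the serialize, restore and size paths need no special case. The simplified sketch below illustrates that lifecycle. It assumes a hypothetical bounded per-transaction array in place of the real reorder buffer, so there is no spilling, no subtransactions and no snapshots.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_MAX_CHANGES 16

/* Hypothetical fixed-size sequence change: no tuple buffer to manage. */
typedef struct demo_seq_change
{
	uint64_t	lsn;
	uint32_t	reloid;
	int64_t		last;
	int64_t		log_cnt;
	bool		is_called;
} demo_seq_change;

/* Hypothetical per-transaction queue, standing in for ReorderBufferTXN. */
typedef struct demo_txn
{
	demo_seq_change changes[DEMO_MAX_CHANGES];
	size_t		nchanges;
} demo_txn;

typedef void (*demo_apply_cb) (const demo_seq_change *change, void *arg);

/* Example callback state: count applied changes, remember the last value. */
typedef struct demo_applied
{
	int			count;
	int64_t		last;
} demo_applied;

static void
demo_record_applied(const demo_seq_change *change, void *arg)
{
	demo_applied *applied = arg;

	applied->count++;
	applied->last = change->last;
}

/* Queue like any other change; returns false if the demo queue is full. */
static bool
demo_queue_sequence(demo_txn *txn, demo_seq_change change)
{
	if (txn->nchanges >= DEMO_MAX_CHANGES)
		return false;
	/* a plain struct copy, which is also all serialization has to do */
	txn->changes[txn->nchanges++] = change;
	return true;
}

/* At commit: replay queued increments in the order they were queued. */
static void
demo_commit(demo_txn *txn, demo_apply_cb cb, void *arg)
{
	for (size_t i = 0; i < txn->nchanges; i++)
		cb(&txn->changes[i], arg);
	txn->nchanges = 0;
}

/* At abort: discard, exactly like the transaction's other changes. */
static void
demo_abort(demo_txn *txn)
{
	txn->nchanges = 0;
}
```

The sketch only fixes the order within a single transaction. It says nothing about ordering across transactions, which is what the FIXME in `ReorderBufferApplySequence()` asks about.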
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 697fb23634c..d6adbd6e46d 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1220,8 +1220,7 @@ copy_sequence(Relation rel)
 
 	fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
 
-	/* tablesync sets the sequences in non-transactional way */
-	SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called);
+	SetSequence(RelationGetRelid(rel), last_value, log_cnt, is_called);
 
 	logicalrep_rel_close(relmapentry, NoLock);
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f3868b3e1f8..fbc65f277ea 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1159,12 +1159,9 @@ apply_handle_sequence(StringInfo s)
 	logicalrep_read_sequence(s, &seq);
 
 	/*
-	 * Non-transactional sequence updates should not be part of a remote
-	 * transaction. There should not be any running transaction.
+	 * We should be in a remote transaction.
 	 */
-	Assert((!seq.transactional) || in_remote_transaction);
-	Assert(!(!seq.transactional && in_remote_transaction));
-	Assert(!(!seq.transactional && IsTransactionState()));
+	Assert(in_remote_transaction);
 
 	/*
 	 * Make sure we're in a transaction (needed by SetSequence). For
@@ -1185,14 +1182,7 @@ apply_handle_sequence(StringInfo s)
 	LockRelationOid(relid, AccessExclusiveLock);
 
 	/* apply the sequence change */
-	SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
-
-	/*
-	 * Commit the per-stream transaction (we only do this when not in
-	 * remote transaction, i.e. for non-transactional sequence updates.
-	 */
-	if (!in_remote_transaction)
-		CommitTransactionCommand();
+	SetSequence(relid, seq.last_value, seq.log_cnt, seq.is_called);
 }
 
 /*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9d33630464c..31be8064ebe 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -57,7 +57,7 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static void pgoutput_sequence(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
-							  Relation relation, bool transactional,
+							  Relation relation,
 							  int64 last_value, int64 log_cnt, bool is_called);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
@@ -1712,12 +1712,13 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 static void
 pgoutput_sequence(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
-				  Relation relation, bool transactional,
+				  Relation relation,
 				  int64 last_value, int64 log_cnt, bool is_called)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TransactionId xid = InvalidTransactionId;
 	RelationSyncEntry *relentry;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
 
 	if (!data->sequences)
 		return;
@@ -1742,25 +1743,15 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
 	if (!relentry->pubactions.pubsequence)
 		return;
 
-	/*
-	 * Output BEGIN if we haven't yet. Avoid for non-transactional
-	 * sequence changes.
-	 */
-	if (transactional)
-	{
-		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
-
-		/* Send BEGIN if we haven't yet */
-		if (txndata && !txndata->sent_begin_txn)
-			pgoutput_send_begin(ctx, txn);
-	}
+	/* Send BEGIN if we haven't yet */
+	if (txndata && !txndata->sent_begin_txn)
+		pgoutput_send_begin(ctx, txn);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_sequence(ctx->out,
 							  relation,
 							  xid,
 							  sequence_lsn,
-							  transactional,
 							  last_value,
 							  log_cnt,
 							  is_called);
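The callback no longer receives `transactional`, so `pgoutput_sequence()` applies the usual lazy-BEGIN rule to every sequence change. BEGIN goes out only before the first change the plugin actually publishes, so a transaction whose changes are all filtered out sends nothing. The sketch below shows that pattern with hypothetical `demo_` types. 'B' and 'S' are arbitrary markers, not protocol message bytes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for PGOutputTxnData. */
typedef struct demo_txndata
{
	bool		sent_begin_txn;
} demo_txndata;

/* Hypothetical output buffer recording message markers as characters. */
typedef struct demo_out
{
	char		msgs[32];
	size_t		len;
} demo_out;

static void
demo_emit(demo_out *out, char type)
{
	if (out->len + 1 < sizeof(out->msgs))
	{
		out->msgs[out->len++] = type;
		out->msgs[out->len] = '\0';
	}
}

/* Every sequence change is transactional now, so always check for BEGIN. */
static void
demo_send_sequence(demo_out *out, demo_txndata *txndata)
{
	if (txndata && !txndata->sent_begin_txn)
	{
		demo_emit(out, 'B');
		txndata->sent_begin_txn = true;
	}
	demo_emit(out, 'S');
}
```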
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index cf8b6d48193..9a74721c97c 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -40,7 +40,7 @@ PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog
 PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
 PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
 PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, sequence_decode)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
 PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 5bab90db8e0..0bc90c922ba 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -48,7 +48,6 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data;
 typedef struct xl_seq_rec
 {
 	RelFileNode node;
-	bool		created;	/* creates a new relfilenode (CREATE/ALTER) */
 	/* SEQUENCE TUPLE DATA FOLLOWS AT THE END */
 } xl_seq_rec;
 
@@ -60,9 +59,11 @@ extern ObjectAddress DefineSequence(ParseState *pstate, CreateSeqStmt *stmt);
 extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
 extern void DeleteSequenceTuple(Oid relid);
 extern void ResetSequence(Oid seq_relid);
-extern void SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called);
+extern void SetSequence(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called);
 extern void ResetSequenceCaches(void);
 
+extern void AtEOXact_Sequences(bool isCommit);
+
 extern void seq_redo(XLogReaderState *rptr);
 extern void seq_desc(StringInfo buf, XLogReaderState *rptr);
 extern const char *seq_identify(uint8 info);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 13ee10fdd4e..a466db394b8 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -125,7 +125,6 @@ typedef struct LogicalRepSequence
 	Oid			remoteid;		/* unique id of the remote sequence */
 	char	   *nspname;		/* schema name of remote sequence */
 	char	   *seqname;		/* name of the remote sequence */
-	bool		transactional;
 	int64		last_value;
 	int64		log_cnt;
 	bool		is_called;
@@ -245,7 +244,6 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_sequence(StringInfo out, Relation rel,
 									  TransactionId xid, XLogRecPtr lsn,
-									  bool transactional,
 									  int64 last_value, int64 log_cnt,
 									  bool is_called);
 extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
index 7d7785292f1..a084acc06ab 100644
--- a/src/include/replication/message.h
+++ b/src/include/replication/message.h
@@ -27,13 +27,28 @@ typedef struct xl_logical_message
 	char		message[FLEXIBLE_ARRAY_MEMBER];
 } xl_logical_message;
 
+/*
+ * Generic logical decoding sequence wal record.
+ */
+typedef struct xl_logical_sequence
+{
+	RelFileNode	node;
+	Oid			reloid;
+	int64		last;			/* last value emitted for sequence */
+	int64		log_cnt;		/* number of values pre-logged in WAL */
+	bool		is_called;
+} xl_logical_sequence;
+
 #define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
+#define SizeOfLogicalSequence	(sizeof(xl_logical_sequence))
 
 extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
 									size_t size, bool transactional);
 
 /* RMGR API*/
 #define XLOG_LOGICAL_MESSAGE	0x00
+#define XLOG_LOGICAL_SEQUENCE	0x10
+
 void		logicalmsg_redo(XLogReaderState *record);
 void		logicalmsg_desc(StringInfo buf, XLogReaderState *record);
 const char *logicalmsg_identify(uint8 info);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index fe85d49a030..abc05644516 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -95,7 +95,6 @@ typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
 										 ReorderBufferTXN *txn,
 										 XLogRecPtr sequence_lsn,
 										 Relation rel,
-										 bool transactional,
 										 int64 last_value,
 										 int64 log_cnt,
 										 bool is_called);
@@ -219,7 +218,6 @@ typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ct
 											   ReorderBufferTXN *txn,
 											   XLogRecPtr sequence_lsn,
 											   Relation rel,
-											   bool transactional,
 											   int64 last_value,
 											   int64 log_cnt,
 											   bool is_called);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0bcc150b331..260630c2ba5 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -164,7 +164,10 @@ typedef struct ReorderBufferChange
 		struct
 		{
 			RelFileNode relnode;
-			ReorderBufferTupleBuf *tuple;
+			Oid			reloid;
+			int64		last;
+			int64		log_cnt;
+			bool		is_called;
 		}			sequence;
 	}			data;
 
@@ -443,7 +446,6 @@ typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb,
 										 ReorderBufferTXN *txn,
 										 XLogRecPtr sequence_lsn,
 										 Relation rel,
-										 bool transactional,
 										 int64 last_value, int64 log_cnt,
 										 bool is_called);
 
@@ -518,7 +520,6 @@ typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb,
 											   ReorderBufferTXN *txn,
 											   XLogRecPtr sequence_lsn,
 											   Relation rel,
-											   bool transactional,
 											   int64 last_value, int64 log_cnt,
 											   bool is_called);
 
@@ -670,9 +671,9 @@ void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
 void		ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
-									   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
-									   RelFileNode rnode, bool transactional, bool created,
-									   ReorderBufferTupleBuf *tuplebuf);
+									   XLogRecPtr lsn, RepOriginId origin_id,
+									   Oid reloid, RelFileNode rnode,
+									   int64 last, int64 log_cnt, bool is_called);
 void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
@@ -720,7 +721,4 @@ void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 void		StartupReorderBuffer(void);
 
-bool		ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
-												 RelFileNode rnode, bool created);
-
 #endif
-- 
2.34.1
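To make the new record format concrete, here is a minimal standalone C sketch. It shows how a decoder might turn an XLOG_LOGICAL_SEQUENCE record into the flat fields that the reworked ReorderBufferChange.sequence now carries (reloid, last, log_cnt, is_called) in place of a tuple buffer. The typedefs for Oid and RelFileNode, the SequenceChange struct and the decode_logical_sequence() helper are hypothetical stand-ins so the sketch compiles outside the source tree. It is an illustration under those assumptions, not the decoding code from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Hypothetical stand-ins for the PostgreSQL types used by the patch, so the
 * sketch compiles outside the source tree. They are not the real headers.
 */
typedef uint32_t Oid;

typedef struct RelFileNode
{
	Oid			spcNode;		/* tablespace */
	Oid			dbNode;			/* database */
	Oid			relNode;		/* relation file number */
} RelFileNode;

/* Same field layout as the xl_logical_sequence struct added to message.h. */
typedef struct xl_logical_sequence
{
	RelFileNode node;
	Oid			reloid;
	int64_t		last;
	int64_t		log_cnt;
	bool		is_called;
} xl_logical_sequence;

#define SizeOfLogicalSequence	(sizeof(xl_logical_sequence))

#define XLR_INFO_MASK			0x0F	/* low info bits reserved for xlog.c */
#define XLOG_LOGICAL_MESSAGE	0x00
#define XLOG_LOGICAL_SEQUENCE	0x10

/* Hypothetical mirror of the new ReorderBufferChange.data.sequence fields. */
typedef struct SequenceChange
{
	RelFileNode relnode;
	Oid			reloid;
	int64_t		last;
	int64_t		log_cnt;
	bool		is_called;
} SequenceChange;

/*
 * Hypothetical helper: if the record is an XLOG_LOGICAL_SEQUENCE record with
 * at least SizeOfLogicalSequence bytes of payload, copy its fields into
 * *change and return true; otherwise leave *change untouched, return false.
 */
static bool
decode_logical_sequence(uint8_t xl_info, const char *data, size_t len,
						SequenceChange *change)
{
	xl_logical_sequence xlrec;

	/* ignore the low bits, which belong to the generic WAL machinery */
	if ((xl_info & ~XLR_INFO_MASK) != XLOG_LOGICAL_SEQUENCE)
		return false;
	if (len < SizeOfLogicalSequence)
		return false;

	/* memcpy avoids assuming the input buffer is suitably aligned */
	memcpy(&xlrec, data, SizeOfLogicalSequence);

	change->relnode = xlrec.node;
	change->reloid = xlrec.reloid;
	change->last = xlrec.last;
	change->log_cnt = xlrec.log_cnt;
	change->is_called = xlrec.is_called;
	return true;
}
```

With the decoded fields in hand, a caller would pass them on to ReorderBufferQueueSequence(), whose new signature takes reloid, rnode, last, log_cnt and is_called directly. Note that there is no transactional flag left to compute at this point.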
