Hi,
On 9/23/21 12:27 PM, Peter Eisentraut wrote:
> On 30.07.21 20:26, Tomas Vondra wrote:
>> Here's an updated version of this patch - rebased to current master,
>> and fixing some of the issues raised in Peter's review.
>
> This patch needs an update, as various conflicts have arisen now.
>
> As was discussed before, it might be better to present a separate patch
> for just the logical decoding part for now, since the replication and
> DDL stuff has the potential to conflict heavily with other patches being
> discussed right now. It looks like cutting this patch in two should be
> doable easily.

Attached is the rebased patch, split into three parts:
1) basic decoding infrastructure (decoding, reorderbuffer etc.)
2) support for sequences in test_decoding
3) support for sequences in built-in replication (catalogs, syntax, ...)
The last part is the largest one - I'm sure we'll have discussions about
the grammar, adding sequences automatically, etc. But as you said, let's
focus on the first part, which deals with the required decoding stuff.
I've added a couple comments, explaining how we track sequences, why we
need the XID in nextval() etc. I've also added streaming support.

> I looked through the test cases in test_decoding again. It all looks
> pretty sensible. If anyone can think of any other tricky or dubious
> cases, we can add them there. It's easiest to discuss these things with
> concrete test cases rather than in theory.
>
> One slightly curious issue is that this can make sequence values go
> backwards, when seen by the logical decoding consumer, like in the test
> case:
>
> + BEGIN
> + sequence: public.test_sequence transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
> + COMMIT
> + sequence: public.test_sequence transactional: 0 created: 0 last_value: 33, log_cnt: 0 is_called: 1
> + BEGIN
> + sequence: public.test_sequence transactional: 1 created: 1 last_value: 4, log_cnt: 0 is_called: 1
> + COMMIT
> + sequence: public.test_sequence transactional: 0 created: 0 last_value: 334, log_cnt: 0 is_called: 1
>
> I suppose that's okay, since it's not really the intention that someone
> is concurrently consuming sequence values on the subscriber. Maybe
> something for the future. Fixing that would require changing the way
> transactional sequence DDL updates these values, so it's not directly
> the job of the decoding to address this.

Yeah, that's due to how ALTER SEQUENCE does things, and I agree redoing
that seems well out of scope for this patch. What seems a bit suspicious
is that some of the ALTER SEQUENCE changes have "created: 1" - it's
probably correct, though, because those ALTER SEQUENCE statements can be
rolled back, so we see it as if a new sequence was created. The flag name
might be a bit confusing, though.

> A small thing I found: Maybe the text that test_decoding produces for
> sequences can be made to look more consistent with the one for tables.
> For example, in
>
> + BEGIN
> + sequence: public.test_table_a_seq transactional: 1 created: 1 last_value: 1, log_cnt: 0 is_called: 0
> + sequence: public.test_table_a_seq transactional: 1 created: 0 last_value: 33, log_cnt: 0 is_called: 1
> + table public.test_table: INSERT: a[integer]:1 b[integer]:100
> + table public.test_table: INSERT: a[integer]:2 b[integer]:200
> + COMMIT
>
> note how the punctuation is different.

I did tweak this a bit, hopefully it's more consistent.
regards
--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 9b21a107dee280333e1f849134c22b2f4038d5b9 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Fri, 24 Sep 2021 00:41:33 +0200
Subject: [PATCH 1/3] Logical decoding of sequences
This adds the infrastructure for logical decoding of sequences. Support
for built-in logical replication and test_decoding is added separately.
When decoding sequences, we differentiate between sequences created in
a (running) transaction, and sequences created in other (already
committed) transactions. Changes for sequences created in the same
top-level transaction are treated as "transactional", i.e. just like any
other change from that transaction (and discarded in case of a
rollback). Changes for sequences created earlier are treated as
non-transactional: they are processed immediately, as if performed
outside any transaction (and thus not rolled back).
This mixed behavior is necessary - sequences are non-transactional (e.g.
ROLLBACK does not undo the sequence increments). But for new sequences,
we need to handle them in a transactional way, because if we ever get
some DDL support, the sequence won't exist until the transaction gets
applied. So we need to ensure the increments don't happen until the
sequence gets created.
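
The mixed behavior described above can be illustrated with a small toy model.
This is only a sketch, not the ReorderBuffer code: the ToyTxn and ToyConsumer
types and all toy_* functions are hypothetical names made up for this
example. Transactional changes are held until commit (and dropped on
rollback), while non-transactional changes reach the consumer immediately.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical toy model; none of these names exist in PostgreSQL. */
#define TOY_MAX_CHANGES 16

typedef struct ToyTxn
{
    long   queued[TOY_MAX_CHANGES];   /* transactional changes, held until commit */
    size_t nqueued;
} ToyTxn;

typedef struct ToyConsumer
{
    long   seen[TOY_MAX_CHANGES];     /* last_value as delivered, in delivery order */
    size_t nseen;
} ToyConsumer;

/* Deliver one sequence state to the consumer. */
static void
toy_emit(ToyConsumer *c, long last_value)
{
    assert(c->nseen < TOY_MAX_CHANGES);
    c->seen[c->nseen++] = last_value;
}

/* Route a sequence change: queue it in the transaction, or emit it right away. */
static void
toy_sequence_change(ToyTxn *txn, ToyConsumer *c, bool transactional,
                    long last_value)
{
    if (transactional)
    {
        assert(txn->nqueued < TOY_MAX_CHANGES);
        txn->queued[txn->nqueued++] = last_value;
    }
    else
        toy_emit(c, last_value);
}

/* Commit: replay the queued changes to the consumer. */
static void
toy_commit(ToyTxn *txn, ToyConsumer *c)
{
    for (size_t i = 0; i < txn->nqueued; i++)
        toy_emit(c, txn->queued[i]);
    txn->nqueued = 0;
}

/* Rollback: the queued changes are discarded, emitted ones stay emitted. */
static void
toy_rollback(ToyTxn *txn)
{
    txn->nqueued = 0;
}
```

In this model a non-transactional change is visible even if the surrounding
transaction later rolls back, which matches the behavior of nextval() on a
pre-existing sequence.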
To differentiate which sequences are "old" and which were created in a
still-running transaction, we track sequences created in running
transactions in a hash table. Each sequence is identified by its
relfilenode, and at transaction commit we discard all sequences created
in that particular transaction. For each sequence we track the XID of
the (sub)transaction that created it, and we clean up the sequences for
each subtransaction when it completes (commit/rollback).
We don't use the XID to check if it's the same top-level transaction.
It's enough to know it was created in an in-progress transaction, and we
know it must be the current one because otherwise it wouldn't see the
sequence object.
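
As a rough illustration of the tracking scheme, here is a standalone sketch.
It is an assumption-laden toy: it uses a fixed-size array instead of the
dynahash table the patch uses, reduces the relfilenode to a single integer,
and the ToySeqTable type and toy_seq_* functions are hypothetical names that
do not appear in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical toy model; the patch itself uses a dynahash table. */
#define TOY_MAX_SEQS 32

typedef struct ToySeqEnt
{
    uint32_t relfilenode;   /* identifies the sequence */
    uint32_t xid;           /* (sub)transaction that created it */
    bool     used;
} ToySeqEnt;

typedef struct ToySeqTable
{
    ToySeqEnt ents[TOY_MAX_SEQS];
} ToySeqTable;

/* Remember a sequence created by an in-progress (sub)transaction. */
static bool
toy_seq_track(ToySeqTable *t, uint32_t relfilenode, uint32_t xid)
{
    for (size_t i = 0; i < TOY_MAX_SEQS; i++)
    {
        if (!t->ents[i].used)
        {
            t->ents[i].relfilenode = relfilenode;
            t->ents[i].xid = xid;
            t->ents[i].used = true;
            return true;
        }
    }
    return false;           /* table full */
}

/* A change is transactional if it creates the sequence or the sequence is tracked. */
static bool
toy_seq_is_transactional(const ToySeqTable *t, uint32_t relfilenode,
                         bool created)
{
    if (created)
        return true;

    for (size_t i = 0; i < TOY_MAX_SEQS; i++)
    {
        if (t->ents[i].used && t->ents[i].relfilenode == relfilenode)
            return true;
    }
    return false;
}

/* Forget all sequences created by the given (sub)transaction; a full scan. */
static void
toy_seq_cleanup(ToySeqTable *t, uint32_t xid)
{
    for (size_t i = 0; i < TOY_MAX_SEQS; i++)
    {
        if (t->ents[i].used && t->ents[i].xid == xid)
            t->ents[i].used = false;
    }
}
```

The cleanup scans every entry because the table is keyed by relfilenode, not
by XID. That is acceptable as long as few sequences are created in
in-progress transactions at any one time.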
The main changes in this patch are:
1) ensure WAL-logging of all necessary info for sequence advances
We need to be able to associate the advance with an XID, but until now
a sequence advance might have XID 0 if it was the first thing the
transaction did. So we ensure the transaction has an XID.
Note: This is needed because of subxacts. A subxact with XID 0 might still
advance a sequence created in a different subxact of the same top-level xact.
Then there's a "created" flag added to the xl_seq_rec, which is
necessary to differentiate WAL for created/reset sequences.
2) decoding / queueing of sequences into ReorderBuffer
This is mostly copy-paste of existing code for other decoded events.
3) tracking sequences created in in-progress transactions
We use a simple hash table, indexed by relfilenode.
---
src/backend/commands/sequence.c | 83 +++-
src/backend/replication/logical/decode.c | 131 +++++-
src/backend/replication/logical/logical.c | 89 ++++
.../replication/logical/reorderbuffer.c | 424 ++++++++++++++++++
src/include/commands/sequence.h | 1 +
src/include/replication/output_plugin.h | 29 ++
src/include/replication/reorderbuffer.h | 44 +-
7 files changed, 793 insertions(+), 8 deletions(-)
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 72bfdc07a4..a98fcc2e97 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -94,7 +94,7 @@ static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */
*/
static SeqTableData *last_used_seq = NULL;
-static void fill_seq_with_data(Relation rel, HeapTuple tuple);
+static void fill_seq_with_data(Relation rel, HeapTuple tuple, bool create);
static Relation lock_and_open_sequence(SeqTable seq);
static void create_seq_hashtable(void);
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
@@ -222,7 +222,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
/* now initialize the sequence's data */
tuple = heap_form_tuple(tupDesc, value, null);
- fill_seq_with_data(rel, tuple);
+ fill_seq_with_data(rel, tuple, true);
/* process OWNED BY if given */
if (owned_by)
@@ -327,7 +327,7 @@ ResetSequence(Oid seq_relid)
/*
* Insert the modified tuple into the new storage file.
*/
- fill_seq_with_data(seq_rel, tuple);
+ fill_seq_with_data(seq_rel, tuple, true);
/* Clear local cache so that we don't think we have cached numbers */
/* Note that we do not change the currval() state */
@@ -340,7 +340,7 @@ ResetSequence(Oid seq_relid)
* Initialize a sequence's relation with the specified tuple as content
*/
static void
-fill_seq_with_data(Relation rel, HeapTuple tuple)
+fill_seq_with_data(Relation rel, HeapTuple tuple, bool create)
{
Buffer buf;
Page page;
@@ -378,8 +378,31 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
/* check the comment above nextval_internal()'s equivalent call. */
if (RelationNeedsWAL(rel))
+ {
GetTopTransactionId();
+ /*
+ * Ensure we have a proper XID, which will be included in the XLOG
+ * record by XLogRecordAssemble. Otherwise the first nextval() in
+ * a subxact (without any preceding changes) would get XID 0,
+ * and it'd be impossible to decide which top xact it belongs to.
+ * It'd also trigger assert in DecodeSequence.
+ *
+ * XXX This might seem unnecessary, because if there's no XID the
+ * transaction couldn't have done anything important yet, e.g. it
+ * could not have created a sequence. But that's incorrect, as it
+ * ignores subtransactions. The current subtransaction might not
+ * have done anything yet (thus no XID), but an earlier one might
+ * have created the sequence.
+ *
+ * XXX Not sure if this is the best solution. Maybe do this only
+ * with wal_level=logical to minimize the overhead. OTOH advancing
+ * the sequence is likely followed by using the value(s) for some
+ * other activity, which assigns the XID.
+ */
+ GetCurrentTransactionId();
+ }
+
START_CRIT_SECTION();
MarkBufferDirty(buf);
@@ -399,6 +422,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
xlrec.node = rel->rd_node;
+ xlrec.created = create;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -502,7 +526,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
/*
* Insert the modified tuple into the new storage file.
*/
- fill_seq_with_data(seqrel, newdatatuple);
+ fill_seq_with_data(seqrel, newdatatuple, true);
}
/* process OWNED BY if given */
@@ -766,8 +790,31 @@ nextval_internal(Oid relid, bool check_permissions)
* (Have to do that here, so we're outside the critical section)
*/
if (logit && RelationNeedsWAL(seqrel))
+ {
GetTopTransactionId();
+ /*
+ * Ensure we have a proper XID, which will be included in the XLOG
+ * record by XLogRecordAssemble. Otherwise the first nextval() in
+ * a subxact (without any preceding changes) would get XID 0,
+ * and it'd be impossible to decide which top xact it belongs to.
+ * It'd also trigger assert in DecodeSequence.
+ *
+ * XXX This might seem unnecessary, because if there's no XID the
+ * transaction couldn't have done anything important yet, e.g. it
+ * could not have created a sequence. But that's incorrect, as it
+ * ignores subtransactions. The current subtransaction might not
+ * have done anything yet (thus no XID), but an earlier one might
+ * have created the sequence.
+ *
+ * XXX Not sure if this is the best solution. Maybe do this only
+ * with wal_level=logical to minimize the overhead. OTOH advancing
+ * the sequence is likely followed by using the value(s) for some
+ * other activity, which assigns the XID.
+ */
+ GetCurrentTransactionId();
+ }
+
/* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
@@ -803,6 +850,7 @@ nextval_internal(Oid relid, bool check_permissions)
seq->log_cnt = 0;
xlrec.node = seqrel->rd_node;
+ xlrec.created = false;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -977,8 +1025,31 @@ do_setval(Oid relid, int64 next, bool iscalled)
/* check the comment above nextval_internal()'s equivalent call. */
if (RelationNeedsWAL(seqrel))
+ {
GetTopTransactionId();
+ /*
+ * Ensure we have a proper XID, which will be included in the XLOG
+ * record by XLogRecordAssemble. Otherwise the first nextval() in
+ * a subxact (without any preceding changes) would get XID 0,
+ * and it'd be impossible to decide which top xact it belongs to.
+ * It'd also trigger assert in DecodeSequence.
+ *
+ * XXX This might seem unnecessary, because if there's no XID the
+ * transaction couldn't have done anything important yet, e.g. it
+ * could not have created a sequence. But that's incorrect, as it
+ * ignores subtransactions. The current subtransaction might not
+ * have done anything yet (thus no XID), but an earlier one might
+ * have created the sequence.
+ *
+ * XXX Not sure if this is the best solution. Maybe do this only
+ * with wal_level=logical to minimize the overhead. OTOH advancing
+ * the sequence is likely followed by using the value(s) for some
+ * other activity, which assigns the XID.
+ */
+ GetCurrentTransactionId();
+ }
+
/* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
@@ -999,6 +1070,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
xlrec.node = seqrel->rd_node;
+ xlrec.created = false;
+
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 2874dc0612..98d0edefb0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,6 +42,7 @@
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
+#include "commands/sequence.h"
typedef struct XLogRecordBuffer
{
@@ -74,10 +75,11 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
bool two_phase);
static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_prepare *parsed);
-
+static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
/* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -158,6 +160,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
DecodeLogicalMsgOp(ctx, &buf);
break;
+ case RM_SEQ_ID:
+ DecodeSequence(ctx, &buf);
+ break;
+
/*
* Rmgrs irrelevant for logical decoding; they describe stuff not
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -173,7 +179,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
case RM_HASH_ID:
case RM_GIN_ID:
case RM_GIST_ID:
- case RM_SEQ_ID:
case RM_SPGIST_ID:
case RM_BRIN_ID:
case RM_COMMIT_TS_ID:
@@ -1312,3 +1317,125 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
ctx->fast_forward || FilterByOrigin(ctx, origin_id));
}
+
+/*
+ * Decode Sequence Tuple
+ */
+static void
+DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+ int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
+
+ Assert(datalen >= 0);
+
+ tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+
+ ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+ tuple->tuple.t_tableOid = InvalidOid;
+
+ memcpy(((char *) tuple->tuple.t_data),
+ data + sizeof(xl_seq_rec),
+ SizeofHeapTupleHeader);
+
+ memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
+ data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
+ datalen);
+}
+
+/*
+ * Handle sequence decode
+ *
+ * Decoding sequences is a bit tricky, because while most sequence actions
+ * are non-transactional (not subject to rollback), some need to be handled
+ * as transactional.
+ *
+ * By default, a sequence increment is non-transactional - we must not queue
+ * it in a transaction as other changes, because the transaction might get
+ * rolled back and we'd discard the increment. The downstream would not be
+ * notified about the increment, which is wrong.
+ *
+ * On the other hand, the sequence may be created in a transaction. In this
+ * case we *should* queue the change as other changes in the transaction,
+ * because we don't want to send the increments for unknown sequence to the
+ * plugin - it might get confused about which sequence it's related to etc.
+ */
+static void
+DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBufferTupleBuf *tuplebuf;
+ RelFileNode target_node;
+ XLogReaderState *r = buf->record;
+ char *tupledata = NULL;
+ Size tuplelen;
+ Size datalen = 0;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+ xl_seq_rec *xlrec;
+ Snapshot snapshot;
+ RepOriginId origin_id = XLogRecGetOrigin(r);
+ bool transactional;
+
+ /* only decode changes flagged with XLOG_SEQ_LOG */
+ if (info != XLOG_SEQ_LOG)
+ elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
+
+ /*
+ * If we don't have a snapshot or we are just fast-forwarding, there is
+ * no point in decoding sequence changes.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ /* only interested in our database */
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ tupledata = XLogRecGetData(r);
+ datalen = XLogRecGetDataLen(r);
+ tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
+
+ /* extract the WAL record, with "created" flag */
+ xlrec = (xl_seq_rec *) XLogRecGetData(r);
+
+ /* XXX how could we have sequence change without data? */
+ if (!datalen || !tupledata)
+ return;
+
+ tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+ DecodeSeqTuple(tupledata, datalen, tuplebuf);
+
+ /*
+ * Should we handle the sequence increment as transactional or not?
+ *
+ * If the sequence was created in a still-running transaction, treat
+ * it as transactional and queue the increments. Otherwise it needs
+ * to be treated as non-transactional, in which case we send it to
+ * the plugin right away.
+ */
+ transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
+ target_node,
+ xlrec->created);
+
+ /* Skip the change if already processed (per the snapshot). */
+ if (transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
+ return;
+ else if (!transactional &&
+ (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+ SnapBuildXactNeedsSkip(builder, buf->origptr)))
+ return;
+
+ /* Queue the increment (or send immediately if not transactional). */
+ snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+ ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
+ origin_id, target_node, transactional,
+ xlrec->created, tuplebuf);
+}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index aae0ae5b8a..6e7a2b7144 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called);
/* streaming callbacks */
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -90,6 +94,10 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn
static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called);
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
@@ -217,6 +225,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
+ ctx->reorder->sequence = sequence_cb_wrapper;
/*
* To support streaming, we require start/stop/abort/commit/change
@@ -233,6 +242,7 @@ StartupDecodingContext(List *output_plugin_options,
(ctx->callbacks.stream_commit_cb != NULL) ||
(ctx->callbacks.stream_change_cb != NULL) ||
(ctx->callbacks.stream_message_cb != NULL) ||
+ (ctx->callbacks.stream_sequence_cb != NULL) ||
(ctx->callbacks.stream_truncate_cb != NULL);
/*
@@ -250,6 +260,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->stream_commit = stream_commit_cb_wrapper;
ctx->reorder->stream_change = stream_change_cb_wrapper;
ctx->reorder->stream_message = stream_message_cb_wrapper;
+ ctx->reorder->stream_sequence = stream_sequence_cb_wrapper;
ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
@@ -1202,6 +1213,43 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ if (ctx->callbacks.sequence_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "sequence";
+ state.report_location = sequence_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = sequence_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+ created, last_value, log_cnt, is_called);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr first_lsn)
@@ -1507,6 +1555,47 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* this callback is optional */
+ if (ctx->callbacks.stream_sequence_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_sequence";
+ state.report_location = sequence_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = sequence_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.stream_sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+ created, last_value, log_cnt, is_called);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[],
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 46e66608cf..434926459f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -77,6 +77,35 @@
* a bit more memory to the oldest subtransactions, because it's likely
* they are the source for the next sequence of changes.
*
+ * When decoding sequences, we differentiate between sequences created
+ * in a (running) transaction, and sequences created in other (already
+ * committed) transactions. Changes for sequences created in the same
+ * top-level transaction are treated as "transactional", i.e. just like
+ * any other change from that transaction (and discarded in case of a
+ * rollback). Changes for sequences created earlier are treated as
+ * non-transactional: they are processed immediately, as if performed
+ * outside any transaction (and thus not rolled back).
+ *
+ * This mixed behavior is necessary - sequences are non-transactional
+ * (e.g. ROLLBACK does not undo the sequence increments). But for new
+ * sequences, we need to handle them in a transactional way, because if
+ * we ever get some DDL support, the sequence won't exist until the
+ * transaction gets applied. So we need to ensure the increments don't
+ * happen until the sequence gets created.
+ *
+ * To differentiate which sequences are "old" and which were created
+ * in a still-running transaction, we track sequences created in running
+ * transactions in a hash table. Each sequence is identified by its
+ * relfilenode, and at transaction commit we discard all sequences
+ * created in that particular transaction. For each sequence we track
+ * the XID of the (sub)transaction that created it, and we clean up the
+ * sequences for each subtransaction when it completes (commit/rollback).
+ *
+ * We don't use the XID to check if it's the same top-level transaction.
+ * It's enough to know it was created in an in-progress transaction,
+ * and we know it must be the current one because otherwise it wouldn't
+ * see the sequence object.
+ *
* -------------------------------------------------------------------------
*/
#include "postgres.h"
@@ -116,6 +145,13 @@ typedef struct ReorderBufferTXNByIdEnt
ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;
+/* entry for hash table we use to track sequences created in running xacts */
+typedef struct ReorderBufferSequenceEnt
+{
+ RelFileNode rnode;
+ TransactionId xid;
+} ReorderBufferSequenceEnt;
+
/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
@@ -339,6 +375,14 @@ ReorderBufferAllocate(void)
buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ /* hash table of sequences, mapping relfilenode to XID of transaction */
+ hash_ctl.keysize = sizeof(RelFileNode);
+ hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
+ hash_ctl.hcxt = buffer->context;
+
+ buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
@@ -525,6 +569,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
change->data.truncate.relids = NULL;
}
break;
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ if (change->data.sequence.tuple)
+ {
+ ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
+ change->data.sequence.tuple = NULL;
+ }
+ break;
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -859,6 +910,242 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
}
}
+/*
+ * Treat the sequence increment as transactional?
+ *
+ * The hash table tracks all sequences created in in-progress transactions,
+ * so we simply do a lookup (the sequence is identified by relfilenode). If
+ * we find a match, the increment should be handled as transactional.
+ */
+bool
+ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+ RelFileNode rnode, bool created)
+{
+ bool found = false;
+
+ if (created)
+ return true;
+
+ hash_search(rb->sequences,
+ (void *) &rnode,
+ HASH_FIND,
+ &found);
+
+ return found;
+}
+
+/*
+ * Cleanup sequences created in in-progress transactions.
+ *
+ * There's no way to search by XID, so we simply do a seqscan of all
+ * the entries in the hash table. Hopefully there are only a couple
+ * entries in most cases - people generally don't create many new
+ * sequences over and over.
+ */
+static void
+ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
+{
+ HASH_SEQ_STATUS scan_status;
+ ReorderBufferSequenceEnt *ent;
+
+ hash_seq_init(&scan_status, rb->sequences);
+ while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
+ {
+ /* skip sequences not from this transaction */
+ if (ent->xid != xid)
+ continue;
+
+ (void) hash_search(rb->sequences,
+ (void *) &(ent->rnode),
+ HASH_REMOVE, NULL);
+ }
+}
+
+/*
+ * A transactional sequence increment is queued to be processed upon commit
+ * and a non-transactional increment gets processed immediately.
+ *
+ * A sequence update may be either transactional or non-transactional. When
+ * created in a running transaction, treat it as transactional and queue
+ * the change in it. Otherwise treat it as non-transactional, so that we
+ * don't forget the increment in case of a rollback.
+ */
+void
+ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+ RelFileNode rnode, bool transactional, bool created,
+ ReorderBufferTupleBuf *tuplebuf)
+{
+ /*
+ * Change needs to be handled as transactional, because the sequence was
+ * created in a transaction that is still running. In that case all the
+ * changes need to be queued in that transaction, we must not send them
+ * to the downstream until the transaction commits.
+ *
+ * There's a bit of trouble with subtransactions - we can't queue it
+ * into the subxact, because it might be rolled back and we'd lose the
+ * increment. We need to queue it into the same (sub)xact that created
+ * the sequence, which is why we track the XID in the hash table.
+ */
+ if (transactional)
+ {
+ MemoryContext oldcontext;
+ ReorderBufferChange *change;
+
+ /* lookup sequence by relfilenode */
+ ReorderBufferSequenceEnt *ent;
+ bool found;
+
+ /* transactional changes require a transaction */
+ Assert(xid != InvalidTransactionId);
+
+ /* search the lookup table (a "create" adds the entry; ent->xid is used below) */
+ ent = hash_search(rb->sequences,
+ (void *) &rnode,
+ created ? HASH_ENTER : HASH_FIND,
+ &found);
+
+ /*
+ * If this is the "create" increment, we must not have found any
+ * pre-existing entry in the hash table (i.e. there must not be
+ * any conflicting sequence).
+ */
+ Assert(!(created && found));
+
+ /* But we must have either created or found an existing entry. */
+ Assert(created || found);
+
+ /*
+ * When creating the sequence, remember the XID of the transaction
+ * that created it.
+ */
+ if (created)
+ ent->xid = xid;
+
+ /* XXX Maybe check that we're still in the same top-level xact? */
+
+ /* OK, allocate and queue the change */
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ change = ReorderBufferGetChange(rb);
+
+ change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
+ change->origin_id = origin_id;
+
+ memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
+
+ change->data.sequence.created = created;
+ change->data.sequence.tuple = tuplebuf;
+
+ /* add it to the same subxact that created the sequence */
+ ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ /*
+ * This increment is for a sequence that was not created in any
+ * running transaction, so we treat it as non-transactional and
+ * just send it to the output plugin directly.
+ */
+ ReorderBufferTXN *txn = NULL;
+ volatile Snapshot snapshot_now = snapshot;
+ bool using_subtxn;
+
+#ifdef USE_ASSERT_CHECKING
+ /* All "creates" have to be handled as transactional. */
+ Assert(!created);
+
+ /* Make sure the sequence is not in the hash table. */
+ {
+ bool found;
+ hash_search(rb->sequences,
+ (void *) &rnode,
+ HASH_FIND, &found);
+ Assert(!found);
+ }
+#endif
+
+ if (xid != InvalidTransactionId)
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ /* setup snapshot to allow catalog access */
+ SetupHistoricSnapshot(snapshot_now, NULL);
+
+ /*
+ * Decoding needs access to syscaches et al., which in turn use
+ * heavyweight locks and such. Thus we need to have enough state around to
+ * keep track of those. The easiest way is to simply use a transaction
+ * internally. That also allows us to easily enforce that nothing writes
+ * to the database by checking for xid assignments.
+ *
+ * When we're called via the SQL SRF there's already a transaction
+ * started, so start an explicit subtransaction there.
+ */
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ PG_TRY();
+ {
+ Relation relation;
+ HeapTuple tuple;
+ bool isnull;
+ int64 last_value, log_cnt;
+ bool is_called;
+ Oid reloid;
+
+ if (using_subtxn)
+ BeginInternalSubTransaction("sequence");
+ else
+ StartTransactionCommand();
+
+ reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
+
+ if (reloid == InvalidOid)
+ elog(ERROR, "could not map filenode \"%s\" to relation OID",
+ relpathperm(rnode,
+ MAIN_FORKNUM));
+
+ relation = RelationIdGetRelation(reloid);
+ tuple = &tuplebuf->tuple;
+
+ /*
+ * Extract the internal sequence values, describing the state.
+ *
+ * XXX Seems a bit strange to access it directly. Maybe there's
+ * a better / more correct way?
+ */
+ last_value = heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull);
+ log_cnt = heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull);
+ is_called = heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull);
+
+ rb->sequence(rb, txn, lsn, relation, transactional, created,
+ last_value, log_cnt, is_called);
+
+ RelationClose(relation);
+
+ TeardownHistoricSnapshot(false);
+
+ AbortCurrentTransaction();
+
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+ }
+ PG_CATCH();
+ {
+ TeardownHistoricSnapshot(true);
+
+ AbortCurrentTransaction();
+
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+}
+
/*
* AssertTXNLsnOrder
* Verify LSN ordering of transaction lists in the reorderbuffer
@@ -1535,6 +1822,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
&found);
Assert(found);
+ /* Remove sequences created in this transaction (if any). */
+ ReorderBufferSequenceCleanup(rb, txn->xid);
+
/* remove entries spilled to disk */
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
@@ -1950,6 +2240,42 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
change->data.msg.message);
}
+/*
+ * Helper function for ReorderBufferProcessTXN for applying sequences.
+ */
+static inline void
+ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change,
+ bool streaming)
+{
+ HeapTuple tuple;
+ bool isnull;
+ int64 last_value, log_cnt;
+ bool is_called;
+
+ tuple = &change->data.sequence.tuple->tuple;
+
+ /*
+ * Extract the internal sequence values, describing the state.
+ *
+ * XXX Seems a bit strange to access it directly. Maybe there's
+ * a better / more correct way?
+ */
+ last_value = heap_getattr(tuple, 1, RelationGetDescr(relation), &isnull);
+ log_cnt = heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull);
+ is_called = heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull);
+
+ /* Only ever called from ReorderBufferProcessTXN, so transactional. */
+ if (streaming)
+ rb->stream_sequence(rb, txn, change->lsn, relation, true,
+ change->data.sequence.created,
+ last_value, log_cnt, is_called);
+ else
+ rb->sequence(rb, txn, change->lsn, relation, true,
+ change->data.sequence.created,
+ last_value, log_cnt, is_called);
+}
+
/*
* Function to store the command id and snapshot at the end of the current
* stream so that we can reuse the same while sending the next stream.
@@ -2392,6 +2718,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
elog(ERROR, "tuplecid value in changequeue");
break;
+
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ Assert(snapshot_now);
+
+ reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
+ change->data.sequence.relnode.relNode);
+
+ if (reloid == InvalidOid)
+ elog(ERROR, "could not map filenode \"%s\" to relation OID",
+ relpathperm(change->data.sequence.relnode,
+ MAIN_FORKNUM));
+
+ relation = RelationIdGetRelation(reloid);
+
+ if (!RelationIsValid(relation))
+ elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+ reloid,
+ relpathperm(change->data.sequence.relnode,
+ MAIN_FORKNUM));
+
+ if (RelationIsLogicallyLogged(relation))
+ ReorderBufferApplySequence(rb, txn, relation, change, streaming);
+
+ RelationClose(relation);
+ break;
}
}
@@ -3776,6 +4127,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
memcpy(data, change->data.truncate.relids, size);
data += size;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ {
+ char *data;
+ ReorderBufferTupleBuf *tup;
+ Size len = 0;
+
+ tup = change->data.sequence.tuple;
+
+ if (tup)
+ {
+ sz += sizeof(HeapTupleData);
+ len = tup->tuple.t_len;
+ sz += len;
+ }
+
+ /* make sure we have enough space */
+ ReorderBufferSerializeReserve(rb, sz);
+
+ data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+ /* might have been reallocated above */
+ ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+ if (len)
+ {
+ memcpy(data, &tup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, tup->tuple.t_data, len);
+ data += len;
+ }
+
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4040,6 +4424,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
{
sz += sizeof(Oid) * change->data.truncate.nrelids;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ {
+ ReorderBufferTupleBuf *tup;
+ Size len = 0;
+
+ tup = change->data.sequence.tuple;
+
+ if (tup)
+ {
+ sz += sizeof(HeapTupleData);
+ len = tup->tuple.t_len;
+ sz += len;
+ }
+
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4341,6 +4741,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
+
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ if (change->data.sequence.tuple)
+ {
+ uint32 tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.sequence.tuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
+
+ /* restore ->tuple */
+ memcpy(&change->data.sequence.tuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ /* reset t_data pointer into the new tuplebuf */
+ change->data.sequence.tuple->tuple.t_data =
+ ReorderBufferTupleBufData(change->data.sequence.tuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
+ }
+ break;
+
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 40544dd4c7..5919fb90ee 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -48,6 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data;
typedef struct xl_seq_rec
{
RelFileNode node;
+ bool created; /* is this a CREATE SEQUENCE */
/* SEQUENCE TUPLE DATA FOLLOWS AT THE END */
} xl_seq_rec;
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 810495ed0e..57bc13f11c 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -88,6 +88,19 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
Size message_size,
const char *message);
+/*
+ * Called for decoded sequence changes.
+ */
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ bool created,
+ int64 last_value,
+ int64 log_cnt,
+ bool is_called);
+
/*
* Filter changes by origin.
*/
@@ -199,6 +212,20 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
Size message_size,
const char *message);
+/*
+ * Called when streaming decoded sequence changes from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ bool created,
+ int64 last_value,
+ int64 log_cnt,
+ bool is_called);
+
/*
* Callback for streaming truncates from in-progress transactions.
*/
@@ -219,6 +246,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
+ LogicalDecodeSequenceCB sequence_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
@@ -237,6 +265,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamSequenceCB stream_sequence_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5b40ff75f7..63e6ed037b 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -64,7 +64,8 @@ enum ReorderBufferChangeType
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
- REORDER_BUFFER_CHANGE_TRUNCATE
+ REORDER_BUFFER_CHANGE_TRUNCATE,
+ REORDER_BUFFER_CHANGE_SEQUENCE
};
/* forward declaration */
@@ -158,6 +159,14 @@ typedef struct ReorderBufferChange
uint32 ninvalidations; /* Number of messages */
SharedInvalidationMessage *invalidations; /* invalidation message */
} inval;
+
+ /* Context data for Sequence changes */
+ struct
+ {
+ RelFileNode relnode;
+ bool created;
+ ReorderBufferTupleBuf *tuple;
+ } sequence;
} data;
/*
@@ -430,6 +439,15 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
const char *prefix, Size sz,
const char *message);
+/* sequence callback signature */
+typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt,
+ bool is_called);
+
/* begin prepare callback signature */
typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn);
@@ -496,6 +514,15 @@ typedef void (*ReorderBufferStreamMessageCB) (
const char *prefix, Size sz,
const char *message);
+/* stream sequence callback signature */
+typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt,
+ bool is_called);
+
/* stream truncate callback signature */
typedef void (*ReorderBufferStreamTruncateCB) (
ReorderBuffer *rb,
@@ -511,6 +538,12 @@ struct ReorderBuffer
*/
HTAB *by_txn;
+ /*
+ * relfilenode => XID lookup table for sequences created in a transaction
+ * (also includes altered sequences, which are assigned a new relfilenode)
+ */
+ HTAB *sequences;
+
/*
* Transactions that could be a toplevel xact, ordered by LSN of the first
* record bearing that xid.
@@ -541,6 +574,7 @@ struct ReorderBuffer
ReorderBufferApplyTruncateCB apply_truncate;
ReorderBufferCommitCB commit;
ReorderBufferMessageCB message;
+ ReorderBufferSequenceCB sequence;
/*
* Callbacks to be called when streaming a transaction at prepare time.
@@ -560,6 +594,7 @@ struct ReorderBuffer
ReorderBufferStreamCommitCB stream_commit;
ReorderBufferStreamChangeCB stream_change;
ReorderBufferStreamMessageCB stream_message;
+ ReorderBufferStreamSequenceCB stream_sequence;
ReorderBufferStreamTruncateCB stream_truncate;
/*
@@ -635,6 +670,10 @@ void ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size message_size, const char *message);
+void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+ RelFileNode rnode, bool transactional, bool created,
+ ReorderBufferTupleBuf *tuplebuf);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
@@ -682,4 +721,7 @@ void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
void StartupReorderBuffer(void);
+bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+ RelFileNode rnode, bool created);
+
#endif
--
2.31.1
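
To make the routing rule in ReorderBufferQueueSequence easier to discuss, here is a minimal standalone sketch of it, assuming a tiny fixed-size array in place of the rb->sequences hash table. All names below (SeqTable, seq_route, seq_cleanup, SeqFileNode) are hypothetical and exist only for this illustration; they are not part of the patch. The idea is that a "created" record registers the relfilenode under the creating XID, later changes to that relfilenode are queued into that same XID even when they come from a subtransaction, and anything not found in the table is treated as non-transactional.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for RelFileNode and TransactionId. */
typedef uint32_t SeqFileNode;
typedef uint32_t Xid;

#define INVALID_XID 0
#define MAX_TRACKED 16

typedef struct
{
	SeqFileNode node;	/* relfilenode of the sequence */
	Xid			xid;	/* (sub)xact that created the relfilenode */
	bool		used;
} SeqEntry;

typedef struct
{
	SeqEntry	entries[MAX_TRACKED];
} SeqTable;

/* Find the entry for a relfilenode, or NULL if it is not tracked. */
SeqEntry *
seq_find(SeqTable *t, SeqFileNode node)
{
	for (size_t i = 0; i < MAX_TRACKED; i++)
		if (t->entries[i].used && t->entries[i].node == node)
			return &t->entries[i];
	return NULL;
}

/*
 * Decide where a sequence change goes. Returns the XID whose queue should
 * receive the change, or INVALID_XID if the change is non-transactional
 * and should be passed to the output plugin right away.
 */
Xid
seq_route(SeqTable *t, SeqFileNode node, Xid xid, bool created)
{
	SeqEntry   *ent = seq_find(t, node);

	if (created)
	{
		/* a "create" must not collide with an existing entry */
		assert(ent == NULL);

		for (size_t i = 0; i < MAX_TRACKED; i++)
		{
			if (!t->entries[i].used)
			{
				t->entries[i].node = node;
				t->entries[i].xid = xid;
				t->entries[i].used = true;
				return xid;
			}
		}
		assert(false && "sketch table is full");
		return INVALID_XID;
	}

	/* queue into the creating xact, not the xact doing the increment */
	return ent ? ent->xid : INVALID_XID;
}

/* Forget all sequences created by a transaction once it is cleaned up. */
void
seq_cleanup(SeqTable *t, Xid xid)
{
	for (size_t i = 0; i < MAX_TRACKED; i++)
		if (t->entries[i].used && t->entries[i].xid == xid)
			t->entries[i].used = false;
}
```

With this sketch, an increment logged by subxact 8 on a sequence created by xact 7 is routed to 7, and after seq_cleanup(&t, 7) the same relfilenode is treated as non-transactional again. The real code additionally has to allocate the change in rb->context and set up a historic snapshot for the non-transactional path, which the sketch leaves out.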
>From e8f96eafc45a91b6d21aa1c46b6eaed547d7a89d Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Fri, 24 Sep 2021 00:42:04 +0200
Subject: [PATCH 2/3] Add support for decoding sequences to test_decoding
---
contrib/test_decoding/Makefile | 3 +-
contrib/test_decoding/expected/sequence.out | 327 ++++++++++++++++++++
contrib/test_decoding/sql/sequence.sql | 119 +++++++
contrib/test_decoding/test_decoding.c | 69 +++++
4 files changed, 517 insertions(+), 1 deletion(-)
create mode 100644 contrib/test_decoding/expected/sequence.out
create mode 100644 contrib/test_decoding/sql/sequence.sql
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a31e0b879..56ddc3abae 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
- spill slot truncate stream stats twophase twophase_stream
+ spill slot truncate stream stats twophase twophase_stream \
+ sequence
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot
diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out
new file mode 100644
index 0000000000..beecc3e0c8
--- /dev/null
+++ b/contrib/test_decoding/expected/sequence.out
@@ -0,0 +1,327 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE SEQUENCE test_sequence;
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 4
+(1 row)
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 14
+(1 row)
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 4000
+(1 row)
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, true);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3510
+(1 row)
+
+SELECT setval('test_sequence', 3500, false);
+ setval
+--------
+ 3500
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3500
+(1 row)
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:1 log_cnt:0 is_called:0
+ COMMIT
+ sequence public.test_sequence: transactional:0 created:0 last_value:33 log_cnt:0 is_called:1
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:4 log_cnt:0 is_called:1
+ COMMIT
+ sequence public.test_sequence: transactional:0 created:0 last_value:334 log_cnt:0 is_called:1
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:14 log_cnt:32 is_called:1
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:14 log_cnt:0 is_called:1
+ COMMIT
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:4000 log_cnt:0 is_called:0
+ COMMIT
+ sequence public.test_sequence: transactional:0 created:0 last_value:4320 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3500 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3830 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3500 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3830 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:0 created:0 last_value:3500 log_cnt:0 is_called:0
+ sequence public.test_sequence: transactional:0 created:0 last_value:3820 log_cnt:0 is_called:1
+(24 rows)
+
+DROP SEQUENCE test_sequence;
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 2
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3001
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3002
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3003
+(1 row)
+
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 6001
+(1 row)
+
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data
+------
+(0 rows)
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+ data
+------
+(0 rows)
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 3003 | 30 | t
+(1 row)
+
+SELECT nextval('test_table_a_seq');
+ nextval
+---------
+ 3004
+(1 row)
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+---------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence public.test_table_a_seq: transactional:1 created:1 last_value:1 log_cnt:0 is_called:0
+ COMMIT
+ sequence public.test_table_a_seq: transactional:0 created:0 last_value:33 log_cnt:0 is_called:1
+ sequence public.test_table_a_seq: transactional:0 created:0 last_value:3000 log_cnt:0 is_called:1
+ sequence public.test_table_a_seq: transactional:0 created:0 last_value:3033 log_cnt:0 is_called:1
+ BEGIN
+ COMMIT
+(8 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+-------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence public.test_table_a_seq: transactional:1 created:1 last_value:1 log_cnt:0 is_called:0
+ sequence public.test_table_a_seq: transactional:1 created:0 last_value:33 log_cnt:0 is_called:1
+ table public.test_table: INSERT: a[integer]:1 b[integer]:100
+ table public.test_table: INSERT: a[integer]:2 b[integer]:200
+ COMMIT
+(6 rows)
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 1
+(1 row)
+
+SELECT setval('test_sequence', 3000);
+ setval
+--------
+ 3000
+(1 row)
+
+SELECT nextval('test_sequence');
+ nextval
+---------
+ 3001
+(1 row)
+
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ setval
+--------
+ 5000
+(1 row)
+
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 3001 | 32 | t
+(1 row)
+
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+ data
+------------------------------------------------------------------------------------------------
+ BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:1 log_cnt:0 is_called:0
+ sequence public.test_sequence: transactional:1 created:0 last_value:33 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:1 created:0 last_value:3000 log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:1 created:0 last_value:3033 log_cnt:0 is_called:1
+ COMMIT
+(6 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql
new file mode 100644
index 0000000000..d8a34738f3
--- /dev/null
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -0,0 +1,119 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE test_sequence;
+
+-- test the sequence changes by several nextval() calls
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several ALTER commands
+ALTER SEQUENCE test_sequence INCREMENT BY 10;
+SELECT nextval('test_sequence');
+
+ALTER SEQUENCE test_sequence START WITH 3000;
+ALTER SEQUENCE test_sequence MAXVALUE 10000;
+ALTER SEQUENCE test_sequence RESTART WITH 4000;
+SELECT nextval('test_sequence');
+
+-- test the sequence changes by several setval() calls
+SELECT setval('test_sequence', 3500);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, true);
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3500, false);
+SELECT nextval('test_sequence');
+
+-- show results and drop sequence
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+DROP SEQUENCE test_sequence;
+
+-- rollback on sequence creation and update
+BEGIN;
+CREATE SEQUENCE test_sequence;
+CREATE TABLE test_table (a INT);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+SELECT nextval('test_sequence');
+ALTER SEQUENCE test_sequence RESTART WITH 6000;
+INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) );
+SELECT nextval('test_sequence');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table creation with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
+
+-- rollback on table with serial column
+CREATE TABLE test_table (a SERIAL, b INT);
+
+BEGIN;
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+INSERT INTO test_table (b) VALUES (300);
+SELECT setval('test_table_a_seq', 3000);
+INSERT INTO test_table (b) VALUES (400);
+INSERT INTO test_table (b) VALUES (500);
+INSERT INTO test_table (b) VALUES (600);
+ALTER SEQUENCE test_table_a_seq RESTART WITH 6000;
+INSERT INTO test_table (b) VALUES (700);
+INSERT INTO test_table (b) VALUES (800);
+INSERT INTO test_table (b) VALUES (900);
+ROLLBACK;
+
+-- check table and sequence values after rollback
+SELECT * from test_table_a_seq;
+SELECT nextval('test_table_a_seq');
+
+DROP TABLE test_table;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE TABLE test_table (a SERIAL, b INT);
+INSERT INTO test_table (b) VALUES (100);
+INSERT INTO test_table (b) VALUES (200);
+SAVEPOINT a;
+INSERT INTO test_table (b) VALUES (300);
+ROLLBACK TO SAVEPOINT a;
+DROP TABLE test_table;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+-- savepoint test on table with serial column
+BEGIN;
+CREATE SEQUENCE test_sequence;
+SELECT nextval('test_sequence');
+SELECT setval('test_sequence', 3000);
+SELECT nextval('test_sequence');
+SAVEPOINT a;
+ALTER SEQUENCE test_sequence START WITH 7000;
+SELECT setval('test_sequence', 5000);
+ROLLBACK TO SAVEPOINT a;
+SELECT * FROM test_sequence;
+DROP SEQUENCE test_sequence;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e5cd84e85e..45765d299e 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -35,6 +35,7 @@ typedef struct
bool include_timestamp;
bool skip_empty_xacts;
bool only_local;
+ bool skip_sequences;
} TestDecodingData;
/*
@@ -76,6 +77,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
@@ -116,6 +121,10 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_stream_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called);
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[],
@@ -141,6 +150,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->sequence_cb = pg_decode_sequence;
cb->filter_prepare_cb = pg_decode_filter_prepare;
cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
cb->prepare_cb = pg_decode_prepare_txn;
@@ -153,6 +163,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pg_decode_stream_commit;
cb->stream_change_cb = pg_decode_stream_change;
cb->stream_message_cb = pg_decode_stream_message;
+ cb->stream_sequence_cb = pg_decode_stream_sequence;
cb->stream_truncate_cb = pg_decode_stream_truncate;
}
@@ -175,6 +186,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->skip_empty_xacts = false;
data->only_local = false;
+ /* skip sequences by default for backwards compatibility */
+ data->skip_sequences = true;
+
ctx->output_plugin_private = data;
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
@@ -265,6 +279,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "skip-sequences") == 0)
+ {
+
+ if (elem->arg == NULL)
+ continue; /* true by default */
+ else if (!parse_bool(strVal(elem->arg), &data->skip_sequences))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -744,6 +769,28 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ /* return if requested to skip_sequences */
+ if (data->skip_sequences)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfoString(ctx->out, "sequence ");
+ appendStringInfoString(ctx->out,
+ quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+ RelationGetRelationName(rel)));
+ appendStringInfo(ctx->out, ": transactional:%d created:%d last_value:" INT64_FORMAT " log_cnt:" INT64_FORMAT " is_called:%d",
+ transactional, created, last_value, log_cnt, is_called);
+ OutputPluginWrite(ctx, true);
+}
+
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
@@ -943,6 +990,28 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ /* return if requested to skip_sequences */
+ if (data->skip_sequences)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfoString(ctx->out, "streaming sequence ");
+ appendStringInfoString(ctx->out,
+ quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+ RelationGetRelationName(rel)));
+ appendStringInfo(ctx->out, ": transactional:%d created:%d last_value:" INT64_FORMAT " log_cnt:" INT64_FORMAT " is_called:%d",
+ transactional, created, last_value, log_cnt, is_called);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* In streaming mode, we don't display the detailed information of Truncate.
* See pg_decode_stream_change.
--
2.31.1
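
A side note on the two sequence callbacks in the test_decoding patch: last_value and log_cnt are int64, so the conversion used to print them has to match a 64-bit signed integer on every platform. Below is a minimal standalone sketch of that formatting step, not the plugin code itself. It assumes PRId64 from <inttypes.h> as a stand-in for PostgreSQL's INT64_FORMAT, and format_sequence_change() is a hypothetical helper invented for illustration.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical stand-in for the plugin's output step: writes one sequence
 * change into buf and returns what snprintf() returns.
 *
 * PRId64 plays the role that INT64_FORMAT plays inside PostgreSQL: it
 * expands to the conversion matching a 64-bit signed integer.
 */
int
format_sequence_change(char *buf, size_t buflen, const char *qualified_name,
                       bool transactional, bool created,
                       int64_t last_value, int64_t log_cnt, bool is_called)
{
    assert(buf != NULL && buflen > 0);

    /* bool arguments are promoted to int, so %d is the right conversion */
    return snprintf(buf, buflen,
                    "sequence %s: transactional:%d created:%d "
                    "last_value:%" PRId64 " log_cnt:%" PRId64 " is_called:%d",
                    qualified_name, transactional, created,
                    last_value, log_cnt, is_called);
}
```

The same call works unchanged for values outside the 32-bit range, which is the case a mismatched conversion would get wrong on some platforms.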
>From d50a5ca52bce39afe0ca88a5c02703d7778db5be Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Fri, 24 Sep 2021 00:43:22 +0200
Subject: [PATCH 3/3] Add support for decoding sequences to built-in
replication
---
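
Reviewer note on the refresh logic in this part: AlterSubscription_refresh matches locally known relations against the published ones by sorting arrays of Oids and probing them with bsearch(). Here is a minimal standalone sketch of that pattern, under stated assumptions: Oid is typedef'd locally as a stand-in for the PostgreSQL type, and collect_stale_relids() is a hypothetical helper, not a function from the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid */

/* qsort/bsearch comparator for Oid values */
int
oid_cmp(const void *p1, const void *p2)
{
    Oid v1 = *(const Oid *) p1;
    Oid v2 = *(const Oid *) p2;

    if (v1 < v2)
        return -1;
    if (v1 > v2)
        return 1;
    return 0;
}

/*
 * Hypothetical helper: copy into "stale" every local relid that is absent
 * from the published set, and return how many were copied.
 *
 * "published" is sorted in place; "stale" must have room for nlocal entries.
 */
size_t
collect_stale_relids(const Oid *local, size_t nlocal,
                     Oid *published, size_t npublished,
                     Oid *stale)
{
    size_t nstale = 0;

    assert(local != NULL && stale != NULL);

    /* Sort once so that each probe costs O(log n) */
    if (npublished > 0)
        qsort(published, npublished, sizeof(Oid), oid_cmp);

    for (size_t i = 0; i < nlocal; i++)
    {
        if (npublished == 0 ||
            bsearch(&local[i], published, npublished,
                    sizeof(Oid), oid_cmp) == NULL)
            stale[nstale++] = local[i];
    }

    return nstale;
}
```

Sorting the published side once keeps the whole comparison at O((n + m) log m), which matters when a subscription covers most relations in a database.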
doc/src/sgml/catalogs.sgml | 71 +++++
doc/src/sgml/ref/alter_publication.sgml | 28 +-
doc/src/sgml/ref/alter_subscription.sgml | 4 +-
src/backend/catalog/pg_publication.c | 149 ++++++++++-
src/backend/catalog/system_views.sql | 10 +
src/backend/commands/publicationcmds.c | 237 ++++++++++++++++-
src/backend/commands/sequence.c | 79 ++++++
src/backend/commands/subscriptioncmds.c | 272 ++++++++++++++++++++
src/backend/executor/execReplication.c | 2 +-
src/backend/nodes/copyfuncs.c | 4 +-
src/backend/nodes/equalfuncs.c | 4 +-
src/backend/parser/gram.y | 43 +++-
src/backend/replication/logical/proto.c | 54 ++++
src/backend/replication/logical/tablesync.c | 118 ++++++++-
src/backend/replication/logical/worker.c | 60 +++++
src/backend/replication/pgoutput/pgoutput.c | 86 ++++++-
src/backend/utils/cache/relcache.c | 4 +-
src/bin/psql/tab-complete.c | 2 +-
src/include/catalog/pg_proc.dat | 5 +
src/include/catalog/pg_publication.h | 14 +
src/include/commands/sequence.h | 1 +
src/include/nodes/parsenodes.h | 4 +-
src/include/replication/logicalproto.h | 20 ++
src/include/replication/pgoutput.h | 1 +
src/test/regress/expected/rules.out | 8 +
src/test/subscription/t/023_sequences.pl | 196 ++++++++++++++
26 files changed, 1451 insertions(+), 25 deletions(-)
create mode 100644 src/test/subscription/t/023_sequences.pl
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 2f0def9b19..1287804cf8 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9430,6 +9430,11 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l
<entry>prepared transactions</entry>
</row>
+ <row>
+ <entry><link linkend="view-pg-publication-sequences"><structname>pg_publication_sequences</structname></link></entry>
+ <entry>publications and their associated sequences</entry>
+ </row>
+
<row>
<entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry>
<entry>publications and their associated tables</entry>
@@ -11264,6 +11269,72 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
</sect1>
+ <sect1 id="view-pg-publication-sequences">
+ <title><structname>pg_publication_sequences</structname></title>
+
+ <indexterm zone="view-pg-publication-sequences">
+ <primary>pg_publication_sequences</primary>
+ </indexterm>
+
+ <para>
+ The view <structname>pg_publication_sequences</structname> provides
+ information about the mapping between publications and the sequences they
+ contain. Unlike the underlying catalog
+ <link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>,
+ this view expands
+ publications defined as <literal>FOR ALL SEQUENCES</literal>, so for such
+ publications there will be a row for each eligible sequence.
+ </para>
+
+ <table>
+ <title><structname>pg_publication_sequences</structname> Columns</title>
+ <tgroup cols="1">
+ <thead>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ Column Type
+ </para>
+ <para>
+ Description
+ </para></entry>
+ </row>
+ </thead>
+
+ <tbody>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>pubname</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-publication"><structname>pg_publication</structname></link>.<structfield>pubname</structfield>)
+ </para>
+ <para>
+ Name of publication
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>schemaname</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-namespace"><structname>pg_namespace</structname></link>.<structfield>nspname</structfield>)
+ </para>
+ <para>
+ Name of schema containing sequence
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>sequencename</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>relname</structfield>)
+ </para>
+ <para>
+ Name of sequence
+ </para></entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+ </sect1>
+
<sect1 id="view-pg-publication-tables">
<title><structname>pg_publication_tables</structname></title>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index faa114b2c6..c68a1573de 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -24,6 +24,9 @@ PostgreSQL documentation
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@@ -50,7 +53,18 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
</para>
<para>
- The fourth variant of this command listed in the synopsis can change
+ The next three variants change which sequences are part of the publication.
+ The <literal>SET SEQUENCE</literal> clause will replace the list of sequences
+ in the publication with the specified one. The <literal>ADD SEQUENCE</literal>
+ and <literal>DROP SEQUENCE</literal> clauses will add and remove one or more
+ sequences from the publication. Note that adding sequences to a publication
+ that is already subscribed to will require an <literal>ALTER SUBSCRIPTION
+ ... REFRESH PUBLICATION</literal> action on the subscribing side in order
+ to become effective.
+ </para>
+
+ <para>
+ The seventh variant of this command listed in the synopsis can change
all of the publication properties specified in
<xref linkend="sql-createpublication"/>. Properties not mentioned in the
command retain their previous settings.
@@ -62,7 +76,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
<para>
You must own the publication to use <command>ALTER PUBLICATION</command>.
- Adding a table to a publication additionally requires owning that table.
+ Adding a table to a publication additionally requires owning that table,
+ and the same requirement applies to sequences.
To alter the owner, you must also be a direct or indirect member of the new
owning role. The new owner must have <literal>CREATE</literal> privilege on
the database. Also, the new owner of a <literal>FOR ALL TABLES</literal>
@@ -97,6 +112,15 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><replaceable class="parameter">sequence_name</replaceable></term>
+ <listitem>
+ <para>
+ Name of an existing sequence.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
<listitem>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index bec5e9c483..1f9cf6e649 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -146,7 +146,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<listitem>
<para>
Fetch missing table information from publisher. This will start
- replication of tables that were added to the subscribed-to publications
+ replication of tables and sequences that were added to the subscribed-to publications
since <command>CREATE SUBSCRIPTION</command> or
the last invocation of <command>REFRESH PUBLICATION</command>.
</para>
@@ -163,7 +163,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
Specifies whether to copy pre-existing data in the publications
that are being subscribed to when the replication starts.
The default is <literal>true</literal>. (Previously-subscribed
- tables are not copied.)
+ tables and sequences are not copied.)
</para>
</listitem>
</varlistentry>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 9cd0c82f93..0502df1454 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -52,7 +52,8 @@ check_publication_add_relation(Relation targetrel)
{
/* Must be a regular or partitioned table */
if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
- RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
+ RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE &&
+ RelationGetForm(targetrel)->relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot add relation \"%s\" to publication",
@@ -99,7 +100,8 @@ static bool
is_publishable_class(Oid relid, Form_pg_class reltuple)
{
return (reltuple->relkind == RELKIND_RELATION ||
- reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
+ reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
+ reltuple->relkind == RELKIND_SEQUENCE) &&
!IsCatalogRelationOid(relid) &&
reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
relid >= FirstNormalObjectId;
@@ -318,6 +320,11 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Form_pg_publication_rel pubrel;
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+ /* skip sequences here */
+ if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+ continue;
+
result = GetPubPartitionOptionRelations(result, pub_partopt,
pubrel->prrelid);
}
@@ -328,6 +335,49 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
return result;
}
+/*
+ * Gets list of relation oids for a publication (sequences only).
+ *
+ * This should only be used for normal publications; FOR ALL SEQUENCES
+ * publications should use GetAllSequencesPublicationRelations().
+ */
+List *
+GetPublicationSequenceRelations(Oid pubid)
+{
+ List *result;
+ Relation pubrelsrel;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ HeapTuple tup;
+
+ /* Find all relations associated with the publication. */
+ pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
+
+ ScanKeyInit(&scankey,
+ Anum_pg_publication_rel_prpubid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(pubid));
+
+ scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
+ true, NULL, 1, &scankey);
+
+ result = NIL;
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_publication_rel pubrel;
+
+ pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+ if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+ result = lappend_oid(result, pubrel->prrelid);
+ }
+
+ systable_endscan(scan);
+ table_close(pubrelsrel, AccessShareLock);
+
+ return result;
+}
+
/*
* Gets list of publication oids for publications marked as FOR ALL TABLES.
*/
@@ -428,6 +478,46 @@ GetAllTablesPublicationRelations(bool pubviaroot)
return result;
}
+/*
+ * Gets list of all sequences published by FOR ALL SEQUENCES publication(s).
+ *
+ * Sequences cannot be partitioned, so unlike the FOR ALL TABLES case there
+ * is no partition handling here; every publishable sequence in pg_class is
+ * returned.
+ */
+List *
+GetAllSequencesPublicationRelations(void)
+{
+ Relation classRel;
+ ScanKeyData key[1];
+ TableScanDesc scan;
+ HeapTuple tuple;
+ List *result = NIL;
+
+ classRel = table_open(RelationRelationId, AccessShareLock);
+
+ ScanKeyInit(&key[0],
+ Anum_pg_class_relkind,
+ BTEqualStrategyNumber, F_CHAREQ,
+ CharGetDatum(RELKIND_SEQUENCE));
+
+ scan = table_beginscan_catalog(classRel, 1, key);
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+ Oid relid = relForm->oid;
+
+ if (is_publishable_class(relid, relForm))
+ result = lappend_oid(result, relid);
+ }
+
+ table_endscan(scan);
+
+ table_close(classRel, AccessShareLock);
+ return result;
+}
+
/*
* Get publication using oid
*
@@ -450,10 +540,12 @@ GetPublication(Oid pubid)
pub->oid = pubid;
pub->name = pstrdup(NameStr(pubform->pubname));
pub->alltables = pubform->puballtables;
+ pub->allsequences = pubform->puballsequences;
pub->pubactions.pubinsert = pubform->pubinsert;
pub->pubactions.pubupdate = pubform->pubupdate;
pub->pubactions.pubdelete = pubform->pubdelete;
pub->pubactions.pubtruncate = pubform->pubtruncate;
+ pub->pubactions.pubsequence = pubform->pubsequence;
pub->pubviaroot = pubform->pubviaroot;
ReleaseSysCache(tup);
@@ -579,3 +671,56 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+
+/*
+ * Returns Oids of sequences in a publication.
+ */
+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ Publication *publication;
+ List *sequences;
+
+ /* stuff done only on the first call of the function */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* switch to memory context appropriate for multiple function calls */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ publication = GetPublicationByName(pubname, false);
+
+ /*
+ * FOR ALL SEQUENCES publications are expanded to every publishable
+ * sequence; otherwise only the sequences explicitly added to the
+ * publication are returned.
+ */
+ if (publication->allsequences)
+ sequences = GetAllSequencesPublicationRelations();
+ else
+ sequences = GetPublicationSequenceRelations(publication->oid);
+
+ funcctx->user_fctx = (void *) sequences;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /* stuff done on every call of the function */
+ funcctx = SRF_PERCALL_SETUP();
+ sequences = (List *) funcctx->user_fctx;
+
+ if (funcctx->call_cntr < list_length(sequences))
+ {
+ Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
+
+ SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 55f6e3711d..9bec49bc3e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -372,6 +372,16 @@ CREATE VIEW pg_publication_tables AS
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE C.oid = GPT.relid;
+CREATE VIEW pg_publication_sequences AS
+ SELECT
+ P.pubname AS pubname,
+ N.nspname AS schemaname,
+ C.relname AS sequencename
+ FROM pg_publication P,
+ LATERAL pg_get_publication_sequences(P.pubname) GPT,
+ pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
+ WHERE C.oid = GPT.relid;
+
CREATE VIEW pg_locks AS
SELECT * FROM pg_lock_status() AS L;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 9c7f91611d..602226caef 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -16,6 +16,7 @@
#include "access/genam.h"
#include "access/htup_details.h"
+#include "access/relation.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
@@ -51,6 +52,12 @@ static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
AlterPublicationStmt *stmt);
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
+static List *OpenSequenceList(List *sequences);
+static void CloseSequenceList(List *rels);
+static void PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+ AlterPublicationStmt *stmt);
+static void PublicationDropSequences(Oid pubid, List *rels, bool missing_ok);
+
static void
parse_publication_options(ParseState *pstate,
List *options,
@@ -69,6 +76,7 @@ parse_publication_options(ParseState *pstate,
pubactions->pubupdate = true;
pubactions->pubdelete = true;
pubactions->pubtruncate = true;
+ pubactions->pubsequence = true;
*publish_via_partition_root = false;
/* Parse options */
@@ -93,6 +101,7 @@ parse_publication_options(ParseState *pstate,
pubactions->pubupdate = false;
pubactions->pubdelete = false;
pubactions->pubtruncate = false;
+ pubactions->pubsequence = false;
*publish_given = true;
publish = defGetString(defel);
@@ -115,6 +124,8 @@ parse_publication_options(ParseState *pstate,
pubactions->pubdelete = true;
else if (strcmp(publish_opt, "truncate") == 0)
pubactions->pubtruncate = true;
+ else if (strcmp(publish_opt, "sequence") == 0)
+ pubactions->pubsequence = true;
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -205,6 +216,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
BoolGetDatum(pubactions.pubdelete);
values[Anum_pg_publication_pubtruncate - 1] =
BoolGetDatum(pubactions.pubtruncate);
+ values[Anum_pg_publication_pubsequence - 1] =
+ BoolGetDatum(pubactions.pubsequence);
values[Anum_pg_publication_pubviaroot - 1] =
BoolGetDatum(publish_via_partition_root);
@@ -380,9 +393,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
rels = OpenTableList(stmt->tables);
- if (stmt->tableAction == DEFELEM_ADD)
+ if (stmt->action == DEFELEM_ADD)
PublicationAddTables(pubid, rels, false, stmt);
- else if (stmt->tableAction == DEFELEM_DROP)
+ else if (stmt->action == DEFELEM_DROP)
PublicationDropTables(pubid, rels, false);
else /* DEFELEM_SET */
{
@@ -440,6 +453,82 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
CloseTableList(rels);
}
+/*
+ * Add or remove sequence to/from publication.
+ */
+static void
+AlterPublicationSequences(AlterPublicationStmt *stmt, Relation rel,
+ HeapTuple tup)
+{
+ List *rels = NIL;
+ Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+ Oid pubid = pubform->oid;
+
+ /* Check that user is allowed to manipulate the publication sequences. */
+ if (pubform->puballsequences)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
+ NameStr(pubform->pubname)),
+ errdetail("Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications.")));
+
+ Assert(list_length(stmt->sequences) > 0);
+
+ rels = OpenSequenceList(stmt->sequences);
+
+ if (stmt->action == DEFELEM_ADD)
+ PublicationAddSequences(pubid, rels, false, stmt);
+ else if (stmt->action == DEFELEM_DROP)
+ PublicationDropSequences(pubid, rels, false);
+ else /* DEFELEM_SET */
+ {
+ List *oldrelids =
+ GetPublicationSequenceRelations(pubid);
+ List *delrels = NIL;
+ ListCell *oldlc;
+
+ /* Calculate which relations to drop. */
+ foreach(oldlc, oldrelids)
+ {
+ Oid oldrelid = lfirst_oid(oldlc);
+ ListCell *newlc;
+ bool found = false;
+
+ foreach(newlc, rels)
+ {
+ Relation newrel = ((PublicationRelInfo *) lfirst(newlc))->relation;
+
+ if (RelationGetRelid(newrel) == oldrelid)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ {
+ PublicationRelInfo *oldrel = palloc(sizeof(PublicationRelInfo));
+
+ oldrel->relation = relation_open(oldrelid, ShareUpdateExclusiveLock);
+ delrels = lappend(delrels, oldrel);
+ }
+ }
+
+ /* And drop them. */
+ PublicationDropSequences(pubid, delrels, true);
+
+ /*
+ * Don't bother calculating the difference for adding, we'll catch and
+ * skip existing ones when doing catalog update.
+ */
+ PublicationAddSequences(pubid, rels, true, stmt);
+
+ CloseSequenceList(delrels);
+ }
+
+ CloseSequenceList(rels);
+}
+
/*
* Alter the existing publication.
*
@@ -473,8 +562,12 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
if (stmt->options)
AlterPublicationOptions(pstate, stmt, rel, tup);
- else
+ else if (stmt->tables)
AlterPublicationTables(stmt, rel, tup);
+ else if (stmt->sequences)
+ AlterPublicationSequences(stmt, rel, tup);
+ else
+ Assert(false);
/* Cleanup. */
heap_freetuple(tup);
@@ -727,6 +820,144 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
}
}
+/*
+ * Open sequences specified by a PublicationTable list.
+ * In the returned list of PublicationRelInfo, sequences are locked
+ * in ShareUpdateExclusiveLock mode in order to add them to a publication.
+ */
+static List *
+OpenSequenceList(List *sequences)
+{
+ List *relids = NIL;
+ List *rels = NIL;
+ ListCell *lc;
+
+ /*
+ * Open, share-lock, and check all the explicitly-specified relations
+ */
+ foreach(lc, sequences)
+ {
+ PublicationTable *s = lfirst_node(PublicationTable, lc);
+ Relation rel;
+ Oid myrelid;
+ PublicationRelInfo *pub_rel;
+
+ /* Allow query cancel in case this takes a long time */
+ CHECK_FOR_INTERRUPTS();
+
+ rel = table_openrv(s->relation, ShareUpdateExclusiveLock);
+ myrelid = RelationGetRelid(rel);
+
+ /*
+ * Filter out duplicates if user specifies "foo, foo".
+ *
+ * Note that this algorithm is known to not be very efficient (O(N^2))
+ * but given that it only works on the list of sequences given to us by the user
+ * it's deemed acceptable.
+ */
+ if (list_member_oid(relids, myrelid))
+ {
+ table_close(rel, ShareUpdateExclusiveLock);
+ continue;
+ }
+
+ pub_rel = palloc(sizeof(PublicationRelInfo));
+ pub_rel->relation = rel;
+ rels = lappend(rels, pub_rel);
+ relids = lappend_oid(relids, myrelid);
+ }
+
+ list_free(relids);
+
+ return rels;
+}
+
+/*
+ * Close all relations in the list.
+ */
+static void
+CloseSequenceList(List *rels)
+{
+ ListCell *lc;
+
+ foreach(lc, rels)
+ {
+ PublicationRelInfo *pub_rel;
+
+ pub_rel = (PublicationRelInfo *) lfirst(lc);
+ table_close(pub_rel->relation, NoLock);
+ }
+}
+
+/*
+ * Add listed sequences to the publication.
+ */
+static void
+PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+ AlterPublicationStmt *stmt)
+{
+ ListCell *lc;
+
+ Assert(!stmt || !stmt->for_all_sequences);
+
+ foreach(lc, rels)
+ {
+ PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
+ Relation rel = pub_rel->relation;
+ ObjectAddress obj;
+
+ /* Must be owner of the sequence or superuser. */
+ if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
+ RelationGetRelationName(rel));
+
+ obj = publication_add_relation(pubid, pub_rel, if_not_exists);
+ if (stmt)
+ {
+ EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+ (Node *) stmt);
+
+ InvokeObjectPostCreateHook(PublicationRelRelationId,
+ obj.objectId, 0);
+ }
+ }
+}
+
+/*
+ * Remove listed sequences from the publication.
+ */
+static void
+PublicationDropSequences(Oid pubid, List *rels, bool missing_ok)
+{
+ ObjectAddress obj;
+ ListCell *lc;
+ Oid prid;
+
+ foreach(lc, rels)
+ {
+ Relation rel = ((PublicationRelInfo *) lfirst(lc))->relation;
+ Oid relid = RelationGetRelid(rel);
+
+ prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(pubid));
+ if (!OidIsValid(prid))
+ {
+ if (missing_ok)
+ continue;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("relation \"%s\" is not part of the publication",
+ RelationGetRelationName(rel))));
+ }
+
+ ObjectAddressSet(obj, PublicationRelRelationId, prid);
+ performDeletion(&obj, DROP_CASCADE, 0);
+ }
+}
+
+
/*
* Internal workhorse for changing a publication owner
*/
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index a98fcc2e97..93a213919d 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -336,6 +336,85 @@ ResetSequence(Oid seq_relid)
relation_close(seq_rel, NoLock);
}
+/*
+ * Set a sequence to the given state (last_value, log_cnt, is_called).
+ *
+ * The change is made transactionally, so that on failure of the current
+ * transaction, the sequence will be restored to its previous state.
+ * We do that by creating a whole new relfilenode for the sequence; so this
+ * works much like the rewriting forms of ALTER TABLE.
+ *
+ * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
+ * which must not be released until end of transaction. Caller is also
+ * responsible for permissions checking.
+ */
+void
+ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
+{
+ Relation seq_rel;
+ SeqTable elm;
+ Form_pg_sequence_data seq;
+ Buffer buf;
+ HeapTupleData seqdatatuple;
+ HeapTuple tuple;
+
+ /*
+ * Read the old sequence. This does a bit more work than really
+ * necessary, but it's simple, and we do want to double-check that it's
+ * indeed a sequence.
+ */
+ init_sequence(seq_relid, &elm, &seq_rel);
+ (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple);
+
+ /*
+ * Copy the existing sequence tuple.
+ */
+ tuple = heap_copytuple(&seqdatatuple);
+
+ /* Now we're done with the old page */
+ UnlockReleaseBuffer(buf);
+
+ /*
+ * Modify the copied tuple to carry the requested state (compare the
+ * RESTART action in AlterSequence)
+ */
+ seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+ seq->last_value = last_value;
+ seq->is_called = is_called;
+ seq->log_cnt = log_cnt;
+
+ /*
+ * Create a new storage file for the sequence.
+ */
+ RelationSetNewRelfilenode(seq_rel, seq_rel->rd_rel->relpersistence);
+
+ /*
+ * Ensure sequence's relfrozenxid is at 0, since it won't contain any
+ * unfrozen XIDs. Same with relminmxid, since a sequence will never
+ * contain multixacts.
+ */
+ Assert(seq_rel->rd_rel->relfrozenxid == InvalidTransactionId);
+ Assert(seq_rel->rd_rel->relminmxid == InvalidMultiXactId);
+
+ /*
+ * Insert the modified tuple into the new storage file.
+ *
+ * XXX Maybe this should also use created=true, just like the other places
+ * calling fill_seq_with_data. That's probably needed for correct cascading
+ * replication.
+ *
+ * XXX That'd mean all fill_seq_with_data callers use created=true, making
+ * the parameter unnecessary.
+ */
+ fill_seq_with_data(seq_rel, tuple, false);
+
+ /* Clear local cache so that we don't think we have cached numbers */
+ /* Note that we do not change the currval() state */
+ elm->cached = elm->last;
+
+ relation_close(seq_rel, NoLock);
+}
+
/*
* Initialize a sequence's relation with the specified tuple as content
*/
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c47ba26369..5086526904 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -84,6 +84,7 @@ typedef struct SubOpts
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -502,6 +503,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
char *err;
WalReceiverConn *wrconn;
List *tables;
+ List *sequences;
ListCell *lc;
char table_state;
@@ -540,6 +542,26 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
InvalidXLogRecPtr);
}
+ /*
+ * Get the sequence list from publisher and build local sequence
+ * status info.
+ */
+ sequences = fetch_sequence_list(wrconn, publications);
+ foreach(lc, sequences)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+ /* Check for supported relkind. */
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
+ }
+
/*
* If requested, create permanent slot for the subscription. We
* won't use the initial snapshot for anything, so no need to
@@ -712,6 +734,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
{
Oid relid = subrel_local_oids[off];
+ /* XXX ignore sequences - maybe do this in GetSubscriptionRelations? */
+ if (get_rel_relkind(relid) == RELKIND_SEQUENCE)
+ continue;
+
if (!bsearch(&relid, pubrel_local_oids,
list_length(pubrel_names), sizeof(Oid), oid_cmp))
{
@@ -803,6 +829,183 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
}
}
+
+ /*
+ * XXX now do the same thing for sequences, maybe before the preceding
+ * block, or earlier?
+ */
+
+ /* Get the sequence list from publisher. */
+ pubrel_names = fetch_sequence_list(wrconn, sub->publications);
+
+ /* Get local relation list (tables and sequences). */
+ subrel_states = GetSubscriptionRelations(sub->oid);
+
+ /*
+ * Build qsorted array of local relation oids for faster lookup. This can
+ * potentially contain all tables and sequences in the database so speed
+ * of lookup is important.
+ */
+ subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+ off = 0;
+ foreach(lc, subrel_states)
+ {
+ SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+ subrel_local_oids[off++] = relstate->relid;
+ }
+ qsort(subrel_local_oids, list_length(subrel_states),
+ sizeof(Oid), oid_cmp);
+
+ /*
+ * Rels that we want to remove from subscription and drop any slots
+ * and origins corresponding to them.
+ */
+ sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+
+ /*
+ * Walk over the remote sequences and try to match them to locally known
+ * sequences. If the sequence is not known locally create a new state for
+ * it.
+ *
+ * Also builds array of local oids of remote sequences for the next step.
+ */
+ off = 0;
+ pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+ foreach(lc, pubrel_names)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+ /* Check for supported relkind. */
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+
+ pubrel_local_oids[off++] = relid;
+
+ if (!bsearch(&relid, subrel_local_oids,
+ list_length(subrel_states), sizeof(Oid), oid_cmp))
+ {
+ AddSubscriptionRelState(sub->oid, relid,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ (errmsg_internal("sequence \"%s.%s\" added to subscription \"%s\"",
+ rv->schemaname, rv->relname, sub->name)));
+ }
+ }
+
+ /*
+ * Next remove state for sequences we should not care about anymore,
+ * using the data we collected above.
+ */
+ qsort(pubrel_local_oids, list_length(pubrel_names),
+ sizeof(Oid), oid_cmp);
+
+ remove_rel_len = 0;
+ for (off = 0; off < list_length(subrel_states); off++)
+ {
+ Oid relid = subrel_local_oids[off];
+
+ /* XXX ignore non-sequences - maybe do this in GetSubscriptionRelations? */
+ if (get_rel_relkind(relid) != RELKIND_SEQUENCE)
+ continue;
+
+ if (!bsearch(&relid, pubrel_local_oids,
+ list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ {
+ char state;
+ XLogRecPtr statelsn;
+
+ /*
+ * Lock pg_subscription_rel with AccessExclusiveLock to
+ * prevent any race conditions with the apply worker
+ * re-launching workers at the same time this code is trying
+ * to remove those tables.
+ *
+ * Even if new worker for this particular rel is restarted it
+ * won't be able to make any progress as we hold exclusive
+ * lock on subscription_rel till the transaction end. It will
+ * simply exit as there is no corresponding rel entry.
+ *
+ * This locking also ensures that the state of rels won't
+ * change till we are done with this refresh operation.
+ */
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+ /* Last known rel state. */
+ state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+
+ sub_remove_rels[remove_rel_len].relid = relid;
+ sub_remove_rels[remove_rel_len++].state = state;
+
+ RemoveSubscriptionRel(sub->oid, relid);
+
+ logicalrep_worker_stop(sub->oid, relid);
+
+ /*
+ * For READY state, we would have already dropped the
+ * tablesync origin.
+ */
+ if (state != SUBREL_STATE_READY)
+ {
+ char originname[NAMEDATALEN];
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * It is possible that the origin is not yet created for
+ * tablesync worker, this can happen for the states before
+ * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+ * concurrently try to drop the origin and by this time
+ * the origin might be already removed. For these reasons,
+ * passing missing_ok = true.
+ */
+ ReplicationOriginNameForTablesync(sub->oid, relid, originname,
+ sizeof(originname));
+ replorigin_drop_by_name(originname, true, false);
+ }
+
+ ereport(DEBUG1,
+ (errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name)));
+ }
+ }
+
+ /*
+ * Drop the tablesync slots associated with removed tables. This has
+ * to be at the end because otherwise if there is an error while doing
+ * the database operations we won't be able to rollback dropped slots.
+ */
+ for (off = 0; off < remove_rel_len; off++)
+ {
+ if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
+ sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ /*
+ * For READY/SYNCDONE states we know the tablesync slot has
+ * already been dropped by the tablesync worker.
+ *
+ * For other states, there is no certainty, maybe the slot
+ * does not exist yet. Also, if we fail after removing some of
+ * the slots, next time, it will again try to drop already
+ * dropped slots and fail. For these reasons, we allow
+ * missing_ok = true for the drop.
+ */
+ ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
+ syncslotname, sizeof(syncslotname));
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+ }
+ }
+
}
PG_FINALLY();
{
@@ -1607,6 +1810,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
return tablelist;
}
+/*
+ * Get the list of sequences which belong to specified publications on the
+ * publisher connection.
+ */
+static List *
+fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[2] = {TEXTOID, TEXTOID};
+ ListCell *lc;
+ bool first;
+ List *tablelist = NIL;
+
+ Assert(list_length(publications) > 0);
+
+ initStringInfo(&cmd);
+ appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n"
+ " FROM pg_catalog.pg_publication_sequences s\n"
+ " WHERE s.pubname IN (");
+ first = true;
+ foreach(lc, publications)
+ {
+ char *pubname = strVal(lfirst(lc));
+
+ if (first)
+ first = false;
+ else
+ appendStringInfoString(&cmd, ", ");
+
+ appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+ }
+ appendStringInfoChar(&cmd, ')');
+
+ res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not receive list of replicated sequences from the publisher: %s",
+ res->err)));
+
+ /* Process sequences. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *nspname;
+ char *relname;
+ bool isnull;
+ RangeVar *rv;
+
+ nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+ relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ rv = makeRangeVar(nspname, relname, -1);
+ tablelist = lappend(tablelist, rv);
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+
+ return tablelist;
+}
+
/*
* This is to report the connection failure while dropping replication slots.
* Here, we report the WARNING for all tablesync slots so that user can drop
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 574d7d27fd..6f0d42d551 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -608,7 +608,7 @@ void
CheckSubscriptionRelkind(char relkind, const char *nspname,
const char *relname)
{
- if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use relation \"%s.%s\" as logical replication target",
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 228387eaee..39b40646a5 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4831,8 +4831,10 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from)
COPY_STRING_FIELD(pubname);
COPY_NODE_FIELD(options);
COPY_NODE_FIELD(tables);
+ COPY_NODE_FIELD(sequences);
COPY_SCALAR_FIELD(for_all_tables);
- COPY_SCALAR_FIELD(tableAction);
+ COPY_SCALAR_FIELD(for_all_sequences);
+ COPY_SCALAR_FIELD(action);
return newnode;
}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 800f588b5c..77c906f615 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2315,8 +2315,10 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a,
COMPARE_STRING_FIELD(pubname);
COMPARE_NODE_FIELD(options);
COMPARE_NODE_FIELD(tables);
+ COMPARE_NODE_FIELD(sequences);
COMPARE_SCALAR_FIELD(for_all_tables);
- COMPARE_SCALAR_FIELD(tableAction);
+ COMPARE_SCALAR_FIELD(for_all_sequences);
+ COMPARE_SCALAR_FIELD(action);
return true;
}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index e3068a374e..a686d897c8 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9596,6 +9596,7 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
*****************************************************************************/
CreatePublicationStmt:
+ /* CREATE PUBLICATION name opt_publication_for_tables opt_publication_for_sequences opt_definition */
CREATE PUBLICATION name opt_publication_for_tables opt_definition
{
CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
@@ -9670,7 +9671,7 @@ AlterPublicationStmt:
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
- n->tableAction = DEFELEM_ADD;
+ n->action = DEFELEM_ADD;
$$ = (Node *)n;
}
| ALTER PUBLICATION name SET TABLE publication_table_list
@@ -9678,7 +9679,7 @@ AlterPublicationStmt:
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
- n->tableAction = DEFELEM_SET;
+ n->action = DEFELEM_SET;
$$ = (Node *)n;
}
| ALTER PUBLICATION name DROP TABLE publication_table_list
@@ -9686,7 +9687,31 @@ AlterPublicationStmt:
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
- n->tableAction = DEFELEM_DROP;
+ n->action = DEFELEM_DROP;
+ $$ = (Node *)n;
+ }
+ | ALTER PUBLICATION name ADD_P SEQUENCE publication_table_list
+ {
+ AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+ n->pubname = $3;
+ n->sequences = $6;
+ n->action = DEFELEM_ADD;
+ $$ = (Node *)n;
+ }
+ | ALTER PUBLICATION name SET SEQUENCE publication_table_list
+ {
+ AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+ n->pubname = $3;
+ n->sequences = $6;
+ n->action = DEFELEM_SET;
+ $$ = (Node *)n;
+ }
+ | ALTER PUBLICATION name DROP SEQUENCE publication_table_list
+ {
+ AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+ n->pubname = $3;
+ n->sequences = $6;
+ n->action = DEFELEM_DROP;
$$ = (Node *)n;
}
;
@@ -9935,6 +9960,12 @@ UnlistenStmt:
}
;
+/*
+ * FIXME
+ *
+ * opt_publication_for_sequences and publication_for_sequences should be
+ * copies of the corresponding *_for_tables rules, adapted for sequences
+ */
/*****************************************************************************
*
@@ -9943,6 +9974,12 @@ UnlistenStmt:
* BEGIN / COMMIT / ROLLBACK
* (also older versions END / ABORT)
*
+ * ALTER PUBLICATION name ADD SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name DROP SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name SET SEQUENCE sequence [, sequence2]
+ *
*****************************************************************************/
TransactionStmt:
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b639..9f53b4c47b 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -647,6 +647,58 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
pq_sendbytes(out, message, sz);
}
+/*
+ * Write SEQUENCE to stream
+ */
+void
+logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
+ XLogRecPtr lsn, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ uint8 flags = 0;
+ char *relname;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ pq_sendint8(out, flags);
+ pq_sendint64(out, lsn);
+
+ logicalrep_write_namespace(out, RelationGetNamespace(rel));
+ relname = RelationGetRelationName(rel);
+ pq_sendstring(out, relname);
+
+ pq_sendint8(out, transactional);
+ pq_sendint8(out, created);
+ pq_sendint64(out, last_value);
+ pq_sendint64(out, log_cnt);
+ pq_sendint8(out, is_called);
+}
+
+/*
+ * Read SEQUENCE from the stream.
+ */
+void
+logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
+{
+ /* XXX skipping flags and lsn */
+ pq_getmsgint(in, 1);
+ pq_getmsgint64(in);
+
+ /* Read relation name from stream */
+ seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
+ seqdata->seqname = pstrdup(pq_getmsgstring(in));
+
+ seqdata->transactional = pq_getmsgint(in, 1);
+ seqdata->created = pq_getmsgint(in, 1);
+ seqdata->last_value = pq_getmsgint64(in);
+ seqdata->log_cnt = pq_getmsgint64(in);
+ seqdata->is_called = pq_getmsgint(in, 1);
+}
+
/*
* Write relation description to the output stream.
*/
@@ -1203,6 +1255,8 @@ logicalrep_message_type(LogicalRepMsgType action)
return "STREAM ABORT";
case LOGICAL_REP_MSG_STREAM_PREPARE:
return "STREAM PREPARE";
+ case LOGICAL_REP_MSG_SEQUENCE:
+ return "SEQUENCE";
}
elog(ERROR, "invalid logical replication message type \"%c\"", action);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..24369a1522 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
+#include "commands/sequence.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
@@ -357,6 +358,12 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
*
* If the synchronization position is reached (SYNCDONE), then the table can
* be marked as READY and is no longer tracked.
+ *
+ * XXX This needs to handle sequences too - after AlterSubscription_refresh
+ * starts caring about sequences, GetSubscriptionNotReadyRelations won't
+ * return just tables, and we'll have to sync them here. Not sure it's worth
+ * creating a new "sync" worker per sequence, maybe we should just sync them
+ * in the current process (it's pretty light-weight).
*/
static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
@@ -871,6 +878,99 @@ copy_table(Relation rel)
logicalrep_rel_close(relmapentry, NoLock);
}
+
+
+/*
+ * Fetch the current state of a sequence (last_value, log_cnt, is_called)
+ * from the publisher.
+ */
+static void
+fetch_sequence_data(char *nspname, char *relname,
+ int64 *last_value, int64 *log_cnt, bool *is_called)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID};
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
+ " FROM %s", quote_qualified_identifier(nspname, relname));
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not fetch sequence data from the publisher: %s",
+ res->err)));
+
+ /* Process the sequence. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ bool isnull;
+
+ *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+
+ *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
+ Assert(!isnull);
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+}
+
+/*
+ * Copy existing data of a sequence from publisher.
+ *
+ * Caller is responsible for locking the local relation.
+ */
+static void
+copy_sequence(Relation rel)
+{
+ LogicalRepRelMapEntry *relmapentry;
+ LogicalRepRelation lrel;
+ StringInfoData cmd;
+ int64 last_value = 0,
+ log_cnt = 0;
+ bool is_called = false;
+
+ /* Get the publisher relation info. */
+ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
+ RelationGetRelationName(rel), &lrel);
+
+ /* Put the relation into relmap. */
+ logicalrep_relmap_update(&lrel);
+
+ /* Map the publisher relation to local one. */
+ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
+ Assert(rel == relmapentry->localrel);
+
+ /* Start copy on the publisher. */
+ initStringInfo(&cmd);
+
+ Assert(lrel.relkind == RELKIND_SEQUENCE);
+
+ fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
+
+ elog(WARNING, "sequence %s info last_value " INT64_FORMAT " log_cnt " INT64_FORMAT " is_called %d",
+ quote_qualified_identifier(lrel.nspname, lrel.relname),
+ last_value, log_cnt, is_called);
+
+ ResetSequence2(RelationGetRelid(rel), last_value, log_cnt, is_called);
+
+ logicalrep_rel_close(relmapentry, NoLock);
+}
+
+
+
+
/*
* Determine the tablesync slot name.
*
@@ -1106,10 +1206,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
originname)));
}
- /* Now do the initial data copy */
- PushActiveSnapshot(GetTransactionSnapshot());
- copy_table(rel);
- PopActiveSnapshot();
+ if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
+ {
+ /* Now do the initial sequence copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_sequence(rel);
+ PopActiveSnapshot();
+ }
+ else
+ {
+ /* Now do the initial data copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_table(rel);
+ PopActiveSnapshot();
+ }
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8d96c926b4..5ffd513eae 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -144,6 +144,7 @@
#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
+#include "commands/sequence.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
@@ -1091,6 +1092,61 @@ apply_handle_origin(StringInfo s)
errmsg_internal("ORIGIN message sent out of order")));
}
+/*
+ * Handle SEQUENCE message.
+ */
+static void
+apply_handle_sequence(StringInfo s)
+{
+ LogicalRepSequence seq;
+ Oid relid;
+
+ if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
+ return;
+
+ logicalrep_read_sequence(s, &seq);
+
+ /*
+ * Non-transactional sequence updates should not be part of a remote
+ * transaction. There should not be any running transaction.
+ */
+ Assert((!seq.transactional) || in_remote_transaction);
+ Assert(!(!seq.transactional && in_remote_transaction));
+ Assert(!(!seq.transactional && IsTransactionState()));
+
+ /*
+ * Make sure we're in a transaction (needed by ResetSequence2). For
+ * non-transactional updates we're guaranteed to start a new one,
+ * and we'll commit it at the end.
+ */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ maybe_reread_subscription();
+ }
+
+ relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
+ seq.seqname, -1),
+ RowExclusiveLock, false);
+
+ /* lock the sequence in AccessExclusiveLock, as expected by ResetSequence2 */
+ elog(WARNING, "locking sequence %u in exclusive mode", relid);
+ LockRelationOid(relid, AccessExclusiveLock);
+
+ elog(WARNING, "applying sequence %s.%s transactional %d created %d last_value " INT64_FORMAT " log_cnt " INT64_FORMAT " is_called %d",
+ seq.nspname, seq.seqname, seq.transactional, seq.created, seq.last_value, seq.log_cnt, seq.is_called);
+
+ /* apply the sequence change */
+ ResetSequence2(relid, seq.last_value, seq.log_cnt, seq.is_called);
+
+ /*
+ * Commit the per-stream transaction (we only do this when not in
+ * remote transaction, i.e. for non-transactional sequence updates).
+ */
+ if (!in_remote_transaction)
+ CommitTransactionCommand();
+}
+
/*
* Handle STREAM START message.
*/
@@ -2371,6 +2427,10 @@ apply_dispatch(StringInfo s)
*/
break;
+ case LOGICAL_REP_MSG_SEQUENCE:
+ apply_handle_sequence(s);
+ break;
+
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
break;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 14d737fd93..287d60f91f 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -49,6 +49,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pgoutput_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -159,6 +163,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
+ cb->sequence_cb = pgoutput_sequence;
cb->commit_cb = pgoutput_commit_txn;
cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -175,6 +180,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
cb->stream_message_cb = pgoutput_message;
+ cb->stream_sequence_cb = pgoutput_sequence;
cb->stream_truncate_cb = pgoutput_truncate;
/* transaction streaming - two-phase commit */
cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
@@ -188,6 +194,7 @@ parse_output_parameters(List *options, PGOutputData *data)
bool publication_names_given = false;
bool binary_option_given = false;
bool messages_option_given = false;
+ bool sequences_option_given = false;
bool streaming_given = false;
bool two_phase_option_given = false;
@@ -195,6 +202,7 @@ parse_output_parameters(List *options, PGOutputData *data)
data->streaming = false;
data->messages = false;
data->two_phase = false;
+ data->sequences = true;
foreach(lc, options)
{
@@ -260,6 +268,16 @@ parse_output_parameters(List *options, PGOutputData *data)
data->messages = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "sequences") == 0)
+ {
+ if (sequences_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ sequences_option_given = true;
+
+ data->sequences = defGetBoolean(defel);
+ }
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
@@ -856,6 +874,52 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+static void
+pgoutput_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional, bool created,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ TransactionId xid = InvalidTransactionId;
+ RelationSyncEntry *relentry;
+
+ if (!data->sequences)
+ return;
+
+ if (!is_publishable_relation(rel))
+ return;
+
+ /*
+ * Remember the xid for the message in streaming mode. See
+ * pgoutput_change.
+ */
+ if (in_streaming)
+ xid = txn->xid;
+
+ relentry = get_rel_sync_entry(data, RelationGetRelid(rel));
+
+ /*
+ * First check the sequence filter.
+ *
+ * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here.
+ */
+ if (!relentry->pubactions.pubsequence)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_sequence(ctx->out,
+ rel,
+ xid,
+ sequence_lsn,
+ transactional,
+ created,
+ last_value,
+ log_cnt,
+ is_called);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* Currently we always forward.
*/
@@ -1137,7 +1201,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->streamed_txns = NIL;
entry->replicate_valid = false;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
- entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+ entry->pubactions.pubsequence = false;
entry->publish_as_relid = InvalidOid;
entry->map = NULL; /* will be set by maybe_send_schema() if
* needed */
@@ -1149,6 +1214,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
List *pubids = GetRelationPublications(relid);
ListCell *lc;
Oid publish_as_relid = relid;
+ bool is_sequence = (get_rel_relkind(relid) == RELKIND_SEQUENCE);
/* Reload publications if needed before use. */
if (!publications_valid)
@@ -1172,12 +1238,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
Publication *pub = lfirst(lc);
bool publish = false;
- if (pub->alltables)
+ if (pub->alltables && (!is_sequence))
{
publish = true;
if (pub->pubviaroot && am_partition)
publish_as_relid = llast_oid(get_partition_ancestors(relid));
}
+ else if (pub->allsequences && is_sequence)
+ {
+ publish = true;
+ }
+
+ /* if a sequence, just cross-check the list of publications */
+ if (!publish && is_sequence)
+ {
+ if (list_member_oid(pubids, pub->oid))
+ publish = true;
+ }
if (!publish)
{
@@ -1228,10 +1305,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+ entry->pubactions.pubsequence |= pub->pubactions.pubsequence;
}
if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
- entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
+ entry->pubactions.pubdelete && entry->pubactions.pubtruncate &&
+ entry->pubactions.pubsequence)
break;
}
@@ -1376,6 +1455,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubsequence = false;
}
}
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 13d9994af3..3dd39054e6 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5495,6 +5495,7 @@ GetRelationPublicationActions(Relation relation)
pubactions->pubupdate |= pubform->pubupdate;
pubactions->pubdelete |= pubform->pubdelete;
pubactions->pubtruncate |= pubform->pubtruncate;
+ pubactions->pubsequence |= pubform->pubsequence;
ReleaseSysCache(tup);
@@ -5503,7 +5504,8 @@ GetRelationPublicationActions(Relation relation)
* other publications.
*/
if (pubactions->pubinsert && pubactions->pubupdate &&
- pubactions->pubdelete && pubactions->pubtruncate)
+ pubactions->pubdelete && pubactions->pubtruncate &&
+ pubactions->pubsequence)
break;
}
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 5cd5838668..875882177d 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1644,7 +1644,7 @@ psql_completion(const char *text, int start, int end)
/* ALTER PUBLICATION <name> */
else if (Matches("ALTER", "PUBLICATION", MatchAny))
- COMPLETE_WITH("ADD TABLE", "DROP TABLE", "OWNER TO", "RENAME TO", "SET");
+ COMPLETE_WITH("ADD TABLE", "DROP TABLE", "ADD SEQUENCE", "DROP SEQUENCE", "OWNER TO", "RENAME TO", "SET");
/* ALTER PUBLICATION <name> SET */
else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET"))
COMPLETE_WITH("(", "TABLE");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d068d6532e..476205ae9d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11509,6 +11509,11 @@
provolatile => 's', prorettype => 'oid', proargtypes => 'text',
proallargtypes => '{text,oid}', proargmodes => '{i,o}',
proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
+{ oid => '8000', descr => 'get OIDs of sequences in a publication',
+ proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't',
+ provolatile => 's', prorettype => 'oid', proargtypes => 'text',
+ proallargtypes => '{text,oid}', proargmodes => '{i,o}',
+ proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' },
{ oid => '6121',
descr => 'returns whether a relation can be part of a publication',
proname => 'pg_relation_is_publishable', provolatile => 's',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 82f2536c65..3ed613b455 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -40,6 +40,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
*/
bool puballtables;
+ /*
+ * indicates that this is a special publication which should encompass
+ * all sequences in the database (except for the unlogged and temp ones)
+ */
+ bool puballsequences;
+
/* true if inserts are published */
bool pubinsert;
@@ -52,6 +58,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
/* true if truncates are published */
bool pubtruncate;
+ /* true if sequences are published */
+ bool pubsequence;
+
/* true if partition changes are published using root schema */
bool pubviaroot;
} FormData_pg_publication;
@@ -72,6 +81,7 @@ typedef struct PublicationActions
bool pubupdate;
bool pubdelete;
bool pubtruncate;
+ bool pubsequence;
} PublicationActions;
typedef struct Publication
@@ -79,6 +89,7 @@ typedef struct Publication
Oid oid;
char *name;
bool alltables;
+ bool allsequences;
bool pubviaroot;
PublicationActions pubactions;
} Publication;
@@ -115,6 +126,9 @@ extern List *GetPubPartitionOptionRelations(List *result,
PublicationPartOpt pub_partopt,
Oid relid);
+extern List *GetAllSequencesPublicationRelations(void);
+extern List *GetPublicationSequenceRelations(Oid pubid);
+
extern bool is_publishable_relation(Relation rel);
extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
bool if_not_exists);
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 5919fb90ee..c28e8695cb 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -60,6 +60,7 @@ extern ObjectAddress DefineSequence(ParseState *pstate, CreateSeqStmt *stmt);
extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
extern void DeleteSequenceTuple(Oid relid);
extern void ResetSequence(Oid seq_relid);
+extern void ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called);
extern void ResetSequenceCaches(void);
extern void seq_redo(XLogReaderState *rptr);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 3138877553..d00b18ac47 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3661,8 +3661,10 @@ typedef struct AlterPublicationStmt
/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
List *tables; /* List of tables to add/drop */
+ List *sequences; /* List of sequences to add/drop */
bool for_all_tables; /* Special publication for all tables in db */
- DefElemAction tableAction; /* What action to perform with the tables */
+ bool for_all_sequences; /* Special publication for all sequences in db */
+ DefElemAction action; /* What action to perform with the tables/sequences */
} AlterPublicationStmt;
typedef struct CreateSubscriptionStmt
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 83741dcf42..d468c3ccfb 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -60,6 +60,7 @@ typedef enum LogicalRepMsgType
LOGICAL_REP_MSG_RELATION = 'R',
LOGICAL_REP_MSG_TYPE = 'Y',
LOGICAL_REP_MSG_MESSAGE = 'M',
+ LOGICAL_REP_MSG_SEQUENCE = 'X', /* FIXME change */
LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
LOGICAL_REP_MSG_PREPARE = 'P',
LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
@@ -117,6 +118,19 @@ typedef struct LogicalRepTyp
char *typname; /* name of the remote type */
} LogicalRepTyp;
+/* Sequence info */
+typedef struct LogicalRepSequence
+{
+ Oid remoteid; /* unique id of the remote sequence */
+ char *nspname; /* schema name of remote sequence */
+ char *seqname; /* name of the remote sequence */
+ bool transactional;
+ bool created;
+ int64 last_value;
+ int64 log_cnt; /* XXX probably not needed? */
+ bool is_called; /* XXX probably not needed? */
+} LogicalRepSequence;
+
/* Transaction info */
typedef struct LogicalRepBeginData
{
@@ -227,6 +241,12 @@ extern List *logicalrep_read_truncate(StringInfo in,
bool *cascade, bool *restart_seqs);
extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
bool transactional, const char *prefix, Size sz, const char *message);
+extern void logicalrep_write_sequence(StringInfo out, Relation rel,
+ TransactionId xid, XLogRecPtr lsn,
+ bool transactional, bool created,
+ int64 last_value, int64 log_cnt,
+ bool is_called);
+extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 0dc460fb70..97eb2a7ef1 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -28,6 +28,7 @@ typedef struct PGOutputData
bool streaming;
bool messages;
bool two_phase;
+ bool sequences;
} PGOutputData;
#endif /* PGOUTPUT_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 2fa00a3c29..b32a1e6729 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,6 +1451,14 @@ pg_prepared_xacts| SELECT p.transaction,
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_publication_sequences| SELECT p.pubname,
+ n.nspname AS schemaname,
+ c.relname AS sequencename
+ FROM pg_publication p,
+ LATERAL pg_get_publication_sequences((p.pubname)::text) gpt(relid),
+ (pg_class c
+ JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
+ WHERE (c.oid = gpt.relid);
pg_publication_tables| SELECT p.pubname,
n.nspname AS schemaname,
c.relname AS tablename
diff --git a/src/test/subscription/t/023_sequences.pl b/src/test/subscription/t/023_sequences.pl
new file mode 100644
index 0000000000..1f7768acd6
--- /dev/null
+++ b/src/test/subscription/t/023_sequences.pl
@@ -0,0 +1,196 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# This tests that sequences are replicated correctly by logical replication
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 6;
+
+# Initialize publisher node
+my $node_publisher = PostgresNode->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgresNode->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+my $ddl = qq(
+ CREATE SEQUENCE s;
+);
+
+# Setup structure on the publisher
+$node_publisher->safe_psql('postgres', $ddl);
+
+# Create the same structure on subscriber, and an extra sequence that
+# we'll create on the publisher later
+$ddl = qq(
+ CREATE SEQUENCE s;
+ CREATE SEQUENCE s2;
+);
+
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION seq_pub");
+
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION seq_pub ADD SEQUENCE s");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub WITH (slot_name = seq_sub_slot)"
+);
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Wait for initial sync to finish as well
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Advance the sequence on the publisher to generate initial changes
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ -- generate a number of values using the sequence
+ SELECT nextval('s') FROM generate_series(1,100);
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+my $result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s;
+));
+
+is( $result, '132|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+# advance the sequence in a rolled-back transaction - should be replicated
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SELECT nextval('s') FROM generate_series(1,100);
+ ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s;
+));
+
+is( $result, '231|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+# create a new sequence and roll it back - should not be replicated, due to
+# the transactional behavior
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ CREATE SEQUENCE s2;
+ ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '1|0|f',
+ 'check replicated sequence values on subscriber');
+
+
+# create a new sequence, advance it in a rolled-back subtransaction, but commit
+# the create - the advance should be replicated nevertheless
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ CREATE SEQUENCE s2;
+ ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
+ SAVEPOINT sp1;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK TO sp1;
+ COMMIT;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Wait for sync of the second sequence we just added to finish
+$synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '132|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+# advance the new sequence in a transaction, and roll it back - in this case
+# it should be replicated as the behavior is non-transactional
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '231|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+# advance the sequence in a subtransaction - the subtransaction gets rolled
+# back, but the main one commits - the changes should still be replicated
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SAVEPOINT s1;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK TO s1;
+ COMMIT;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '330|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
--
2.31.1