On 3/25/22 12:59, Tomas Vondra wrote:
>
> On 3/25/22 12:21, Amit Kapila wrote:
>> On Fri, Mar 25, 2022 at 3:56 PM Tomas Vondra
>> <tomas.von...@enterprisedb.com> wrote:
>>>
>>>
>>> On 3/25/22 05:01, Amit Kapila wrote:
>>>> On Fri, Mar 25, 2022 at 3:29 AM Tomas Vondra
>>>> <tomas.von...@enterprisedb.com> wrote:
>>>>>
>>>>> Pushed.
>>>>>
>>>>
>>>> Some of the comments given by me [1] don't seem to be addressed or
>>>> responded to. Let me try to say again for the ease of discussion:
>>>>
>>>
>>> D'oh! I got distracted by Petr's response to that message, and missed
>>> this part ...
>>>
>>>> * Don't we need some syncing mechanism between apply worker and
>>>> sequence sync worker so that apply worker skips the sequence changes
>>>> till the sync worker is finished, otherwise, there is a risk of one
>>>> overriding the values of the other? See how we take care of this for a
>>>> table in should_apply_changes_for_rel() and its callers. If we don't
>>>> do this for sequences for some reason then probably a comment
>>>> somewhere is required.
>>>>
>>>
>>> How would that happen? If we're effectively setting the sequence as a
>>> side effect of inserting the data, then why should we even replicate the
>>> sequence?
>>>
>>
>> I was talking just about sequence values here, considering that some
>> sequence is just replicating based on nextval. I think the problem is
>> that apply worker might override what copy has done if an apply worker
>> is behind the sequence sync worker as both can run in parallel. Let me
>> try to take some theoretical example to explain this:
>>
>> Assume, at LSN 10000, the value of sequence s1 is 10. Then by LSN
>> 12000, the value of s1 becomes 20. Now, say copy decides to copy the
>> sequence value till LSN 12000 which means it will make the value as 20
>> on the subscriber, now, in parallel, apply worker can process LSN
>> 10000 and make it again 10. Apply worker might end up redoing all
>> sequence operations along with some transactional ones where we
>> recreate the file. I am not sure what exact problem it can lead to but
>> I think we don't need to redo the work.
>>
>> We'll have the problem later too, no?
>>
>
> Ah, I was confused why this would be an issue for sequences and not for
> plain tables, but now I realize should_apply_changes_for_rel() is not
> called in apply_handle_sequence(). Yes, that's probably a thinko.
>

Hmm, so fixing this might be a bit trickier than I expected.

Firstly, we currently only send nspname/relname in the sequence message,
not the remote OID or schema. The idea was that for sequences we don't
really need schema info, so this seemed OK.

But should_apply_changes_for_rel() needs LogicalRepRelMapEntry, and to
create/maintain those records we need to send the schema.

Attached is a WIP patch that does that.

Two places need more work, I think:

1) maybe_send_schema needs ReorderBufferChange, but we don't have that
for sequences, we only have TXN. I created a simple wrapper, but maybe
we should just tweak maybe_send_schema to use TXN.

2) The transaction handling is a bit confusing. The non-transactional
increments won't have any explicit commit later, so we can't just rely
on begin_replication_step/end_replication_step. But I want to try
spending a bit more time on this.

But there's a more serious issue, I think. So far, we allowed this:

  BEGIN;
  CREATE SEQUENCE s2;
  ALTER PUBLICATION p ADD SEQUENCE s2;
  INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100);
  COMMIT;

and the behavior was that we replicated the changes. But with the patch
applied, that no longer happens, because should_apply_changes_for_rel
says the change should not be applied.

And after thinking about this, I think that's correct - we can't apply
changes until ALTER SUBSCRIPTION ... REFRESH PUBLICATION gets executed,
and we can't do that until the transaction commits.

So I guess that's correct, and the current behavior is a bug.

For a while I was thinking that maybe this means we don't need the
transactional behavior at all, but I think we do - we have to handle
ALTER SEQUENCE cases that are transactional.
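
For illustration, the difference between the two kinds of sequence changes could be modeled like this. It is a rough sketch under my own assumptions, with made-up Toy* names: transactional changes are queued and only take effect at commit (and are discarded on abort), while non-transactional increments take effect right away and have no commit to wait for.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_MAX_QUEUED 8

/* Hypothetical stand-in for a decoded transaction with queued changes. */
typedef struct ToyTxn
{
	int64_t		queued[TOY_MAX_QUEUED];
	size_t		nqueued;
} ToyTxn;

/* Hypothetical stand-in for the sequence state on the subscriber. */
typedef struct ToySubscriber
{
	int64_t		last_value;
} ToySubscriber;

/* Route a sequence change depending on whether it is transactional. */
static void
toy_sequence_change(ToySubscriber *sub, ToyTxn *txn,
					bool transactional, int64_t value)
{
	if (transactional)
	{
		/* e.g. ALTER SEQUENCE: queue it, it becomes visible at commit */
		assert(txn != NULL && txn->nqueued < TOY_MAX_QUEUED);
		txn->queued[txn->nqueued++] = value;
	}
	else
	{
		/* plain nextval() increment: no commit will follow, apply now */
		sub->last_value = value;
	}
}

/* Commit: replay the queued transactional changes in order. */
static void
toy_commit(ToySubscriber *sub, ToyTxn *txn)
{
	for (size_t i = 0; i < txn->nqueued; i++)
		sub->last_value = txn->queued[i];
	txn->nqueued = 0;
}

/* Abort: queued transactional changes are simply thrown away. */
static void
toy_abort(ToyTxn *txn)
{
	txn->nqueued = 0;
}
```

In this model a transactional change followed by toy_abort() leaves the subscriber untouched, whereas a non-transactional change sticks immediately, which is why dropping the transactional path entirely would not work for the ALTER SEQUENCE case.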

Does that make sense, Amit?

regards

--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 3dbe85d61a..0ae0378191 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -657,7 +657,6 @@ logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
int64 last_value, int64 log_cnt, bool is_called)
{
uint8 flags = 0;
- char *relname;
pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
@@ -668,9 +667,8 @@ logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
pq_sendint8(out, flags);
pq_sendint64(out, lsn);
- logicalrep_write_namespace(out, RelationGetNamespace(rel));
- relname = RelationGetRelationName(rel);
- pq_sendstring(out, relname);
+ /* OID as sequence identifier */
+ pq_sendint32(out, RelationGetRelid(rel));
pq_sendint8(out, transactional);
pq_sendint64(out, last_value);
@@ -688,9 +686,8 @@ logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
pq_getmsgint(in, 1);
pq_getmsgint64(in);
- /* Read relation name from stream */
- seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
- seqdata->seqname = pstrdup(pq_getmsgstring(in));
+ /* Read relation OID from stream */
+ seqdata->remoteid = pq_getmsgint(in, 4);
seqdata->transactional = pq_getmsgint(in, 1);
seqdata->last_value = pq_getmsgint64(in);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f3868b3e1f..d2acb08996 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1151,7 +1151,8 @@ static void
apply_handle_sequence(StringInfo s)
{
LogicalRepSequence seq;
- Oid relid;
+ LogicalRepRelMapEntry *rel;
+ LOCKMODE lockmode;
if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
return;
@@ -1167,9 +1168,8 @@ apply_handle_sequence(StringInfo s)
Assert(!(!seq.transactional && IsTransactionState()));
/*
- * Make sure we're in a transaction (needed by SetSequence). For
- * non-transactional updates we're guaranteed to start a new one,
- * and we'll commit it at the end.
+ * Make sure there's a transaction (the non-transactional case may not
+ * have one yet).
*/
if (!IsTransactionState())
{
@@ -1177,15 +1177,38 @@ apply_handle_sequence(StringInfo s)
maybe_reread_subscription();
}
- relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
- seq.seqname, -1),
- RowExclusiveLock, false);
+ /*
+ * Use the necessary lock mode. For transactional changes we need
+ * AccessExclusiveLock (just like ResetSequence), while for regular
+ * updates RowExclusiveLock is enough.
+ */
+ lockmode = (seq.transactional ? AccessExclusiveLock : RowExclusiveLock);
- /* lock the sequence in AccessExclusiveLock, as expected by SetSequence */
- LockRelationOid(relid, AccessExclusiveLock);
+ rel = logicalrep_rel_open(seq.remoteid, lockmode);
+
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, lockmode);
+
+ /*
+ * In non-transactional case there won't be any other changes, so
+ * abort the whole transaction.
+ */
+ if (!seq.transactional)
+ AbortCurrentTransaction();
+
+ return;
+ }
/* apply the sequence change */
- SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
+ SetSequence(RelationGetRelid(rel->localrel),
+ seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
+
+ logicalrep_rel_close(rel, NoLock);
/*
* Commit the per-stream transaction (we only do this when not in
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 4cdc698cbb..366faf9564 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -568,9 +568,9 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
* done yet.
*/
static void
-maybe_send_schema(LogicalDecodingContext *ctx,
- ReorderBufferChange *change,
- Relation relation, RelationSyncEntry *relentry)
+maybe_send_schema_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation, RelationSyncEntry *relentry)
{
bool schema_sent;
TransactionId xid = InvalidTransactionId;
@@ -585,10 +585,10 @@ maybe_send_schema(LogicalDecodingContext *ctx,
* the write methods will not include it.
*/
if (in_streaming)
- xid = change->txn->xid;
+ xid = txn->xid;
- if (change->txn->toptxn)
- topxid = change->txn->toptxn->xid;
+ if (txn->toptxn)
+ topxid = txn->toptxn->xid;
else
topxid = xid;
@@ -634,6 +634,14 @@ maybe_send_schema(LogicalDecodingContext *ctx,
relentry->schema_sent = true;
}
+static void
+maybe_send_schema(LogicalDecodingContext *ctx,
+ ReorderBufferChange *change,
+ Relation relation, RelationSyncEntry *relentry)
+{
+ maybe_send_schema_txn(ctx, change->txn, relation, relentry);
+}
+
/*
* Sends a relation
*/
@@ -1492,6 +1500,8 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
if (!relentry->pubactions.pubsequence)
return;
+ maybe_send_schema_txn(ctx, txn, relation, relentry);
+
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_sequence(ctx->out,
relation,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index fb86ca022d..b7ab68d9a2 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -122,9 +122,10 @@ typedef struct LogicalRepTyp
/* Sequence info */
typedef struct LogicalRepSequence
{
- Oid remoteid; /* unique id of the remote sequence */
- char *nspname; /* schema name of remote sequence */
- char *seqname; /* name of the remote sequence */
+ // Oid remoteid; /* unique id of the remote sequence */
+ LogicalRepRelId remoteid; /* unique id of the sequence */
+ // char *nspname; /* schema name of remote sequence */
+ // char *seqname; /* name of the remote sequence */
bool transactional;
int64 last_value;
int64 log_cnt;