On 3/25/22 12:59, Tomas Vondra wrote:
> 
> On 3/25/22 12:21, Amit Kapila wrote:
>> On Fri, Mar 25, 2022 at 3:56 PM Tomas Vondra
>> <tomas.von...@enterprisedb.com> wrote:
>>>
>>>
>>> On 3/25/22 05:01, Amit Kapila wrote:
>>>> On Fri, Mar 25, 2022 at 3:29 AM Tomas Vondra
>>>> <tomas.von...@enterprisedb.com> wrote:
>>>>>
>>>>> Pushed.
>>>>>
>>>>
>>>> Some of the comments given by me [1] don't seem to be addressed or
>>>> responded to. Let me try to say again for the ease of discussion:
>>>>
>>>
>>> D'oh! I got distracted by Petr's response to that message, and missed
>>> this part ...
>>>
>>>> * Don't we need some syncing mechanism between apply worker and
>>>> sequence sync worker so that apply worker skips the sequence changes
>>>> till the sync worker is finished, otherwise, there is a risk of one
>>>> overriding the values of the other? See how we take care of this for a
>>>> table in should_apply_changes_for_rel() and its callers. If we don't
>>>> do this for sequences for some reason then probably a comment
>>>> somewhere is required.
>>>>
>>>
>>> How would that happen? If we're effectively setting the sequence as a
>>> side effect of inserting the data, then why should we even replicate the
>>> sequence?
>>>
>>
>> I was talking just about sequence values here, considering that some
>> sequence is just replicating based on nextval. I think the problem is
>> that apply worker might override what copy has done if an apply worker
>> is behind the sequence sync worker as both can run in parallel. Let me
>> try to take some theoretical example to explain this:
>>
>> Assume, at LSN 10000, the value of sequence s1 is 10. Then by LSN
>> 12000, the value of s1 becomes 20. Now, say copy decides to copy the
>> sequence value till LSN 12000 which means it will make the value as 20
>> on the subscriber, now, in parallel, apply worker can process LSN
>> 10000 and make it again 10. Apply worker might end up redoing all
>> sequence operations along with some transactional ones where we
>> recreate the file. I am not sure what exact problem it can lead to but
>> I think we don't need to redo the work.
>>
>>  We'll have the problem later too, no?
>>
> 
> Ah, I was confused why this would be an issue for sequences and not for
> plain tables, but now I realize apply_handle_sequence() is not called in
> apply_handle_sequence. Yes, that's probably a thinko.
> 

Hmm, so fixing this might be a bit trickier than I expected.

Firstly, currently we only send nspname/relname in the sequence message,
not the remote OID or schema. The idea was that for sequences we don't
really need schema info, so this seemed OK.

But should_apply_changes_for_rel() needs LogicalRepRelMapEntry, and to
create/maintain that those records we need to send the schema.

Attached is a WIP patch does that.

Two places need more work, I think:

1) maybe_send_schema needs ReorderBufferChange, but we don't have that
for sequences, we only have TXN. I created a simple wrapper, but maybe
we should just tweak maybe_send_schema to use TXN.

2) The transaction handling in is a bit confusing. The non-transactional
increments won't have any explicit commit later, so we can't just rely
on begin_replication_step/end_replication_step. But I want to try
spending a bit more time on this.


But there's a more serious issue, I think. So far, we allowed this:

  BEGIN;
  CREATE SEQUENCE s2;
  ALTER PUBLICATION p ADD SEQUENCE s2;
  INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100);
  COMMIT;

and the behavior was that we replicated the changes. But with the patch
applied, that no longer happens, because should_apply_changes_for_rel
says the change should not be applied.

And after thinking about this, I think that's correct - we can't apply
changes until ALTER SUBSCRIPTION ... REFRESH PUBLICATION gets executed,
and we can't do that until the transaction commits.

So I guess that's correct, and the current behavior is a bug.

For a while I was thinking that maybe this means we don't need the
transactional behavior at all, but I think we do - we have to handle
ALTER SEQUENCE cases that are transactional.

Does that make sense, Amit?


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 3dbe85d61a..0ae0378191 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -657,7 +657,6 @@ logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
 						  int64 last_value, int64 log_cnt, bool is_called)
 {
 	uint8		flags = 0;
-	char	   *relname;
 
 	pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
 
@@ -668,9 +667,8 @@ logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
 	pq_sendint8(out, flags);
 	pq_sendint64(out, lsn);
 
-	logicalrep_write_namespace(out, RelationGetNamespace(rel));
-	relname = RelationGetRelationName(rel);
-	pq_sendstring(out, relname);
+	/* OID ad sequence identifier */
+	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendint8(out, transactional);
 	pq_sendint64(out, last_value);
@@ -688,9 +686,8 @@ logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
 	pq_getmsgint(in, 1);
 	pq_getmsgint64(in);
 
-	/* Read relation name from stream */
-	seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
-	seqdata->seqname = pstrdup(pq_getmsgstring(in));
+	/* Read relation OID from stream */
+	seqdata->remoteid = pq_getmsgint(in, 4);
 
 	seqdata->transactional = pq_getmsgint(in, 1);
 	seqdata->last_value = pq_getmsgint64(in);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f3868b3e1f..d2acb08996 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1151,7 +1151,8 @@ static void
 apply_handle_sequence(StringInfo s)
 {
 	LogicalRepSequence	seq;
-	Oid					relid;
+	LogicalRepRelMapEntry *rel;
+	LOCKMODE			lockmode;
 
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
 		return;
@@ -1167,9 +1168,8 @@ apply_handle_sequence(StringInfo s)
 	Assert(!(!seq.transactional && IsTransactionState()));
 
 	/*
-	 * Make sure we're in a transaction (needed by SetSequence). For
-	 * non-transactional updates we're guaranteed to start a new one,
-	 * and we'll commit it at the end.
+	 * Make sure there's a transaction (the non-transactional case may can't
+	 * have one yet).
 	 */
 	if (!IsTransactionState())
 	{
@@ -1177,15 +1177,38 @@ apply_handle_sequence(StringInfo s)
 		maybe_reread_subscription();
 	}
 
-	relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
-										  seq.seqname, -1),
-							 RowExclusiveLock, false);
+	/*
+	 * Use the necessary lock mode. For transactional changes we need
+	 * AccessExclusiveLock (just like ResetSequence), while for regular
+	 * updates RowExclusiveLock is enough).
+	 */
+	lockmode = (seq.transactional ? AccessExclusiveLock : RowExclusiveLock);
 
-	/* lock the sequence in AccessExclusiveLock, as expected by SetSequence */
-	LockRelationOid(relid, AccessExclusiveLock);
+	rel = logicalrep_rel_open(seq.remoteid, lockmode);
+
+	if (!should_apply_changes_for_rel(rel))
+	{
+		/*
+		 * The relation can't become interesting in the middle of the
+		 * transaction so it's safe to unlock it.
+		 */
+		logicalrep_rel_close(rel, RowExclusiveLock);
+
+		/*
+		 * In non-transactional case there won't be any other changes, so
+		 * abort the whole transaction.
+		 */
+		if (!seq.transactional)
+			AbortCurrentTransaction();
+
+		return;
+	}
 
 	/* apply the sequence change */
-	SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
+	SetSequence(RelationGetRelid(rel->localrel),
+				seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
+
+	logicalrep_rel_close(rel, NoLock);
 
 	/*
 	 * Commit the per-stream transaction (we only do this when not in
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 4cdc698cbb..366faf9564 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -568,9 +568,9 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * done yet.
  */
 static void
-maybe_send_schema(LogicalDecodingContext *ctx,
-				  ReorderBufferChange *change,
-				  Relation relation, RelationSyncEntry *relentry)
+maybe_send_schema_txn(LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn,
+					  Relation relation, RelationSyncEntry *relentry)
 {
 	bool		schema_sent;
 	TransactionId xid = InvalidTransactionId;
@@ -585,10 +585,10 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 	 * the write methods will not include it.
 	 */
 	if (in_streaming)
-		xid = change->txn->xid;
+		xid = txn->xid;
 
-	if (change->txn->toptxn)
-		topxid = change->txn->toptxn->xid;
+	if (txn->toptxn)
+		topxid = txn->toptxn->xid;
 	else
 		topxid = xid;
 
@@ -634,6 +634,14 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 		relentry->schema_sent = true;
 }
 
+static void
+maybe_send_schema(LogicalDecodingContext *ctx,
+				  ReorderBufferChange *change,
+				  Relation relation, RelationSyncEntry *relentry)
+{
+	maybe_send_schema_txn(ctx, change->txn, relation, relentry);
+}
+
 /*
  * Sends a relation
  */
@@ -1492,6 +1500,8 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
 	if (!relentry->pubactions.pubsequence)
 		return;
 
+	maybe_send_schema_txn(ctx, txn, relation, relentry);
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_sequence(ctx->out,
 							  relation,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index fb86ca022d..b7ab68d9a2 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -122,9 +122,10 @@ typedef struct LogicalRepTyp
 /* Sequence info */
 typedef struct LogicalRepSequence
 {
-	Oid			remoteid;		/* unique id of the remote sequence */
-	char	   *nspname;		/* schema name of remote sequence */
-	char	   *seqname;		/* name of the remote sequence */
+	// Oid			remoteid;		/* unique id of the remote sequence */
+	LogicalRepRelId remoteid;	/* unique id of the sequence */
+	// char	   *nspname;		/* schema name of remote sequence */
+	// char	   *seqname;		/* name of the remote sequence */
 	bool		transactional;
 	int64		last_value;
 	int64		log_cnt;

Reply via email to