From 524d47418b800bccc89f18b3ef2bd89ecef625b4 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 24 Feb 2022 16:56:58 +0900
Subject: [PATCH v3 2/3] Add the origin name and remote commit-LSN to logical
 replication worker errcontext.

This commit adds both the commit-LSN and replication origin name to
the existing error context message.

This helps users pass the origin name and commit-LSN to the
pg_replication_origin_advance() SQL function to skip a particular transaction.
---
 doc/src/sgml/logical-replication.sgml    | 21 +++++--
 src/backend/replication/logical/worker.c | 73 ++++++++++++++++++------
 2 files changed, 70 insertions(+), 24 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index fb4472356d..ca0db358b0 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -352,12 +352,23 @@
   <para>
    The resolution can be done either by changing data or permissions on the subscriber so
    that it does not conflict with the incoming change or by skipping the
-   transaction that conflicts with the existing data.  The transaction can be
-   skipped by calling the <link linkend="pg-replication-origin-advance">
+   transaction that conflicts with the existing data.  When a conflict produces
+   an error, it is shown in the subscriber's server logs as follows:
+<screen>
+ERROR:  duplicate key value violates unique constraint "test_pkey"
+DETAIL:  Key (c)=(1) already exists.
+CONTEXT:  processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378
+</screen>
+   The LSN of the transaction that contains the change violating the constraint and
+   the replication origin name can be found in this log output (LSN 0/14C0378 and
+   replication origin <literal>pg_16395</literal> in the above case).  To skip the
+   transaction, the subscription needs to be disabled temporarily by
+   <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
+   can be skipped by calling the <link linkend="pg-replication-origin-advance">
    <function>pg_replication_origin_advance()</function></link> function with
-   a <parameter>node_name</parameter> corresponding to the subscription name,
-   and a position.  The current position of origins can be seen in the
-   <link linkend="view-pg-replication-origin-status">
+   the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
+   LSN immediately after the one reported in the log (i.e., 0/14C0379).  The current
+   position of origins can be seen in the <link linkend="view-pg-replication-origin-status">
    <structname>pg_replication_origin_status</structname></link> system view.
   </para>
  </sect1>
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 92aa794706..b9d0336a34 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
 	/* Remote node information */
 	int			remote_attnum;	/* -1 if invalid */
 	TransactionId remote_xid;
+	XLogRecPtr	commit_lsn;
+	char	   *origin_name;
 } ApplyErrorCallbackArg;
 
 static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 	.rel = NULL,
 	.remote_attnum = -1,
 	.remote_xid = InvalidTransactionId,
+	.commit_lsn = InvalidXLogRecPtr,
+	.origin_name = NULL,
 };
 
 static MemoryContext ApplyMessageContext = NULL;
@@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
 /* Functions for apply error callback */
 static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
 static inline void reset_apply_error_context_info(void);
 
 /*
@@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)
 	LogicalRepBeginData begin_data;
 
 	logicalrep_read_begin(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid);
+	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
 	remote_final_lsn = begin_data.final_lsn;
 
@@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)
 				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
 
 	logicalrep_read_begin_prepare(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid);
+	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
 
 	remote_final_lsn = begin_data.prepare_lsn;
 
@@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)
 	char		gid[GIDSIZE];
 
 	logicalrep_read_commit_prepared(s, &prepare_data);
-	set_apply_error_context_xact(prepare_data.xid);
+	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
 
 	/* Compute GID for two_phase transactions. */
 	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)
 	char		gid[GIDSIZE];
 
 	logicalrep_read_rollback_prepared(s, &rollback_data);
-	set_apply_error_context_xact(rollback_data.xid);
+	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
 
 	/* Compute GID for two_phase transactions. */
 	TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)
 				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
 
 	logicalrep_read_stream_prepare(s, &prepare_data);
-	set_apply_error_context_xact(prepare_data.xid);
+	set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
 
 	elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
 
@@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("invalid transaction ID in streamed replication transaction")));
 
-	set_apply_error_context_xact(stream_xid);
+	set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
 
 	/*
 	 * Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)
 	 */
 	if (xid == subxid)
 	{
-		set_apply_error_context_xact(xid);
+		set_apply_error_context_xact(xid, InvalidXLogRecPtr);
 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
 	}
 	else
@@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)
 		bool		found = false;
 		char		path[MAXPGPATH];
 
-		set_apply_error_context_xact(subxid);
+		set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
 
 		subidx = -1;
 		begin_replication_step();
@@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)
 				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
 
 	xid = logicalrep_read_stream_commit(s, &commit_data);
-	set_apply_error_context_xact(xid);
+	set_apply_error_context_xact(xid, commit_data.commit_lsn);
 
 	elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
 		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
 
 		pfree(syncslotname);
+
+		/*
+		 * Allocate the origin name in long-lived context for error context
+		 * message.
+		 */
+		ReplicationOriginNameForTablesync(MySubscription->oid,
+										  MyLogicalRepWorker->relid,
+										  originname,
+										  sizeof(originname));
+		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+																   originname);
 	}
 	else
 	{
@@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)
 		 * does some initializations on the upstream so let's still call it.
 		 */
 		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+		/*
+		 * Allocate the origin name in long-lived context for error context
+		 * message.
+		 */
+		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+																   originname);
 	}
 
 	/*
@@ -3654,33 +3676,46 @@ apply_error_callback(void *arg)
 	if (errarg->rel == NULL)
 	{
 		if (!TransactionIdIsValid(errarg->remote_xid))
-			errcontext("processing remote data during \"%s\"",
+			errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
+					   errarg->origin_name,
 					   logicalrep_message_type(errarg->command));
-		else
-			errcontext("processing remote data during \"%s\" in transaction %u",
+		else if (XLogRecPtrIsInvalid(errarg->commit_lsn))
+			errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
+					   errarg->origin_name,
 					   logicalrep_message_type(errarg->command),
 					   errarg->remote_xid);
+		else
+			errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
+					   errarg->origin_name,
+					   logicalrep_message_type(errarg->command),
+					   errarg->remote_xid,
+					   LSN_FORMAT_ARGS(errarg->commit_lsn));
 	}
 	else if (errarg->remote_attnum < 0)
-		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+		errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
+				   errarg->origin_name,
 				   logicalrep_message_type(errarg->command),
 				   errarg->rel->remoterel.nspname,
 				   errarg->rel->remoterel.relname,
-				   errarg->remote_xid);
+				   errarg->remote_xid,
+				   LSN_FORMAT_ARGS(errarg->commit_lsn));
 	else
-		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+		errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
+				   errarg->origin_name,
 				   logicalrep_message_type(errarg->command),
 				   errarg->rel->remoterel.nspname,
 				   errarg->rel->remoterel.relname,
 				   errarg->rel->remoterel.attnames[errarg->remote_attnum],
-				   errarg->remote_xid);
+				   errarg->remote_xid,
+				   LSN_FORMAT_ARGS(errarg->commit_lsn));
 }
 
 /* Set transaction information of apply error callback */
 static inline void
-set_apply_error_context_xact(TransactionId xid)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
 {
 	apply_error_callback_arg.remote_xid = xid;
+	apply_error_callback_arg.commit_lsn = lsn;
 }
 
 /* Reset all information of apply error callback */
@@ -3690,5 +3725,5 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.command = 0;
 	apply_error_callback_arg.rel = NULL;
 	apply_error_callback_arg.remote_attnum = -1;
-	set_apply_error_context_xact(InvalidTransactionId);
+	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
-- 
2.24.3 (Apple Git-128)

