From 6c449c8c44fc21398d23e68aeb58783ff3fe3550 Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Tue, 4 Jul 2023 22:13:52 +0300
Subject: [PATCH v24 3/3] Reuse connection when tablesync workers change the
 target table

Previously, tablesync workers established a new connection every time they
switched to syncing a different table, which added avoidable overhead. This
patch allows the existing connection to be reused instead.

On the publisher node, this patch likewise allows a logical walsender process
to be reused after it has finished streaming once.
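
For illustration (the OIDs and the system identifier below are made-up
example values), a tablesync worker occupying worker slot 2 of a subscription
with OID 16394 now connects with an application_name built from its worker
slot number rather than from a per-table replication slot name:

    pg_16394_sync_2_7284944169645810044

This name stays stable across all tables the worker syncs, while remaining
distinct from the leader apply worker's application_name so that synchronous
replication can still tell the two apart.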
---
 src/backend/replication/logical/launcher.c  |  1 +
 src/backend/replication/logical/tablesync.c | 60 ++++++++++++++-------
 src/backend/replication/logical/worker.c    | 18 ++++---
 src/backend/replication/walsender.c         |  6 +++
 src/include/replication/worker_internal.h   |  3 ++
 5 files changed, 61 insertions(+), 27 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 25dd06b8af..b19437f9d0 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -441,6 +441,7 @@ retry:
 	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
 	worker->parallel_apply = is_parallel_apply_worker;
 	worker->relsync_completed = false;
+	worker->slot_number = slot;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2ce3ae67c7..a49b67243e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -151,16 +151,6 @@ finish_sync_worker(bool reuse_worker)
 		pgstat_report_stat(true);
 	}
 
-	/*
-	 * Disconnect from the publisher otherwise reusing the sync worker can
-	 * error due to exceeding max_wal_senders.
-	 */
-	if (LogRepWorkerWalRcvConn != NULL)
-	{
-		walrcv_disconnect(LogRepWorkerWalRcvConn);
-		LogRepWorkerWalRcvConn = NULL;
-	}
-
 	/* And flush all writes. */
 	XLogFlush(GetXLogWriteRecPtr());
 
@@ -1264,6 +1254,27 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
 			 relid, GetSystemIdentifier());
 }
 
+/*
+ * Determine the application_name for tablesync workers.
+ *
+ * Previously, the replication slot name was used as the application_name.
+ * Since tablesync workers can now be reused, a single worker may handle
+ * several different replication slots during its lifetime, so the slot name
+ * can no longer serve as the application_name. Instead, the worker's slot
+ * number is used as part of the application_name.
+ *
+ * XXX: if the tablesync worker starts to reuse the replication slot during
+ * synchronization, we should again use the replication slot name as
+ * application_name.
+ */
+static void
+ApplicationNameForTablesync(Oid suboid, int worker_slot,
+							char *application_name, Size szapp)
+{
+	snprintf(application_name, szapp, "pg_%u_sync_%d_" UINT64_FORMAT, suboid,
+			 worker_slot, GetSystemIdentifier());
+}
+
 /*
  * Start syncing the table in the sync worker.
  *
@@ -1325,15 +1336,26 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 									slotname,
 									NAMEDATALEN);
 
-	/*
-	 * Here we use the slot name instead of the subscription name as the
-	 * application_name, so that it is different from the leader apply worker,
-	 * so that synchronous replication can distinguish them.
-	 */
-	LogRepWorkerWalRcvConn =
-		walrcv_connect(MySubscription->conninfo, true,
-					   must_use_password,
-					   slotname, &err);
+	/* Connect to the publisher if we haven't done so already. */
+	if (LogRepWorkerWalRcvConn == NULL)
+	{
+		char application_name[NAMEDATALEN];
+
+		/*
+		 * The application_name must differ from the subscription name, which
+		 * the leader apply worker uses, so that synchronous replication can
+		 * distinguish this worker from the leader apply worker.
+		 */
+		ApplicationNameForTablesync(MySubscription->oid,
+									MyLogicalRepWorker->slot_number,
+									application_name,
+									NAMEDATALEN);
+		LogRepWorkerWalRcvConn =
+			walrcv_connect(MySubscription->conninfo, true,
+						   must_use_password,
+						   application_name, &err);
+	}
+
 	if (LogRepWorkerWalRcvConn == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONNECTION_FAILURE),
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 02c04ccafd..958b32b458 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3494,20 +3494,22 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	ErrorContextCallback errcallback;
 
 	/*
-	 * Init the ApplyMessageContext which we clean up after each replication
-	 * protocol message.
+	 * Init the ApplyMessageContext if needed. This context is cleaned up
+	 * after each replication protocol message.
 	 */
-	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
-												"ApplyMessageContext",
-												ALLOCSET_DEFAULT_SIZES);
+	if (!ApplyMessageContext)
+		ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+													"ApplyMessageContext",
+													ALLOCSET_DEFAULT_SIZES);
 
 	/*
 	 * This memory context is used for per-stream data when the streaming mode
 	 * is enabled. This context is reset on each stream stop.
 	 */
-	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
-													"LogicalStreamingContext",
-													ALLOCSET_DEFAULT_SIZES);
+	if (!LogicalStreamingContext)
+		LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
+														"LogicalStreamingContext",
+														ALLOCSET_DEFAULT_SIZES);
 
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d27ef2985d..1d2a87cdcd 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1808,6 +1808,12 @@ exec_replication_command(const char *cmd_string)
 		case T_CreateReplicationSlotCmd:
 			cmdtag = "CREATE_REPLICATION_SLOT";
 			set_ps_display(cmdtag);
+
+			/*
+			 * Reset the flags: a reused tablesync worker may issue
+			 * CREATE_REPLICATION_SLOT on this walsender more than once.
+			 */
+			streamingDoneSending = streamingDoneReceiving = false;
 			CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
 			EndReplicationCommand(cmdtag);
 			break;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 39b1721dee..94169e07b4 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -39,6 +39,9 @@ typedef struct LogicalRepWorker
 	/* Increased every time the slot is taken by new worker. */
 	uint16		generation;
 
+	/* Slot number of this worker in the launcher's worker array. */
+	int			slot_number;
+
 	/* Pointer to proc array. NULL if not running. */
 	PGPROC	   *proc;
 
-- 
2.25.1

