From ac8da0bca98f24c1498346b85e2b2751e0aa9ef6 Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Tue, 4 Jul 2023 22:13:52 +0300
Subject: [PATCH 3/4] reuse connection when tablesync workers change the target

---
 src/backend/replication/logical/tablesync.c | 53 ++++++++++++++-------
 src/backend/replication/logical/worker.c    | 30 +++++++-----
 src/backend/replication/walsender.c         |  6 +++
 src/include/replication/worker_internal.h   |  3 +-
 4 files changed, 61 insertions(+), 31 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 605c5bd4ec..f042d9ae00 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -144,16 +144,6 @@ clean_sync_worker(void)
 		pgstat_report_stat(true);
 	}
 
-	/*
-	 * Disconnect from publisher. Otherwise reused sync workers causes
-	 * exceeding max_wal_senders
-	 */
-	if (LogRepWorkerWalRcvConn != NULL)
-	{
-		walrcv_disconnect(LogRepWorkerWalRcvConn);
-		LogRepWorkerWalRcvConn = NULL;
-	}
-
 	/* Find the leader apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 }
@@ -167,6 +157,16 @@ finish_sync_worker(void)
 {
 	clean_sync_worker();
 
+	/*
+	 * Disconnect from the publisher. Otherwise reused sync workers could
+	 * cause max_wal_senders to be exceeded.
+	 */
+	if (LogRepWorkerWalRcvConn != NULL)
+	{
+		walrcv_disconnect(LogRepWorkerWalRcvConn);
+		LogRepWorkerWalRcvConn = NULL;
+	}
+
 	/* And flush all writes. */
 	XLogFlush(GetXLogWriteRecPtr());
 
@@ -1268,7 +1268,7 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
  * The returned slot name is palloc'ed in current memory context.
  */
 char *
-LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
+LogicalRepSyncTableStart(XLogRecPtr *origin_startpos, int worker_slot)
 {
 	char	   *slotname;
 	char	   *err;
@@ -1321,14 +1321,31 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 									NAMEDATALEN);
 
 	/*
-	 * Here we use the slot name instead of the subscription name as the
-	 * application_name, so that it is different from the leader apply worker,
-	 * so that synchronous replication can distinguish them.
+	 * Connect to the publisher if not already connected. The application_name
+	 * must differ from that of the leader apply worker so that synchronous
+	 * replication can distinguish them.
 	 */
-	LogRepWorkerWalRcvConn =
-		walrcv_connect(MySubscription->conninfo, true,
-					   must_use_password,
-					   slotname, &err);
+	if (LogRepWorkerWalRcvConn == NULL)
+	{
+		char application_name[NAMEDATALEN];
+
+		/*
+		 * FIXME: set an appropriate application_name. Previously, the slot
+		 * name was used because the tablesync worker had the same lifetime
+		 * as its slot, but now a tablesync worker handles many slots during
+		 * synchronization, so the slot name is no longer suitable. What
+		 * should it be? Note that if the tablesync worker starts to reuse
+		 * the replication slot during synchronization, we should use the
+		 * slot name as application_name again.
+		 */
+		snprintf(application_name, NAMEDATALEN, "pg_%u_sync_%i",
+				 MySubscription->oid, worker_slot);
+		LogRepWorkerWalRcvConn =
+			walrcv_connect(MySubscription->conninfo, true,
+						   must_use_password,
+						   application_name, &err);
+	}
+
 	if (LogRepWorkerWalRcvConn == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONNECTION_FAILURE),
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 81f7a6de66..f77bc55e34 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3486,19 +3486,21 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
-	 * protocol message.
+	 * protocol message, unless it already exists.
 	 */
-	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
-												"ApplyMessageContext",
-												ALLOCSET_DEFAULT_SIZES);
+	if (!ApplyMessageContext)
+		ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+													"ApplyMessageContext",
+													ALLOCSET_DEFAULT_SIZES);
 
 	/*
 	 * This memory context is used for per-stream data when the streaming mode
 	 * is enabled. This context is reset on each stream stop.
 	 */
-	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
-													"LogicalStreamingContext",
-													ALLOCSET_DEFAULT_SIZES);
+	if (!LogicalStreamingContext)
+		LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
+														"LogicalStreamingContext",
+														ALLOCSET_DEFAULT_SIZES);
 
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
@@ -4451,7 +4453,9 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
  * are not repeatable.
  */
 static void
-start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+start_table_sync(XLogRecPtr *origin_startpos,
+				 char **myslotname,
+				 int worker_slot)
 {
 	char	   *syncslotname = NULL;
 
@@ -4460,7 +4464,7 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
 	PG_TRY();
 	{
 		/* Call initial sync. */
-		syncslotname = LogicalRepSyncTableStart(origin_startpos);
+		syncslotname = LogicalRepSyncTableStart(origin_startpos, worker_slot);
 	}
 	PG_CATCH();
 	{
@@ -4531,12 +4535,13 @@ run_tablesync_worker(WalRcvStreamOptions *options,
 					 char *slotname,
 					 char *originname,
 					 int originname_size,
-					 XLogRecPtr *origin_startpos)
+					 XLogRecPtr *origin_startpos,
+					 int worker_slot)
 {
 	MyLogicalRepWorker->is_sync_completed = false;
 
 	/* Start table synchronization. */
-	start_table_sync(origin_startpos, &slotname);
+	start_table_sync(origin_startpos, &slotname, worker_slot);
 
 	ReplicationOriginNameForLogicalRep(MySubscription->oid,
 									   MyLogicalRepWorker->relid,
@@ -4837,7 +4842,8 @@ TablesyncWorkerMain(Datum main_arg)
 							 myslotname,
 							 originname,
 							 sizeof(originname),
-							 &origin_startpos);
+							 &origin_startpos,
+							 worker_slot);
 
 		if (IsTransactionState())
 			CommitTransactionCommand();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d27ef2985d..6e878e7bf5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1827,6 +1827,12 @@ exec_replication_command(const char *cmd_string)
 				set_ps_display(cmdtag);
 				PreventInTransactionBlock(true, cmdtag);
 
+				/*
+				 * Reset the flags because this connection may have streamed
+				 * before.
+				 */
+				streamingDoneSending = streamingDoneReceiving = false;
+
 				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 					StartReplication(cmd);
 				else
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1e9f8e6e72..af6fd339f7 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -249,7 +249,8 @@ extern int	logicalrep_sync_worker_count(Oid subid);
 
 extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 											   char *originname, Size szoriginname);
-extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
+extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos,
+									  int worker_slot);
 
 extern bool AllTablesyncsReady(void);
 extern void UpdateTwoPhaseState(Oid suboid, char new_state);
-- 
2.34.1

