From a8d3a4a61367319cb23bd61e72ea85bda81a2739 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 3 Jun 2025 12:52:22 -0700
Subject: [PATCH v3 1/3] Enable logical decoding dynamically based on logical
 slot presence.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
---
 src/backend/access/transam/xlog.c             | 107 ++++-
 src/backend/commands/publicationcmds.c        |   7 +-
 src/backend/replication/logical/Makefile      |   1 +
 src/backend/replication/logical/decode.c      |  48 +--
 src/backend/replication/logical/logical.c     |  34 +-
 src/backend/replication/logical/logicalctl.c  | 405 ++++++++++++++++++
 src/backend/replication/logical/meson.build   |   1 +
 src/backend/replication/logical/slotsync.c    |  14 +-
 src/backend/replication/slot.c                |  24 +-
 src/backend/replication/slotfuncs.c           |   7 +-
 src/backend/replication/walsender.c           |   2 +
 src/backend/storage/ipc/ipci.c                |   3 +
 src/backend/storage/ipc/procsignal.c          |   4 +
 src/backend/storage/ipc/standby.c             |   7 +-
 .../utils/activity/wait_event_names.txt       |   2 +
 src/backend/utils/init/postinit.c             |   4 +
 src/backend/utils/misc/guc_tables.c           |  11 +
 src/include/access/xlog.h                     |   5 +-
 src/include/catalog/pg_control.h              |   3 +
 src/include/replication/logicalctl.h          |  59 +++
 src/include/replication/slot.h                |   3 +
 src/include/storage/lwlocklist.h              |   1 +
 src/include/storage/procsignal.h              |   2 +
 src/include/utils/guc_hooks.h                 |   1 +
 .../t/035_standby_logical_decoding.pl         |   2 +-
 src/test/subscription/t/001_rep_changes.pl    |   2 +-
 26 files changed, 683 insertions(+), 76 deletions(-)
 create mode 100644 src/backend/replication/logical/logicalctl.c
 create mode 100644 src/include/replication/logicalctl.h

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8e7827c6ed9..260d43f7ff0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -78,6 +78,7 @@
 #include "postmaster/walsummarizer.h"
 #include "postmaster/walwriter.h"
 #include "replication/origin.h"
+#include "replication/logicalctl.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
@@ -141,6 +142,7 @@ bool		XLOG_DEBUG = false;
 #endif
 
 int			wal_segment_size = DEFAULT_XLOG_SEG_SIZE;
+int			effective_wal_level = WAL_LEVEL_REPLICA;
 
 /*
  * Number of WAL insertion locks to use. A higher value allows more insertions
@@ -4991,6 +4993,48 @@ show_in_hot_standby(void)
 	return RecoveryInProgress() ? "on" : "off";
 }
 
+/*
+ * GUC show_hook for effective_wal_level
+ */
+const char *
+show_effective_wal_level(void)
+{
+	char	   *str;
+
+	if (wal_level == WAL_LEVEL_MINIMAL)
+		return "minimal";
+
+	/*
+	 * During the recovery, we synchronously update the XLogLogicalInfo so
+	 * need to check the shared state.
+	 */
+	if (RecoveryInProgress())
+		return IsXLogLogicalInfoEnabled() ? "logical" : "replica";
+
+	if (wal_level == WAL_LEVEL_REPLICA)
+	{
+		bool		in_transition;
+
+		/* check if the logical decoding status is being changed */
+		LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
+		in_transition = LogicalDecodingCtl->transition_in_progress;
+		LWLockRelease(LogicalDecodingControlLock);
+
+		/*
+		 * With wal_level='replica', XLogLogicalInfo indicates the actual WAL
+		 * level unless we're in the status change.
+		 */
+		if (XLogLogicalInfo && !in_transition)
+			str = "logical";
+		else
+			str = "replica";
+	}
+	else
+		str = "logical";
+
+	return str;
+}
+
 /*
  * Read the control file, set respective GUCs.
  *
@@ -5758,6 +5802,12 @@ StartupXLOG(void)
 	 */
 	RelationCacheInitFileRemove();
 
+	/*
+	 * Startup the logical decoding status, needs to be setup before
+	 * initializing replication slots as it requires logical decoding status.
+	 */
+	StartupLogicalDecodingStatus(ControlFile->logicalDecodingEnabled);
+
 	/*
 	 * Initialize replication slots, before there's a chance to remove
 	 * required resources.
@@ -6293,6 +6343,8 @@ StartupXLOG(void)
 	Insert->fullPageWrites = lastFullPageWrites;
 	UpdateFullPageWrites();
 
+	UpdateLogicalDecodingStatusEndOfRecovery();
+
 	/*
 	 * Emit checkpoint or end-of-recovery record in XLOG, if required.
 	 */
@@ -7432,6 +7484,8 @@ CreateCheckPoint(int flags)
 	 */
 	ControlFile->unloggedLSN = pg_atomic_read_membarrier_u64(&XLogCtl->unloggedLSN);
 
+	ControlFile->logicalDecodingEnabled = IsLogicalDecodingEnabled();
+
 	UpdateControlFile();
 	LWLockRelease(ControlFileLock);
 
@@ -8659,19 +8713,26 @@ xlog_redo(XLogReaderState *record)
 		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
 
 		/*
-		 * Invalidate logical slots if we are in hot standby and the primary
-		 * does not have a WAL level sufficient for logical decoding. No need
-		 * to search for potentially conflicting logically slots if standby is
-		 * running with wal_level lower than logical, because in that case, we
-		 * would have either disallowed creation of logical slots or
-		 * invalidated existing ones.
+		 * Change the logical decoding status upon wal_level change on the
+		 * primary server.
 		 */
-		if (InRecovery && InHotStandby &&
-			xlrec.wal_level < WAL_LEVEL_LOGICAL &&
-			wal_level >= WAL_LEVEL_LOGICAL)
-			InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
-											   0, InvalidOid,
-											   InvalidTransactionId);
+		if (xlrec.wal_level == WAL_LEVEL_LOGICAL)
+		{
+			/*
+			 * If the primary increase WAL level to 'logical', we can
+			 * unconditionally enable the logical decoding on the standby.
+			 */
+			UpdateLogicalDecodingStatus(true);
+		}
+		else if (xlrec.wal_level == WAL_LEVEL_REPLICA &&
+				 pg_atomic_read_u32(&ReplicationSlotCtl->n_inuse_logical_slots) == 0)
+		{
+			/*
+			 * Disable the logical decoding if there is no in-use logical slot
+			 * on the standby.
+			 */
+			UpdateLogicalDecodingStatus(false);
+		}
 
 		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 		ControlFile->MaxConnections = xlrec.MaxConnections;
@@ -8681,6 +8742,7 @@ xlog_redo(XLogReaderState *record)
 		ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
 		ControlFile->wal_level = xlrec.wal_level;
 		ControlFile->wal_log_hints = xlrec.wal_log_hints;
+		ControlFile->logicalDecodingEnabled = IsLogicalDecodingEnabled();
 
 		/*
 		 * Update minRecoveryPoint to ensure that if recovery is aborted, we
@@ -8740,6 +8802,27 @@ xlog_redo(XLogReaderState *record)
 	{
 		/* nothing to do here, just for informational purposes */
 	}
+	else if (info == XLOG_LOGICAL_DECODING_STATUS_CHANGE)
+	{
+		bool		logical_decoding;
+
+		memcpy(&logical_decoding, XLogRecGetData(record), sizeof(bool));
+		UpdateLogicalDecodingStatus(logical_decoding);
+
+		/*
+		 * Invalidate logical slots if we are in hot standby and the primary
+		 * disabled the logical decoding.
+		 */
+		if (!logical_decoding && InRecovery && InHotStandby)
+			InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
+											   0, InvalidOid,
+											   InvalidTransactionId);
+
+		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+		ControlFile->logicalDecodingEnabled = logical_decoding;
+		UpdateControlFile();
+		LWLockRelease(ControlFileLock);
+	}
 }
 
 /*
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 1bf7eaae5b3..f21fddd2722 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -38,6 +38,7 @@
 #include "parser/parse_clause.h"
 #include "parser/parse_collate.h"
 #include "parser/parse_relation.h"
+#include "replication/logicalctl.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/lmgr.h"
 #include "utils/acl.h"
@@ -960,11 +961,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 
 	InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
 
-	if (wal_level != WAL_LEVEL_LOGICAL)
+	if (!IsLogicalDecodingEnabled())
 		ereport(WARNING,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("\"wal_level\" is insufficient to publish logical changes"),
-				 errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
+				 errmsg("logical decoding needs to be enabled to publish logical changes"),
+				 errhint("Set \"wal_level\" to \"logical\" or create a logical replication slot with \"replica\" \"wal_level\" before creating subscriptions.")));
 
 	return myself;
 }
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 1e08bbbd4eb..50ec127e9ef 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -20,6 +20,7 @@ OBJS = \
 	decode.o \
 	launcher.o \
 	logical.o \
+	logicalctl.o \
 	logicalfuncs.o \
 	message.o \
 	origin.o \
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index cc03f0706e9..269be167be5 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -150,21 +150,30 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 */
 			break;
 		case XLOG_PARAMETER_CHANGE:
+
+			/*
+			 * XXX: even if wal_level on the primary got decreased to
+			 * 'replica' it doesn't necessarily mean to disable the logical
+			 * decoding as long as we have at least one logical slot. So we
+			 * can ignore wal_level change here.
+			 */
+			break;
+		case XLOG_NOOP:
+		case XLOG_NEXTOID:
+		case XLOG_SWITCH:
+		case XLOG_BACKUP_END:
+		case XLOG_RESTORE_POINT:
+		case XLOG_FPW_CHANGE:
+		case XLOG_FPI_FOR_HINT:
+		case XLOG_FPI:
+		case XLOG_OVERWRITE_CONTRECORD:
+		case XLOG_CHECKPOINT_REDO:
+			break;
+		case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
 			{
-				xl_parameter_change *xlrec =
-					(xl_parameter_change *) XLogRecGetData(buf->record);
+				bool	   *logical_decoding = (bool *) XLogRecGetData(buf->record);
 
-				/*
-				 * If wal_level on the primary is reduced to less than
-				 * logical, we want to prevent existing logical slots from
-				 * being used.  Existing logical slots on the standby get
-				 * invalidated when this WAL record is replayed; and further,
-				 * slot creation fails when wal_level is not sufficient; but
-				 * all these operations are not synchronized, so a logical
-				 * slot may creep in while the wal_level is being reduced.
-				 * Hence this extra check.
-				 */
-				if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+				if (!(*logical_decoding))
 				{
 					/*
 					 * This can occur only on a standby, as a primary would
@@ -174,20 +183,9 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 					Assert(RecoveryInProgress());
 					ereport(ERROR,
 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-							 errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
+							 errmsg("logical decoding must be enabled on the primary")));
 				}
-				break;
 			}
-		case XLOG_NOOP:
-		case XLOG_NEXTOID:
-		case XLOG_SWITCH:
-		case XLOG_BACKUP_END:
-		case XLOG_RESTORE_POINT:
-		case XLOG_FPW_CHANGE:
-		case XLOG_FPI_FOR_HINT:
-		case XLOG_FPI:
-		case XLOG_OVERWRITE_CONTRECORD:
-		case XLOG_CHECKPOINT_REDO:
 			break;
 		default:
 			elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 7e363a7c05b..236f7508309 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -36,6 +36,7 @@
 #include "pgstat.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicalctl.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slotsync.h"
 #include "replication/snapbuild.h"
@@ -117,31 +118,24 @@ CheckLogicalDecodingRequirements(void)
 	 * needs the same check.
 	 */
 
-	if (wal_level < WAL_LEVEL_LOGICAL)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("logical decoding requires \"wal_level\" >= \"logical\"")));
+	if (!IsLogicalDecodingEnabled())
+	{
+		if (RecoveryInProgress())
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical decoding needs to be enabled on the primary"),
+					 errhint("Set \"wal_level\" >= \"logical\" or create at least one logical slot on the primary .")));
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical decoding is not enabled"),
+					 errhint("Set \"wal_level\" >= \"logical\" or create at least one logical slot.")));
+	}
 
 	if (MyDatabaseId == InvalidOid)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
-
-	if (RecoveryInProgress())
-	{
-		/*
-		 * This check may have race conditions, but whenever
-		 * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
-		 * verify that there are no existing logical replication slots. And to
-		 * avoid races around creating a new slot,
-		 * CheckLogicalDecodingRequirements() is called once before creating
-		 * the slot, and once when logical decoding is initially starting up.
-		 */
-		if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
-	}
 }
 
 /*
diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c
new file mode 100644
index 00000000000..320a43b0574
--- /dev/null
+++ b/src/backend/replication/logical/logicalctl.c
@@ -0,0 +1,405 @@
+/*-------------------------------------------------------------------------
+ * logicalctl.c
+ *		Functionality to control logical decoding status.
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/replication/logical/logicalctl.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
+#include "access/xloginsert.h"
+#include "catalog/pg_control.h"
+#include "port/atomics.h"
+#include "miscadmin.h"
+#include "storage/lwlock.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/shmem.h"
+#include "storage/standby.h"
+#include "replication/logicalctl.h"
+#include "replication/slot.h"
+#include "utils/guc.h"
+#include "utils/wait_event_types.h"
+
+LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
+
+/*
+ * Process local cache of LogicalDecodingCtl->xlog_logical_info. This is
+ * initialized at process startup time, and could be updated when absorbing
+ * process barrier in ProcessBarrierUpdateXLogLogicalInfo().
+ */
+bool		XLogLogicalInfo = false;
+
+Size
+LogicalDecodingCtlShmemSize(void)
+{
+	return sizeof(LogicalDecodingCtlData);
+}
+
+void
+LogicalDecodingCtlShmemInit(void)
+{
+	bool		found;
+
+	LogicalDecodingCtl = ShmemInitStruct("Logical information control",
+										 LogicalDecodingCtlShmemSize(),
+										 &found);
+
+	if (!found)
+	{
+		LogicalDecodingCtl->transition_in_progress = false;
+		ConditionVariableInit(&LogicalDecodingCtl->transition_cv);
+		pg_atomic_init_flag(&LogicalDecodingCtl->xlog_logical_info);
+		pg_atomic_init_flag(&LogicalDecodingCtl->logical_decoding_enabled);
+	}
+}
+
+/*
+ * Initialize the logical decoding status on shmem at server startup. This
+ * must be called ONCE during postmaster or standalone-backend startup,
+ * before initializing replication slots.
+ */
+void
+StartupLogicalDecodingStatus(bool status_in_control_file)
+{
+	if (wal_level == WAL_LEVEL_MINIMAL)
+		return;
+
+	/*
+	 * If the logical decoding was enabled before the last shutdown, we
+	 * continue enabling it as we might have set wal_level='logical' or have a
+	 * few logical slots. On primary, wal_level setting can overwrite the
+	 * status.
+	 */
+	if (status_in_control_file ||
+		(!StandbyMode && wal_level >= WAL_LEVEL_LOGICAL))
+	{
+		pg_atomic_test_set_flag(&(LogicalDecodingCtl->logical_decoding_enabled));
+		pg_atomic_test_set_flag(&(LogicalDecodingCtl->xlog_logical_info));
+	}
+}
+
+/*
+ * Update the XLogLogicalInfo cache.
+ */
+static void
+update_xlog_logical_info(void)
+{
+	XLogLogicalInfo = IsXLogLogicalInfoEnabled();
+}
+
+/*
+ * Initialize XLogLogicalInfo backend-private cache.
+ */
+void
+InitializeProcessXLogLogicalInfo(void)
+{
+	update_xlog_logical_info();
+}
+
+bool
+ProcessBarrierUpdateXLogLogicalInfo(void)
+{
+	update_xlog_logical_info();
+	return true;
+}
+
+/*
+ * Check the shared memory state and return true if the logical decoding is
+ * enabled on the system.
+ */
+bool
+IsLogicalDecodingEnabled(void)
+{
+	return !pg_atomic_unlocked_test_flag(&(LogicalDecodingCtl->logical_decoding_enabled));
+}
+
+/*
+ * Check the shared memory state and return true if WAL logging logical
+ * information is enabled.
+ */
+bool
+IsXLogLogicalInfoEnabled(void)
+{
+	return !pg_atomic_unlocked_test_flag(&(LogicalDecodingCtl->xlog_logical_info));
+}
+
+/*
+ * Enable/Disable both status of logical info WAL logging and logical decoding
+ * on shared memory based on the given new status.
+ */
+
+void
+UpdateLogicalDecodingStatus(bool new_status)
+{
+	if (new_status)
+	{
+		pg_atomic_test_set_flag(&(LogicalDecodingCtl->xlog_logical_info));
+		pg_atomic_test_set_flag(&(LogicalDecodingCtl->logical_decoding_enabled));
+	}
+	else
+	{
+		pg_atomic_clear_flag(&(LogicalDecodingCtl->logical_decoding_enabled));
+		pg_atomic_clear_flag(&(LogicalDecodingCtl->xlog_logical_info));
+	}
+
+	elog(DEBUG1, "update logical decoding status to %d", new_status);
+}
+
+/*
+ * A PG_ENSURE_ERROR_CLEANUP callback for making the logical decoding enabled.
+ */
+static void
+abort_enabling_logical_decoding(int code, Datum arg)
+{
+	Assert(LogicalDecodingCtl->transition_in_progress);
+
+	elog(DEBUG1, "aborting the process of enabling logical decoding");
+
+	pg_atomic_clear_flag(&(LogicalDecodingCtl->logical_decoding_enabled));
+	pg_atomic_clear_flag(&(LogicalDecodingCtl->xlog_logical_info));
+
+	/* XXX really no need to wait here? */
+	EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
+
+	LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+	LogicalDecodingCtl->transition_in_progress = false;
+	LWLockRelease(LogicalDecodingControlLock);
+
+	/* Let waiters know the WAL level change completed */
+	ConditionVariableBroadcast(&LogicalDecodingCtl->transition_cv);
+}
+
+/*
+ * Enable the logical decoding if disabled.
+ *
+ * Note that there is no interlock between logical decoding [de]activation
+ * and the slot creation. To ensure enabling the logical decoding the caller
+ * must create a logical slot first without initializing the logical decoding
+ * context and call this function.
+ */
+void
+EnsureLogicalDecodingEnabled(void)
+{
+	if (IsLogicalDecodingEnabled())
+		return;
+
+	/* Standby cannot change the logical decoding status */
+	if (RecoveryInProgress())
+		return;
+
+retry:
+	LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+
+	if (LogicalDecodingCtl->transition_in_progress)
+	{
+		LWLockRelease(LogicalDecodingControlLock);
+
+		/* Wait for someone to complete the transition */
+		ConditionVariableSleep(&LogicalDecodingCtl->transition_cv,
+							   WAIT_EVENT_LOGICAL_DECODING_STATUS_CHANGE);
+
+		goto retry;
+	}
+
+	if (IsLogicalDecodingEnabled())
+	{
+		LWLockRelease(LogicalDecodingControlLock);
+		return;
+	}
+
+	LogicalDecodingCtl->transition_in_progress = true;
+	LWLockRelease(LogicalDecodingControlLock);
+
+	PG_ENSURE_ERROR_CLEANUP(abort_enabling_logical_decoding, (Datum) 0);
+	{
+		RunningTransactions running;
+
+		/*
+		 * Set logical info WAL logging on the shmem. All process starts after
+		 * this point will include the information required by the logical
+		 * decoding to WAL records.
+		 */
+		pg_atomic_test_set_flag(&(LogicalDecodingCtl->xlog_logical_info));
+
+		running = GetRunningTransactionData();
+		LWLockRelease(ProcArrayLock);
+		LWLockRelease(XidGenLock);
+
+		/*
+		 * Order all running processes to reflect the xlog_logical_info
+		 * update, and wait.
+		 */
+		WaitForProcSignalBarrier(
+								 EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
+
+		/*
+		 * Wait for all running transactions to finish as some transaction
+		 * might have started with the old state.
+		 */
+		for (int i = 0; i < running->xcnt; i++)
+		{
+			TransactionId xid = running->xids[i];
+
+			if (TransactionIdIsCurrentTransactionId(xid))
+				continue;
+
+			XactLockTableWait(xid, NULL, NULL, XLTW_None);
+		}
+
+		/*
+		 * Here, we can ensure that all running transactions are using the new
+		 * xlog_logical_info value, writing logical information to WAL
+		 * records. So now enable the logical decoding globally.
+		 */
+		pg_atomic_test_set_flag(&(LogicalDecodingCtl->logical_decoding_enabled));
+
+		LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+		LogicalDecodingCtl->transition_in_progress = false;
+		LWLockRelease(LogicalDecodingControlLock);
+	}
+	PG_END_ENSURE_ERROR_CLEANUP(abort_enabling_logical_decoding, (Datum) 0);
+
+	/* Let waiters know the work finished */
+	ConditionVariableBroadcast(&LogicalDecodingCtl->transition_cv);
+
+	if (XLogStandbyInfoActive() && !RecoveryInProgress())
+	{
+		XLogRecPtr	recptr;
+		bool		logical_decoding = true;
+
+		XLogBeginInsert();
+		XLogRegisterData(&logical_decoding, sizeof(bool));
+		recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
+		XLogFlush(recptr);
+	}
+
+	ereport(LOG,
+			(errmsg("logical decoding is enabled upon creating a new logical replication slot")));
+}
+
+/*
+ * Disable the logical decoding if enabled.
+ *
+ * XXX: This function could write a WAL record in order to tell the standbys
+ * know the logical decoding got disabled. However, we need to note that this
+ * function could be called during process exits (e.g., by ReplicationSlotCleanup()
+ * via before_shmem_exit callbacks), which looks something that we want to
+ * avoid.
+ */
+void
+DisableLogicalDecodingIfNecessary(void)
+{
+	/* Standby cannot change the logical decoding status */
+	if (RecoveryInProgress())
+		return;
+
+	if (wal_level >= WAL_LEVEL_LOGICAL || !IsLogicalDecodingEnabled())
+		return;
+
+	if (pg_atomic_read_u32(&ReplicationSlotCtl->n_inuse_logical_slots) > 0)
+		return;
+
+	if (XLogStandbyInfoActive() && !RecoveryInProgress())
+	{
+		bool		logical_decoding = false;
+		XLogRecPtr	recptr;
+
+		XLogBeginInsert();
+		XLogRegisterData(&logical_decoding, sizeof(bool));
+		recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
+		XLogFlush(recptr);
+	}
+
+	LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+	LogicalDecodingCtl->transition_in_progress = true;
+	LWLockRelease(LogicalDecodingControlLock);
+
+	pg_atomic_clear_flag(&(LogicalDecodingCtl->logical_decoding_enabled));
+	pg_atomic_clear_flag(&(LogicalDecodingCtl->xlog_logical_info));
+
+	/*
+	 * XXX is it okay not to wait for the signal to be absorbed?
+	 */
+	EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
+
+	/* XXX need to wait for transaction finishes? */
+
+	LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+	LogicalDecodingCtl->transition_in_progress = false;
+	LWLockRelease(LogicalDecodingControlLock);
+
+	/* Let waiters know the work finished */
+	ConditionVariableBroadcast(&LogicalDecodingCtl->transition_cv);
+
+	ereport(LOG,
+			(errmsg("logical decoding is disabled because all logical replication slots are removed")));
+}
+
+/*
+ * Update the logical decoding status at end of the recovery. This function
+ * must be called ONCE before accepting writes.
+ */
+void
+UpdateLogicalDecodingStatusEndOfRecovery(void)
+{
+	bool		new_status = false;
+
+	if (!IsUnderPostmaster)
+		return;
+
+	if (wal_level == WAL_LEVEL_MINIMAL)
+		return;
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		bool		xlog_logical_info = IsXLogLogicalInfoEnabled();
+		bool		logical_decoding = IsLogicalDecodingEnabled();
+
+		/* Verify we're not in intermediate status */
+		Assert(xlog_logical_info == logical_decoding);
+	}
+#endif
+
+	/*
+	 * We can use logical decoding if we're using 'logical' WAL level or there
+	 * is at least one logical replication slot.
+	 */
+	if (wal_level == WAL_LEVEL_LOGICAL ||
+		pg_atomic_read_u32(&ReplicationSlotCtl->n_inuse_logical_slots) > 0)
+		new_status = true;
+
+	/* Update the status on shmem if needed */
+	if (IsLogicalDecodingEnabled() != new_status)
+	{
+		XLogRecPtr	recptr;
+
+		UpdateLogicalDecodingStatus(new_status);
+
+		XLogBeginInsert();
+		XLogRegisterData(&new_status, sizeof(bool));
+		recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
+		XLogFlush(recptr);
+
+		elog(DEBUG1, "update logical decoding status to %d at end of recovery",
+			 new_status);
+	}
+
+	/*
+	 * Ensure all running processes have the updated status. We don't need to
+	 * wait for running transactions to finish as we don't accept any writes
+	 * yet. We need the wait even if we've not updated the status above as the
+	 * status have been turned on and off during recovery, having running
+	 * processes have different status on their local caches.
+	 */
+	WaitForProcSignalBarrier(
+							 EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
+}
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index 6f19614c79d..19c7130b961 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -6,6 +6,7 @@ backend_sources += files(
   'decode.c',
   'launcher.c',
   'logical.c',
+  'logicalctl.c',
   'logicalfuncs.c',
   'message.c',
   'origin.c',
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 2f0c08b8fbd..f3ed58b9001 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -57,6 +57,7 @@
 #include "pgstat.h"
 #include "postmaster/interrupt.h"
 #include "replication/logical.h"
+#include "replication/logicalctl.h"
 #include "replication/slotsync.h"
 #include "replication/snapbuild.h"
 #include "storage/ipc.h"
@@ -763,6 +764,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 							  remote_slot->failover,
 							  true);
 
+		EnsureLogicalDecodingEnabled();
+
 		/* For shorter lines. */
 		slot = MyReplicationSlot;
 
@@ -1058,15 +1061,16 @@ bool
 ValidateSlotSyncParams(int elevel)
 {
 	/*
-	 * Logical slot sync/creation requires wal_level >= logical.
+	 * Logical slot sync/creation requires to the logical decoding to be
+	 * enabled.
 	 *
 	 * Since altering the wal_level requires a server restart, so error out in
 	 * this case regardless of elevel provided by caller.
 	 */
-	if (wal_level < WAL_LEVEL_LOGICAL)
-		ereport(ERROR,
-				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\""));
+	if (!IsLogicalDecodingEnabled())
+		ereport(elevel,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("logical decoding is not enabled"));
 
 	/*
 	 * A physical replication slot(primary_slot_name) is required on the
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index e44ad576bc7..671b7fa2083 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -47,6 +47,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/interrupt.h"
+#include "replication/logicalctl.h"
 #include "replication/slotsync.h"
 #include "replication/slot.h"
 #include "replication/walsender_private.h"
@@ -219,6 +220,8 @@ ReplicationSlotsShmemInit(void)
 		/* First time through, so initialize */
 		MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
 
+		pg_atomic_init_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 0);
+
 		for (i = 0; i < max_replication_slots; i++)
 		{
 			ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
@@ -459,7 +462,10 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 * ReplicationSlotAllocationLock.
 	 */
 	if (SlotIsLogical(slot))
+	{
 		pgstat_create_replslot(slot);
+		pg_atomic_add_fetch_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 1);
+	}
 
 	/*
 	 * Now that the slot has been marked as in_use and active, it's safe to
@@ -806,6 +812,8 @@ restart:
 	}
 
 	LWLockRelease(ReplicationSlotControlLock);
+
+	DisableLogicalDecodingIfNecessary();
 }
 
 /*
@@ -920,6 +928,7 @@ void
 ReplicationSlotDropAcquired(void)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
+	bool		was_logical_slot = SlotIsLogical(slot);
 
 	Assert(MyReplicationSlot != NULL);
 
@@ -927,6 +936,9 @@ ReplicationSlotDropAcquired(void)
 	MyReplicationSlot = NULL;
 
 	ReplicationSlotDropPtr(slot);
+
+	if (was_logical_slot)
+		DisableLogicalDecodingIfNecessary();
 }
 
 /*
@@ -1027,7 +1039,10 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	 * another session.
 	 */
 	if (SlotIsLogical(slot))
+	{
 		pgstat_drop_replslot(slot);
+		pg_atomic_sub_fetch_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 1);
+	}
 
 	/*
 	 * We release this at the very end, so that nobody starts trying to create
@@ -2578,12 +2593,12 @@ RestoreSlotFromDisk(const char *name)
 	 */
 	if (cp.slotdata.database != InvalidOid)
 	{
-		if (wal_level < WAL_LEVEL_LOGICAL)
+		if (wal_level < WAL_LEVEL_REPLICA)
 			ereport(FATAL,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
+					 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
 							NameStr(cp.slotdata.name)),
-					 errhint("Change \"wal_level\" to be \"logical\" or higher.")));
+					 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
 
 		/*
 		 * In standby mode, the hot standby must be enabled. This check is
@@ -2646,6 +2661,9 @@ RestoreSlotFromDisk(const char *name)
 		ReplicationSlotSetInactiveSince(slot, now, false);
 
 		restored = true;
+
+		if (SlotIsLogical(slot))
+			pg_atomic_add_fetch_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 1);
 		break;
 	}
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 69f4c6157c5..6744a7979d7 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -18,6 +18,7 @@
 #include "access/xlogutils.h"
 #include "funcapi.h"
 #include "replication/logical.h"
+#include "replication/logicalctl.h"
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "utils/builtins.h"
@@ -136,6 +137,9 @@ create_logical_replication_slot(char *name, char *plugin,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
 						  failover, false);
 
+	EnsureLogicalDecodingEnabled();
+	CheckLogicalDecodingRequirements();
+
 	/*
 	 * Create logical decoding context to find start point or, if we don't
 	 * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
@@ -184,8 +188,6 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 
 	CheckSlotPermissions();
 
-	CheckLogicalDecodingRequirements();
-
 	create_logical_replication_slot(NameStr(*name),
 									NameStr(*plugin),
 									temporary,
@@ -905,6 +907,7 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS)
 				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				errmsg("replication slots can only be synchronized to a standby server"));
 
+	EnsureLogicalDecodingEnabled();
 	ValidateSlotSyncParams(ERROR);
 
 	/* Load the libpq-specific functions */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 28b8591efa5..7158c638836 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -71,6 +71,7 @@
 #include "postmaster/interrupt.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicalctl.h"
 #include "replication/slotsync.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"
@@ -1218,6 +1219,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		Assert(cmd->kind == REPLICATION_KIND_LOGICAL);
 
+		EnsureLogicalDecodingEnabled();
 		CheckLogicalDecodingRequirements();
 
 		/*
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2fa045e6b0f..f1ef837755c 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -31,6 +31,7 @@
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/walsummarizer.h"
+#include "replication/logicalctl.h"
 #include "replication/logicallauncher.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
@@ -150,6 +151,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
 	size = add_size(size, AioShmemSize());
+	size = add_size(size, LogicalDecodingCtlShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -343,6 +345,7 @@ CreateOrAttachShmemStructs(void)
 	WaitEventCustomShmemInit();
 	InjectionPointShmemInit();
 	AioShmemInit();
+	LogicalDecodingCtlShmemInit();
 }
 
 /*
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index a9bb540b55a..0c3788cb836 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -22,6 +22,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/pg_bitutils.h"
+#include "replication/logicalctl.h"
 #include "replication/logicalworker.h"
 #include "replication/walsender.h"
 #include "storage/condition_variable.h"
@@ -576,6 +577,9 @@ ProcessProcSignalBarrier(void)
 					case PROCSIGNAL_BARRIER_SMGRRELEASE:
 						processed = ProcessBarrierSmgrRelease();
 						break;
+					case PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO:
+						processed = ProcessBarrierUpdateXLogLogicalInfo();
+						break;
 				}
 
 				/*
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 4222bdab078..a65ce9cd02b 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -24,6 +24,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/logicalctl.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
@@ -499,7 +500,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
 	 * seems OK, given that this kind of conflict should not normally be
 	 * reached, e.g. due to using a physical replication slot.
 	 */
-	if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
+	if (IsLogicalDecodingEnabled() && isCatalogRel)
 		InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
 										   snapshotConflictHorizon);
 }
@@ -1325,13 +1326,13 @@ LogStandbySnapshot(void)
 	 * record. Fortunately this routine isn't executed frequently, and it's
 	 * only a shared lock.
 	 */
-	if (wal_level < WAL_LEVEL_LOGICAL)
+	if (!IsLogicalDecodingEnabled())
 		LWLockRelease(ProcArrayLock);
 
 	recptr = LogCurrentRunningXacts(running);
 
 	/* Release lock if we kept it longer ... */
-	if (wal_level >= WAL_LEVEL_LOGICAL)
+	if (IsLogicalDecodingEnabled())
 		LWLockRelease(ProcArrayLock);
 
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4da68312b5f..311fed2861b 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -133,6 +133,7 @@ HASH_GROW_BUCKETS_ELECT	"Waiting to elect a Parallel Hash participant to allocat
 HASH_GROW_BUCKETS_REALLOCATE	"Waiting for an elected Parallel Hash participant to finish allocating more buckets."
 HASH_GROW_BUCKETS_REINSERT	"Waiting for other Parallel Hash participants to finish inserting tuples into new buckets."
 LOGICAL_APPLY_SEND_DATA	"Waiting for a logical replication leader apply process to send data to a parallel apply process."
+LOGICAL_DECODING_STATUS_CHANGE	"Waiting for the logical decoding status change."
 LOGICAL_PARALLEL_APPLY_STATE_CHANGE	"Waiting for a logical replication parallel apply process to change state."
 LOGICAL_SYNC_DATA	"Waiting for a logical replication remote server to send data for initial table synchronization."
 LOGICAL_SYNC_STATE_CHANGE	"Waiting for a logical replication remote server to change state."
@@ -352,6 +353,7 @@ DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
 AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
+LogicalDecodingControl	"Waiting to access logical decoding status information."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index c86ceefda94..87bceae3406 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -40,6 +40,7 @@
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalctl.h"
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/walsender.h"
@@ -658,6 +659,9 @@ BaseInit(void)
 	/* Initialize lock manager's local structs */
 	InitLockManagerAccess();
 
+	/* Initialize logical info WAL logging state */
+	InitializeProcessXLogLogicalInfo();
+
 	/*
 	 * Initialize replication slots after pgstat. The exit hook might need to
 	 * drop ephemeral slots, which in turn triggers stats reporting.
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d14b1678e7f..5ac58aa8806 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -5234,6 +5234,17 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"effective_wal_level", PGC_INTERNAL, PRESET_OPTIONS,
+			gettext_noop("Show the effective WAL level."),
+			NULL,
+			GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE
+		},
+		&effective_wal_level,
+		WAL_LEVEL_REPLICA, wal_level_options,
+		NULL, NULL, show_effective_wal_level
+	},
+
 	{
 		{"dynamic_shared_memory_type", PGC_POSTMASTER, RESOURCES_MEM,
 			gettext_noop("Selects the dynamic shared memory implementation used."),
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index d12798be3d8..fc4c44b8a72 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -94,6 +94,9 @@ typedef enum RecoveryState
 } RecoveryState;
 
 extern PGDLLIMPORT int wal_level;
+extern PGDLLEXPORT int effective_wal_level;
+
+extern bool XLogLogicalInfo;
 
 /* Is WAL archiving enabled (always or only while server is running normally)? */
 #define XLogArchivingActive() \
@@ -123,7 +126,7 @@ extern PGDLLIMPORT int wal_level;
 #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA)
 
 /* Do we need to WAL-log information required only for logical replication? */
-#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL)
+#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL || XLogLogicalInfo)
 
 #ifdef WAL_DEBUG
 extern PGDLLIMPORT bool XLOG_DEBUG;
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 63e834a6ce4..5e9b44c82f0 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -80,6 +80,7 @@ typedef struct CheckPoint
 /* 0xC0 is used in Postgres 9.5-11 */
 #define XLOG_OVERWRITE_CONTRECORD		0xD0
 #define XLOG_CHECKPOINT_REDO			0xE0
+#define XLOG_LOGICAL_DECODING_STATUS_CHANGE	0xF0
 
 
 /*
@@ -136,6 +137,8 @@ typedef struct ControlFileData
 
 	XLogRecPtr	unloggedLSN;	/* current fake LSN value, for unlogged rels */
 
+	bool		logicalDecodingEnabled;
+
 	/*
 	 * These two values determine the minimum point we must recover up to
 	 * before starting up:
diff --git a/src/include/replication/logicalctl.h b/src/include/replication/logicalctl.h
new file mode 100644
index 00000000000..e5837a5e778
--- /dev/null
+++ b/src/include/replication/logicalctl.h
@@ -0,0 +1,59 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalctl.h
+ *		Definitions for logical decoding status control facility.
+ *
+ * Portions Copyright (c) 2013-2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/include/replication/logicalctl.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOGLEVEL_H
+#define XLOGLEVEL_H
+
+#include "access/xlog.h"
+#include "port/atomics.h"
+#include "storage/condition_variable.h"
+
+typedef struct LogicalDecodingCtlData
+{
+	/* True while the logical decoding status is being changed */
+	bool		transition_in_progress;
+
+	/* Condition variable signaled when a transition completes */
+	ConditionVariable transition_cv;
+
+	/*
+	 * xlog_logical_info is the authoritative value used by the all process to
+	 * determine whether to write additional information required by logical
+	 * decoding to WAL. Since this information could be checked frequently,
+	 * each process caches this value in XLogLogicalInfo for better
+	 * performance.
+	 *
+	 * logical_decoding_enabled is true if we allow creating logical slots and
+	 * the logical decoding is enabled.
+	 *
+	 * Both fields are initialized at server startup time. On standbys, these
+	 * values are synchronized to the primary's values when replaying the
+	 * XLOG_LOGICAL_DECODING_STATUS_CHANGE record.
+	 */
+	pg_atomic_flag xlog_logical_info;
+	pg_atomic_flag logical_decoding_enabled;
+}			LogicalDecodingCtlData;
+extern LogicalDecodingCtlData * LogicalDecodingCtl;
+
+extern Size LogicalDecodingCtlShmemSize(void);
+extern void LogicalDecodingCtlShmemInit(void);
+extern void StartupLogicalDecodingStatus(bool status_in_control_file);
+extern void InitializeProcessXLogLogicalInfo(void);
+extern bool ProcessBarrierUpdateXLogLogicalInfo(void);
+extern bool IsLogicalDecodingEnabled(void);
+extern bool IsXLogLogicalInfoEnabled(void);
+extern void EnsureLogicalDecodingEnabled(void);
+extern void DisableLogicalDecodingIfNecessary(void);
+extern void UpdateLogicalDecodingStatus(bool new_status);
+extern void UpdateLogicalDecodingStatusEndOfRecovery(void);
+
+#endif
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76aeeb92242..a983e025d63 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,6 +15,7 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
+#include "port/atomics.h"
 #include "replication/walreceiver.h"
 
 /* directory to store replication slot data in */
@@ -233,6 +234,8 @@ typedef struct ReplicationSlot
  */
 typedef struct ReplicationSlotCtlData
 {
+	pg_atomic_uint32 n_inuse_logical_slots;
+
 	/*
 	 * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
 	 * reason you can't do that in an otherwise-empty struct.
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index a9681738146..4d78367878b 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
 PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, LogicalDecodingControl)
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index afeeb1ca019..8e428f298c6 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -54,6 +54,8 @@ typedef enum
 typedef enum
 {
 	PROCSIGNAL_BARRIER_SMGRRELEASE, /* ask smgr to close files */
+	PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO,	/* ask to update
+													 * XLogLogicalInfo */
 } ProcSignalBarrierType;
 
 /*
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 82ac8646a8d..fbe0b1e2e3d 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -61,6 +61,7 @@ extern bool check_default_text_search_config(char **newval, void **extra, GucSou
 extern void assign_default_text_search_config(const char *newval, void *extra);
 extern bool check_default_with_oids(bool *newval, void **extra,
 									GucSource source);
+extern const char *show_effective_wal_level(void);
 extern bool check_huge_page_size(int *newval, void **extra, GucSource source);
 extern void assign_io_method(int newval, void *extra);
 extern bool check_io_max_concurrency(int *newval, void **extra, GucSource source);
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 921813483e3..344e6b2f5c9 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -876,7 +876,7 @@ $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
 # We are not able to read from the slot as it requires wal_level >= logical on the primary server
 check_pg_recvlogical_stderr($handle,
-	"logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"
+	"logical decoding needs to be enabled on the primary"
 );
 
 # Restore primary wal_level
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 916fdb48b3b..51f102f0c9f 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -589,7 +589,7 @@ CREATE PUBLICATION tap_pub2 FOR TABLE skip_wal;
 ROLLBACK;
 });
 ok( $reterr =~
-	  m/WARNING:  "wal_level" is insufficient to publish logical changes/,
+	  m/WARNING:  logical decoding needs to be enabled to publish logical changes/,
 	'CREATE PUBLICATION while "wal_level=minimal"');
 
 done_testing();
-- 
2.43.5

