Masahiko Sawada <masahiko.saw...@2ndquadrant.com> writes:

> I've attached the updated version patch that incorporated your
> comments. I believe we're going in the right direction for fixing this
> bug. I'll register this item to the next commit fest so as not to
> forget.

I've moved the confirmed_flush check to the second lookup out of paranoia
(e.g. the slot could have been recreated and its creation might not have
finished yet) and made some minor stylistic adjustments. It looks good
to me now.

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2c9d5de6d9..4a3c7aa0ce 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -121,7 +121,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-								bool temporary, XLogRecPtr restart_lsn)
+								bool temporary, XLogRecPtr restart_lsn,
+								bool find_startpoint)
 {
 	LogicalDecodingContext *ctx = NULL;
 
@@ -139,16 +140,18 @@ create_logical_replication_slot(char *name, char *plugin,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
 	/*
-	 * Create logical decoding context, to build the initial snapshot.
+	 * Create logical decoding context to find start point or, if we don't
+	 * need it, to 1) bump slot's restart_lsn 2) check plugin sanity.
 	 */
 	ctx = CreateInitDecodingContext(plugin, NIL,
-									false,	/* do not build snapshot */
+									false,	/* do not build data snapshot */
 									restart_lsn,
 									logical_read_local_xlog_page, NULL, NULL,
 									NULL);
 
 	/* build initial snapshot, might take a while */
-	DecodingContextFindStartpoint(ctx);
+	if (find_startpoint)
+		DecodingContextFindStartpoint(ctx);
 
 	/* don't need the decoding context anymore */
 	FreeDecodingContext(ctx);
@@ -179,7 +182,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	create_logical_replication_slot(NameStr(*name),
 									NameStr(*plugin),
 									temporary,
-									InvalidXLogRecPtr);
+									InvalidXLogRecPtr,
+									true);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +687,19 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
 	/* Create new slot and acquire it */
 	if (logical_slot)
+	{
+		/*
+		 * WAL required for building snapshot could be removed as we haven't
+		 * reserved WAL yet. So we create a new logical replication slot
+		 * without building an initial snapshot.  A reasonable start point for
+		 * decoding will be provided by the source slot.
+		 */
 		create_logical_replication_slot(NameStr(*dst_name),
 										plugin,
 										temporary,
-										src_restart_lsn);
+										src_restart_lsn,
+										false);
+	}
 	else
 		create_physical_replication_slot(NameStr(*dst_name),
 										 true,
@@ -703,6 +716,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		TransactionId copy_xmin;
 		TransactionId copy_catalog_xmin;
 		XLogRecPtr	copy_restart_lsn;
+		XLogRecPtr	copy_confirmed_flush;
 		bool		copy_islogical;
 		char	   *copy_name;
 
@@ -714,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		copy_xmin = src->data.xmin;
 		copy_catalog_xmin = src->data.catalog_xmin;
 		copy_restart_lsn = src->data.restart_lsn;
+		copy_confirmed_flush = src->data.confirmed_flush;
 
 		/* for existence check */
 		copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +753,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 							NameStr(*src_name)),
 					 errdetail("The source replication slot was modified incompatibly during the copy operation.")));
 
+		/* The source slot must have a consistent snapshot */
+		if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("cannot copy a logical replication slot that doesn't have confirmed_flush_lsn"),
+					 errhint("Retry when the source replication slot creation is finished.")));
+
 		/* Install copied values again */
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 		MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +768,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		MyReplicationSlot->data.xmin = copy_xmin;
 		MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
 		MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+		MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
 		ReplicationSlotMarkDirty();

-- cheers, arseny

Reply via email to