Hi Melih,

Here is a patch to help in getting the execution time at various phases,
such as: a) replication slot creation time, b) WAL reading, c) number of
WAL records read, d) subscription relation state change, etc.
A couple of observations from our testing with this patch:
1) We noticed that more time is taken to find the decoding start point
with the v26_0001 patch than on HEAD.
2) Another observation was that the number of XLOG records read to
identify the consistent point was significantly higher with the v26_0001
patch.

HEAD
postgres=# select avg(counttime)/1000 "avgtime(ms)",
median(counttime)/1000 "median(ms)", min(counttime)/1000
"mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
group by logtype;
      avgtime(ms)       |       median(ms)       | mintime(ms) |
maxtime(ms) |         logtype
------------------------+------------------------+-------------+-------------+--------------------------
 0.00579245283018867920 | 0.00200000000000000000 |           0 |
    1 | SNAPSHOT_BUILD
     1.2246811320754717 | 0.98550000000000000000 |           0 |
   37 | LOGICAL_SLOT_CREATION
   171.0863283018867920 |   183.9120000000000000 |           0 |
  408 | FIND_DECODING_STARTPOINT
     2.0699433962264151 |     1.4380000000000000 |           1 |
   49 | INIT_DECODING_CONTEXT
(4 rows)

HEAD + v26-0001 patch
postgres=# select avg(counttime)/1000 "avgtime(ms)",
median(counttime)/1000 "median(ms)", min(counttime)/1000
"mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
group by logtype;
      avgtime(ms)       |       median(ms)       | mintime(ms) |
maxtime(ms) |         logtype
------------------------+------------------------+-------------+-------------+--------------------------
 0.00588113207547169810 | 0.00500000000000000000 |           0 |
    0 | SNAPSHOT_BUILD
     1.1270962264150943 |     1.1000000000000000 |           0 |
    2 | LOGICAL_SLOT_CREATION
   301.1745528301886790 |   410.4870000000000000 |           0 |
  427 | FIND_DECODING_STARTPOINT
     1.4814660377358491 |     1.4530000000000000 |           1 |
    9 | INIT_DECODING_CONTEXT
(4 rows)

In the above, FIND_DECODING_STARTPOINT is much higher with the v26-0001 patch.

HEAD
FIND_DECODING_XLOG_RECORD_COUNT
- average =  2762
- median = 3362

HEAD + reuse worker patch (v26_0001 patch)
FIND_DECODING_XLOG_RECORD_COUNT
- average =  4105
- median = 5345

Similarly, the number of XLOG records read is higher with the v26_0001 patch.

Steps to calculate the timing:
-- First, collect the necessary LOG lines from the subscriber's log.
cat *.log | grep -E
'(LOGICAL_SLOT_CREATION|INIT_DECODING_CONTEXT|FIND_DECODING_STARTPOINT|SNAPSHOT_BUILD|FIND_DECODING_XLOG_RECORD_COUNT|LOGICAL_XLOG_READ|LOGICAL_DECODE_PROCESS_RECORD|LOGICAL_WAIT_TRANSACTION)'
> grep.dat

-- Staging table for the grepped log lines: COPY below splits each line on
-- single spaces, one field per column, so the last field ends up in counttime.
CREATE TABLE testv26 (
    logtime   varchar,
    pid       varchar,
    level     varchar,
    space     varchar,
    logtype   varchar,
    counttime int
);

-- Then load the collected log lines into the table so they can be aggregated.
COPY testv26 FROM '/home/logs/grep.dat' DELIMITER ' ';

-- Finally, report the average per log type, divided by 1000
-- (labelled as milliseconds).
SELECT
    avg(counttime) / 1000 AS "avgtime(ms)",
    logtype
FROM testv26
GROUP BY logtype;

--- To get the average number of xlog records read (rows with a count of 1
--- are left out of the average):
SELECT avg(counttime)
FROM testv26
WHERE counttime <> 1
  AND logtype = 'FIND_DECODING_XLOG_RECORD_COUNT';

Thanks to Peter and Hou-san, who helped in finding these out. We are
analysing this in parallel; @Melih Mutlu, I am posting this information so
that it might help you too in analysing this issue.

Regards,
Vignesh
From b755cab38ff76e9f63304b2d8f344cb098ca6a33 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.f...@cn.fujitsu.com>
Date: Fri, 4 Aug 2023 17:57:29 +0800
Subject: [PATCH v1 1/2] count state change time

---
 src/backend/replication/logical/tablesync.c | 28 +++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a775065..0d9298f7b3 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -123,6 +123,10 @@
 #include "utils/syscache.h"
 #include "utils/usercontext.h"
 
+static TimestampTz start = 0;
+static long		secs = 0;
+static int			microsecs = 0;
+
 static bool table_states_valid = false;
 static List *table_states_not_ready = NIL;
 static bool FetchTableStates(bool *started_tx);
@@ -338,6 +342,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
 
 		CommitTransactionCommand();
+
+		TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs);
+		elog(LOG, "SUBREL_STATE_SYNCDONE %d", ((int) secs * 1000000 + microsecs));
+		start = GetCurrentTimestamp();
+
 		pgstat_report_stat(false);
 
 		/*
@@ -1258,6 +1267,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	bool		must_use_password;
 	bool		run_as_owner;
 
+	start = GetCurrentTimestamp();
+
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
 	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
@@ -1361,6 +1372,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+	TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs);
+	elog(LOG, "SUBREL_STATE_DATASYNC %d", ((int) secs * 1000000 + microsecs));
+	start = GetCurrentTimestamp();
+
 	/* Update the state and make it visible to others. */
 	StartTransactionCommand();
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
@@ -1404,6 +1419,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 					   slotname, false /* permanent */ , false /* two_phase */ ,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 
+	TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs);
+	elog(LOG, "WALRCV_CREATE_SLOT %d", ((int) secs * 1000000 + microsecs));
+	start = GetCurrentTimestamp();
+
 	/*
 	 * Setup replication origin tracking. The purpose of doing this before the
 	 * copy is to avoid doing the copy again due to any error in setting up
@@ -1502,6 +1521,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	CommitTransactionCommand();
 
+	TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs);
+	elog(LOG, "SUBREL_STATE_FINISHEDCOPY %d", ((int) secs * 1000000 + microsecs));
+	start = GetCurrentTimestamp();
+
 copy_table_done:
 
 	elog(DEBUG1,
@@ -1521,6 +1544,11 @@ copy_table_done:
 	 * then return to let LogicalRepApplyLoop do it.
 	 */
 	wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
+
+	TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs);
+	elog(LOG, "SUBREL_STATE_CATCHUP %d", ((int) secs * 1000000 + microsecs));
+	start = GetCurrentTimestamp();
+
 	return slotname;
 }
 
-- 
2.34.1

From 0b9b3944b572165072e00cafb0bbc8f5a80554be Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Mon, 7 Aug 2023 12:26:20 +0530
Subject: [PATCH v1 2/2] Logs to measure creation of replication slot breakup.

Logs to measure creation of replication slot breakup.
---
 src/backend/replication/logical/logical.c   | 27 +++++++++++++++-
 src/backend/replication/logical/snapbuild.c | 14 +++++++-
 src/backend/replication/walsender.c         | 36 +++++++++++++++++++++
 3 files changed, 75 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 41243d0187..aa195c4aa9 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -630,11 +630,20 @@ void
 DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 {
 	ReplicationSlot *slot = ctx->slot;
+	int count = 0;
+
+	instr_time	start;
+	instr_time	elapsed;
+	instr_time	total_read;
+	instr_time	total_decode;
+
+	INSTR_TIME_SET_ZERO(total_read);
+	INSTR_TIME_SET_ZERO(total_decode);
 
 	/* Initialize from where to start reading WAL. */
 	XLogBeginRead(ctx->reader, slot->data.restart_lsn);
 
-	elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
+	elog(LOG, "searching for logical decoding starting point, starting at %X/%X",
 		 LSN_FORMAT_ARGS(slot->data.restart_lsn));
 
 	/* Wait for a consistent starting point */
@@ -642,16 +651,29 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	{
 		XLogRecord *record;
 		char	   *err = NULL;
+		count++;
+
+		INSTR_TIME_SET_CURRENT(start);
 
 		/* the read_page callback waits for new WAL */
 		record = XLogReadRecord(ctx->reader, &err);
+		INSTR_TIME_SET_CURRENT(elapsed);
+		INSTR_TIME_SUBTRACT(elapsed, start);
+		INSTR_TIME_ADD(total_read, elapsed);
+
 		if (err)
 			elog(ERROR, "could not find logical decoding starting point: %s", err);
 		if (!record)
 			elog(ERROR, "could not find logical decoding starting point");
 
+		INSTR_TIME_SET_CURRENT(start);
+
 		LogicalDecodingProcessRecord(ctx, ctx->reader);
 
+		INSTR_TIME_SET_CURRENT(elapsed);
+		INSTR_TIME_SUBTRACT(elapsed, start);
+		INSTR_TIME_ADD(total_decode, elapsed);
+
 		/* only continue till we found a consistent spot */
 		if (DecodingContextReady(ctx))
 			break;
@@ -664,6 +686,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	if (slot->data.two_phase)
 		slot->data.two_phase_at = ctx->reader->EndRecPtr;
 	SpinLockRelease(&slot->mutex);
+	elog(LOG, "LOGICAL_XLOG_READ %ld", INSTR_TIME_GET_MICROSEC(total_read));
+	elog(LOG, "LOGICAL_DECODE_PROCESS_RECORD %ld", INSTR_TIME_GET_MICROSEC(total_decode));
+	elog(LOG, "FIND_DECODING_XLOG_RECORD_COUNT %d", count);
 }
 
 /*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 843ceba840..2ce302a597 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1209,6 +1209,8 @@ SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
  * -----------------------------------
  */
 
+extern instr_time	total_wait;
+
 /*
  * Process a running xacts record, and use its information to first build a
  * historic snapshot and later to release resources that aren't needed
@@ -1227,8 +1229,18 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	 */
 	if (builder->state < SNAPBUILD_CONSISTENT)
 	{
+		instr_time	start;
+		instr_time	elapsed;
+		bool result;
+
+		INSTR_TIME_SET_CURRENT(start);
+		result = SnapBuildFindSnapshot(builder, lsn, running);
+		INSTR_TIME_SET_CURRENT(elapsed);
+		INSTR_TIME_SUBTRACT(elapsed, start);
+		INSTR_TIME_ADD(total_wait, elapsed);
+
 		/* returns false if there's no point in performing cleanup just yet */
-		if (!SnapBuildFindSnapshot(builder, lsn, running))
+		if (!result)
 			return;
 	}
 	else
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d27ef2985d..13831bdc6f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -197,6 +197,10 @@ static volatile sig_atomic_t replication_active = false;
 
 static LogicalDecodingContext *logical_decoding_ctx = NULL;
 
+static TimestampTz start = 0;
+static long		secs = 0;
+static int			microsecs = 0;
+
 /* A sample associating a WAL location with the time it was written. */
 typedef struct
 {
@@ -1034,6 +1038,8 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 	}
 }
 
+instr_time	total_wait;
+
 /*
  * Create a new replication slot.
  */
@@ -1052,6 +1058,15 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	Datum		values[4];
 	bool		nulls[4] = {0};
 
+	instr_time	begin;
+	instr_time	elapsed;
+	instr_time	total_create;
+
+	INSTR_TIME_SET_ZERO(total_create);
+	INSTR_TIME_SET_ZERO(total_wait);
+
+	INSTR_TIME_SET_CURRENT(begin);
+
 	Assert(!MyReplicationSlot);
 
 	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
@@ -1083,6 +1098,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		LogicalDecodingContext *ctx;
 		bool		need_full_snapshot = false;
 
+		INSTR_TIME_SET_CURRENT(elapsed);
+		INSTR_TIME_SUBTRACT(elapsed, begin);
+		INSTR_TIME_ADD(total_create, elapsed);
+
+		elog(LOG, "LOGICAL_SLOT_CREATION %ld", INSTR_TIME_GET_MICROSEC(total_create));
+
 		/*
 		 * Do options check early so that we can bail before calling the
 		 * DecodingContextFindStartpoint which can take long time.
@@ -1131,6 +1152,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 			need_full_snapshot = true;
 		}
 
+		start = GetCurrentTimestamp();
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										InvalidXLogRecPtr,
 										XL_ROUTINE(.page_read = logical_read_xlog_page,
@@ -1139,6 +1161,10 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 										WalSndPrepareWrite, WalSndWriteData,
 										WalSndUpdateProgress);
 
+		TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs);
+		elog(LOG, "INIT_DECODING_CONTEXT %d", ((int) secs * 1000000 + microsecs));
+		start = GetCurrentTimestamp();
+
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
 		 * creating the replication slot and don't yet accept feedback
@@ -1151,6 +1177,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		/* build initial snapshot, might take a while */
 		DecodingContextFindStartpoint(ctx);
 
+		TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs);
+		elog(LOG, "FIND_DECODING_STARTPOINT %d", ((int) secs * 1000000 + microsecs));
+		start = GetCurrentTimestamp();
+
+		elog(LOG, "LOGICAL_WAIT_TRANSACTION %ld", INSTR_TIME_GET_MICROSEC(total_wait));
+
 		/*
 		 * Export or use the snapshot if we've been asked to do so.
 		 *
@@ -1169,6 +1201,10 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 			RestoreTransactionSnapshot(snap, MyProc);
 		}
 
+		TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs);
+		elog(LOG, "SNAPSHOT_BUILD %d", ((int) secs * 1000000 + microsecs));
+		start = GetCurrentTimestamp();
+
 		/* don't need the decoding context anymore */
 		FreeDecodingContext(ctx);
 
-- 
2.34.1

Reply via email to