Hi Melih, Here is a patch to help measure the execution time of various phases, such as: a) replication slot creation time, b) WAL reading, c) the number of WAL records read, d) subscription relation state changes, etc. A couple of observations from our testing with this patch: 1) We noticed that, with the v26_0001 patch, it takes more time to find the decoding start point. 2) Another observation was that the number of XLOG records read to identify the consistent point was significantly higher with the v26_0001 patch.
HEAD postgres=# select avg(counttime)/1000 "avgtime(ms)", median(counttime)/1000 "median(ms)", min(counttime)/1000 "mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test group by logtype; avgtime(ms) | median(ms) | mintime(ms) | maxtime(ms) | logtype ------------------------+------------------------+-------------+-------------+-------------------------- 0.00579245283018867920 | 0.00200000000000000000 | 0 | 1 | SNAPSHOT_BUILD 1.2246811320754717 | 0.98550000000000000000 | 0 | 37 | LOGICAL_SLOT_CREATION 171.0863283018867920 | 183.9120000000000000 | 0 | 408 | FIND_DECODING_STARTPOINT 2.0699433962264151 | 1.4380000000000000 | 1 | 49 | INIT_DECODING_CONTEXT (4 rows) HEAD + v26-0001 patch postgres=# select avg(counttime)/1000 "avgtime(ms)", median(counttime)/1000 "median(ms)", min(counttime)/1000 "mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test group by logtype; avgtime(ms) | median(ms) | mintime(ms) | maxtime(ms) | logtype ------------------------+------------------------+-------------+-------------+-------------------------- 0.00588113207547169810 | 0.00500000000000000000 | 0 | 0 | SNAPSHOT_BUILD 1.1270962264150943 | 1.1000000000000000 | 0 | 2 | LOGICAL_SLOT_CREATION 301.1745528301886790 | 410.4870000000000000 | 0 | 427 | FIND_DECODING_STARTPOINT 1.4814660377358491 | 1.4530000000000000 | 1 | 9 | INIT_DECODING_CONTEXT (4 rows) (Note: assuming counttime is an integer column, as in the testv26 table created below, min(counttime)/1000 and max(counttime)/1000 use integer division. The mintime/maxtime columns are therefore truncated to whole milliseconds, which is why, e.g., maxtime can be smaller than the median. Dividing by 1000.0 would give exact values.) In the above, FIND_DECODING_STARTPOINT is much higher with the v26-0001 patch (average 301 ms vs. 171 ms on HEAD). FIND_DECODING_XLOG_RECORD_COUNT on HEAD: average = 2762, median = 3362. FIND_DECODING_XLOG_RECORD_COUNT on HEAD + reuse worker patch (v26_0001 patch): average = 4105, median = 5345. Similarly, the number of XLOG records read is higher with the v26_0001 patch. Steps to calculate the timing: -- First, collect the necessary LOG lines from the server logs (note that the message types grepped for below are emitted by the walsender, i.e. they appear in the publisher's log).
cat *.log | grep -E '(LOGICAL_SLOT_CREATION|INIT_DECODING_CONTEXT|FIND_DECODING_STARTPOINT|SNAPSHOT_BUILD|FIND_DECODING_XLOG_RECORD_COUNT|LOGICAL_XLOG_READ|LOGICAL_DECODE_PROCESS_RECORD|LOGICAL_WAIT_TRANSACTION)' > grep.dat -- Next, create a table to hold the collected log lines: create table testv26(logtime varchar, pid varchar, level varchar, space varchar, logtype varchar, counttime int); -- Then copy this data into the table so the averages can be computed: COPY testv26 FROM '/home/logs/grep.dat' DELIMITER ' '; -- Finally, use the following SQL to analyze the data: select avg(counttime)/1000 "avgtime(ms)", logtype from testv26 group by logtype; -- To get the number of XLOG records read: select avg(counttime) from testv26 where logtype ='FIND_DECODING_XLOG_RECORD_COUNT' and counttime != 1; Thanks to Peter and Hou-san, who helped in finding these out. We are analysing this in parallel; @Melih Mutlu, I am posting this information so that it might help you in analysing this issue too. Regards, Vignesh
From b755cab38ff76e9f63304b2d8f344cb098ca6a33 Mon Sep 17 00:00:00 2001 From: Hou Zhijie <houzj.f...@cn.fujitsu.com> Date: Fri, 4 Aug 2023 17:57:29 +0800 Subject: [PATCH v1 1/2] count state change time Log, at LOG level and in microseconds, the elapsed time between successive table synchronization milestones of the tablesync worker (SUBREL_STATE_DATASYNC, WALRCV_CREATE_SLOT, SUBREL_STATE_FINISHEDCOPY, SUBREL_STATE_CATCHUP and SUBREL_STATE_SYNCDONE). The reference timestamp is set at the start of LogicalRepSyncTableStart() and reset after every message, so each value covers the interval since the previous mark. The elapsed time is computed and printed as long long so that it cannot overflow int for long intervals. --- src/backend/replication/logical/tablesync.c | 28 +++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 651a775065..0d9298f7b3 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -123,6 +123,10 @@ #include "utils/syscache.h" #include "utils/usercontext.h" +static TimestampTz start = 0; +static long secs = 0; +static int microsecs = 0; + static bool table_states_valid = false; static List *table_states_not_ready = NIL; static bool FetchTableStates(bool *started_tx); @@ -338,6 +342,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); CommitTransactionCommand(); + + TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs); + elog(LOG, "SUBREL_STATE_SYNCDONE %lld", (long long) secs * 1000000 + microsecs); + start = GetCurrentTimestamp(); + pgstat_report_stat(false); /* @@ -1258,6 +1267,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) bool must_use_password; bool run_as_owner; + start = GetCurrentTimestamp(); + /* Check the state of the table synchronization. */ StartTransactionCommand(); relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, @@ -1361,6 +1372,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; SpinLockRelease(&MyLogicalRepWorker->relmutex); + TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs); + elog(LOG, "SUBREL_STATE_DATASYNC %lld", (long long) secs * 1000000 + microsecs); + start = GetCurrentTimestamp(); + /* Update the state and make it visible to others.
*/ StartTransactionCommand(); UpdateSubscriptionRelState(MyLogicalRepWorker->subid, @@ -1404,6 +1419,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) slotname, false /* permanent */ , false /* two_phase */ , CRS_USE_SNAPSHOT, origin_startpos); + TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs); + elog(LOG, "WALRCV_CREATE_SLOT %lld", (long long) secs * 1000000 + microsecs); + start = GetCurrentTimestamp(); + /* * Setup replication origin tracking. The purpose of doing this before the * copy is to avoid doing the copy again due to any error in setting up @@ -1502,6 +1521,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CommitTransactionCommand(); + TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs); + elog(LOG, "SUBREL_STATE_FINISHEDCOPY %lld", (long long) secs * 1000000 + microsecs); + start = GetCurrentTimestamp(); + copy_table_done: elog(DEBUG1, @@ -1521,6 +1544,11 @@ copy_table_done: * then return to let LogicalRepApplyLoop do it. */ wait_for_worker_state_change(SUBREL_STATE_CATCHUP); + + TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs); + elog(LOG, "SUBREL_STATE_CATCHUP %lld", (long long) secs * 1000000 + microsecs); + start = GetCurrentTimestamp(); + return slotname; } -- 2.34.1
From 0b9b3944b572165072e00cafb0bbc8f5a80554be Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Mon, 7 Aug 2023 12:26:20 +0530 Subject: [PATCH v1 2/2] Logs to measure the breakup of replication slot creation time. Log, at LOG level and in microseconds, the elapsed time of successive steps of logical replication slot creation in CreateReplicationSlot() (LOGICAL_SLOT_CREATION, INIT_DECODING_CONTEXT, FIND_DECODING_STARTPOINT, SNAPSHOT_BUILD). Also log the total time spent in SnapBuildFindSnapshot() while the snapshot builder is not yet consistent (LOGICAL_WAIT_TRANSACTION). For DecodingContextFindStartpoint(), log the accumulated XLogReadRecord() and LogicalDecodingProcessRecord() times (LOGICAL_XLOG_READ, LOGICAL_DECODE_PROCESS_RECORD) and the number of records read (FIND_DECODING_XLOG_RECORD_COUNT). The "searching for logical decoding starting point" message is raised from DEBUG1 to LOG. Durations are printed as long long because INSTR_TIME_GET_MICROSEC() yields a 64-bit value and %ld is only 32 bits wide on some platforms (e.g. Windows). The secs/microsecs arithmetic is done in long long so that it cannot overflow int. --- src/backend/replication/logical/logical.c | 27 +++++++++++++++- src/backend/replication/logical/snapbuild.c | 14 +++++++- src/backend/replication/walsender.c | 36 +++++++++++++++++++++ 3 files changed, 75 insertions(+), 2 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 41243d0187..aa195c4aa9 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -630,11 +630,20 @@ void DecodingContextFindStartpoint(LogicalDecodingContext *ctx) { ReplicationSlot *slot = ctx->slot; + int count = 0; + + instr_time start; + instr_time elapsed; + instr_time total_read; + instr_time total_decode; + + INSTR_TIME_SET_ZERO(total_read); + INSTR_TIME_SET_ZERO(total_decode); /* Initialize from where to start reading WAL.
*/ XLogBeginRead(ctx->reader, slot->data.restart_lsn); - elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", + elog(LOG, "searching for logical decoding starting point, starting at %X/%X", LSN_FORMAT_ARGS(slot->data.restart_lsn)); /* Wait for a consistent starting point */ @@ -642,16 +651,29 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) { XLogRecord *record; char *err = NULL; + count++; + + INSTR_TIME_SET_CURRENT(start); /* the read_page callback waits for new WAL */ record = XLogReadRecord(ctx->reader, &err); + INSTR_TIME_SET_CURRENT(elapsed); + INSTR_TIME_SUBTRACT(elapsed, start); + INSTR_TIME_ADD(total_read, elapsed); + if (err) elog(ERROR, "could not find logical decoding starting point: %s", err); if (!record) elog(ERROR, "could not find logical decoding starting point"); + INSTR_TIME_SET_CURRENT(start); + LogicalDecodingProcessRecord(ctx, ctx->reader); + INSTR_TIME_SET_CURRENT(elapsed); + INSTR_TIME_SUBTRACT(elapsed, start); + INSTR_TIME_ADD(total_decode, elapsed); + /* only continue till we found a consistent spot */ if (DecodingContextReady(ctx)) break; @@ -664,6 +686,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) if (slot->data.two_phase) slot->data.two_phase_at = ctx->reader->EndRecPtr; SpinLockRelease(&slot->mutex); + elog(LOG, "LOGICAL_XLOG_READ %lld", (long long) INSTR_TIME_GET_MICROSEC(total_read)); + elog(LOG, "LOGICAL_DECODE_PROCESS_RECORD %lld", (long long) INSTR_TIME_GET_MICROSEC(total_decode)); + elog(LOG, "FIND_DECODING_XLOG_RECORD_COUNT %d", count); } /* diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 843ceba840..2ce302a597 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1209,6 +1209,8 @@ SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, * ----------------------------------- */ +extern instr_time total_wait; + /* * Process a running xacts record, and use its 
information to first build a * historic snapshot and later to release resources that aren't needed @@ -1227,8 +1229,18 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact */ if (builder->state < SNAPBUILD_CONSISTENT) { + instr_time start; + instr_time elapsed; + bool result; + + INSTR_TIME_SET_CURRENT(start); + result = SnapBuildFindSnapshot(builder, lsn, running); + INSTR_TIME_SET_CURRENT(elapsed); + INSTR_TIME_SUBTRACT(elapsed, start); + INSTR_TIME_ADD(total_wait, elapsed); + /* returns false if there's no point in performing cleanup just yet */ - if (!SnapBuildFindSnapshot(builder, lsn, running)) + if (!result) return; } else diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d27ef2985d..13831bdc6f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -197,6 +197,10 @@ static volatile sig_atomic_t replication_active = false; static LogicalDecodingContext *logical_decoding_ctx = NULL; +static TimestampTz start = 0; +static long secs = 0; +static int microsecs = 0; + /* A sample associating a WAL location with the time it was written. */ typedef struct { @@ -1034,6 +1038,8 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, } } +instr_time total_wait; + /* * Create a new replication slot.
*/ @@ -1052,6 +1058,15 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Datum values[4]; bool nulls[4] = {0}; + instr_time begin; + instr_time elapsed; + instr_time total_create; + + INSTR_TIME_SET_ZERO(total_create); + INSTR_TIME_SET_ZERO(total_wait); + + INSTR_TIME_SET_CURRENT(begin); + Assert(!MyReplicationSlot); parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); @@ -1083,6 +1098,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) LogicalDecodingContext *ctx; bool need_full_snapshot = false; + INSTR_TIME_SET_CURRENT(elapsed); + INSTR_TIME_SUBTRACT(elapsed, begin); + INSTR_TIME_ADD(total_create, elapsed); + + elog(LOG, "LOGICAL_SLOT_CREATION %lld", (long long) INSTR_TIME_GET_MICROSEC(total_create)); + /* * Do options check early so that we can bail before calling the * DecodingContextFindStartpoint which can take long time. @@ -1131,6 +1152,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) need_full_snapshot = true; } + start = GetCurrentTimestamp(); ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, InvalidXLogRecPtr, XL_ROUTINE(.page_read = logical_read_xlog_page, @@ -1139,6 +1161,10 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); + TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs); + elog(LOG, "INIT_DECODING_CONTEXT %lld", (long long) secs * 1000000 + microsecs); + start = GetCurrentTimestamp(); + /* * Signal that we don't need the timeout mechanism.
We're just * creating the replication slot and don't yet accept feedback @@ -1151,6 +1177,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); + TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs); + elog(LOG, "FIND_DECODING_STARTPOINT %lld", (long long) secs * 1000000 + microsecs); + start = GetCurrentTimestamp(); + + elog(LOG, "LOGICAL_WAIT_TRANSACTION %lld", (long long) INSTR_TIME_GET_MICROSEC(total_wait)); + /* * Export or use the snapshot if we've been asked to do so. * @@ -1169,6 +1201,10 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) RestoreTransactionSnapshot(snap, MyProc); } + TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs); + elog(LOG, "SNAPSHOT_BUILD %lld", (long long) secs * 1000000 + microsecs); + start = GetCurrentTimestamp(); + /* don't need the decoding context anymore */ FreeDecodingContext(ctx); -- 2.34.1