On 30/12/2018 20:27, Petr Jelinek wrote: > Hi, > > while messing around with slot code I noticed that the SQL functions for > consuming/moving logical replication slots only move restart_lsn up to > previously consumed position and not to currently consumed position. The > reason for that is that restart_lsn is not moved forward unless new > value is smaller that current confirmed_lsn of the slot. But we only > update confirmed_lsn of the slot at the end of the SQL functions so we > can only move restart_lsn up to the position we reached on previous > call. Same is true for catalog_xmin. > > This does not really hurt much functionality wise but it means that > every record is needlessly processed twice as we always restart from > position that was reached 2 calls of the function ago and that we keep > older catalog_xmin than necessary which can potentially affect system > catalog bloat. > > This affects both the pg_logical_slot_get_[binary_]changes and > pg_replication_slot_advance. > > Attached patch improves things by adding call to move the slot's > restart_lsn and catalog_xmin to the last serialized snapshot position > right after we update the confirmed_lsn. >
Meh, and it has copy-paste issue. Fixed in new attachment. -- Petr Jelinek http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
>From 5a5bde11181d6f527d6c4f27fbdf165f302e9000 Mon Sep 17 00:00:00 2001 From: Petr Jelinek <pjmo...@pjmodos.net> Date: Sun, 30 Dec 2018 19:57:19 +0100 Subject: [PATCH] Move restart_lsn and catalog_xmin more agresivelly in SQL slot functions The restart_lsn and catalog_xmin are normally moved by snapshot builder and are limited to the position set as confirmed_lsn by replication feedback. However, since the SQL functions like pg_logical_slot_get_changes or pg_replication_slot_advance only update the confirmed_lsn at the end of the function, the restart_lsn and catlog_xmin are only moved to the location of previous call. This in practice means that each record is processed twice when these functions are called repeatadly as they restart decoding from approximately the lsn reached two call ago. It also means we keep catalog_xmin pinned back for longer than necessary. This patch adds update call to the end of the affected SQL functions right after the confirmed_lsn is updated which improves situation considerably. --- src/backend/replication/logical/logicalfuncs.c | 10 ++++++++++ src/backend/replication/logical/snapbuild.c | 6 ++++++ src/backend/replication/slotfuncs.c | 10 ++++++++++ src/include/replication/snapbuild.h | 1 + 4 files changed, 27 insertions(+) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 45aae71a49..e077e869db 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -45,6 +45,7 @@ #include "replication/logical.h" #include "replication/logicalfuncs.h" #include "replication/message.h" +#include "replication/snapbuild.h" #include "storage/fd.h" @@ -331,8 +332,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin */ if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm) { + XLogRecPtr snapshot_lsn; + TransactionId xmin; + LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr); + xmin = ReorderBufferGetOldestXmin(ctx->reorder); + snapshot_lsn = SnapBuildGetSnapshotLsn(ctx->snapshot_builder); + LogicalIncreaseXminForSlot(snapshot_lsn, xmin); + LogicalIncreaseRestartDecodingForSlot(snapshot_lsn, + snapshot_lsn); + /* * If only the confirmed_flush_lsn has changed the slot won't get * marked as dirty by the above. Callers on the walsender diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 363ddf4505..529e3287a3 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -687,6 +687,12 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid) return builder->snapshot; } +XLogRecPtr +SnapBuildGetSnapshotLsn(SnapBuild *builder) +{ + return builder->last_serialized_snapshot; +} + /* * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is * any. Aborts the previously started transaction and resets the resource diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 8782bad4a2..9bd322e4d2 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -21,6 +21,7 @@ #include "replication/slot.h" #include "replication/logical.h" #include "replication/logicalfuncs.h" +#include "replication/snapbuild.h" #include "utils/builtins.h" #include "utils/inval.h" #include "utils/pg_lsn.h" @@ -428,8 +429,17 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) if (ctx->reader->EndRecPtr != InvalidXLogRecPtr) { + XLogRecPtr snapshot_lsn; + TransactionId xmin; + LogicalConfirmReceivedLocation(moveto); + xmin = ReorderBufferGetOldestXmin(ctx->reorder); + snapshot_lsn = SnapBuildGetSnapshotLsn(ctx->snapshot_builder); + LogicalIncreaseXminForSlot(snapshot_lsn, xmin); + LogicalIncreaseRestartDecodingForSlot(snapshot_lsn, + snapshot_lsn); + /* * If only the confirmed_flush LSN has changed the slot won't get * marked as dirty by the above. Callers on the walsender diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 56257430ae..e5a5daca92 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -73,6 +73,7 @@ extern void SnapBuildClearExportedSnapshot(void); extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate); extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid); +extern XLogRecPtr SnapBuildGetSnapshotLsn(SnapBuild *builder); extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); -- 2.17.1