On 2015-07-07 09:42:54 -0700, Gurjeet Singh wrote: > --- a/src/backend/replication/slot.c > +++ b/src/backend/replication/slot.c > @@ -40,10 +40,10 @@ > #include <sys/stat.h> > > #include "access/transam.h" > +#include "access/xlog_internal.h" > #include "common/string.h" > #include "miscadmin.h" > #include "replication/slot.h" > -#include "storage/fd.h" > #include "storage/proc.h" > #include "storage/procarray.h"
Why did you remove fd.h? The file definitely uses infrastructure from there. We're not terribly consistent about that but I'd rather not rely on it being included via xlog_internal.h -> xlog.h. > /* > + * Grab and save an LSN value to prevent WAL recycling past that point. > + */ > +void > +ReplicationSlotRegisterRestartLSN() > +{ Didn't like that description and function name very much. What does 'grabbing' mean here? Should probably mention that it works on the currently active slot and modifies it. It's now ReplicationSlotReserveWal() > + ReplicationSlot *slot = MyReplicationSlot; > + > + Assert(slot != NULL); > + Assert(slot->data.restart_lsn == InvalidXLogRecPtr); > + > + /* > + * The replication slot mechanism is used to prevent removal of required > + * WAL. As there is no interlock between this and checkpoints required, > WAL > + * segment could be removed before ReplicationSlotsComputeRequiredLSN() > has > + * been called to prevent that. In the very unlikely case that this > happens > + * we'll just retry. > + */ You removed some punctuation in that sentence converting a sentence in bad english into one without the original meaning ;). See the attached for a new version. > + while (true) > + { > + XLogSegNo segno; > + > + /* > + * Let's start with enough information if we can, so log a > standby > + * snapshot and start logical decoding at exactly that position. > + */ > + if (!RecoveryInProgress()) > + { > + XLogRecPtr flushptr; > + > + /* start at current insert position */ > + slot->data.restart_lsn = GetXLogInsertRecPtr(); > + > + /* > + * Log an xid snapshot for logical replication. This > snapshot is not > + * needed for physical replication, as it relies on the > snapshot > + * created by checkpoint when the base backup starts. 
> + */ > + if (slot->data.database != InvalidOid) > + { > + /* make sure we have enough information to > start */ > + flushptr = LogStandbySnapshot(); > + > + /* and make sure it's fsynced to disk */ > + XLogFlush(flushptr); > + } > + } > + else > + slot->data.restart_lsn = GetRedoRecPtr(); > + > + /* prevent WAL removal as fast as possible */ > + ReplicationSlotsComputeRequiredLSN(); > + > + /* > + * If all required WAL is still there, great, otherwise retry. > The > + * slot should prevent further removal of WAL, unless there's a > + * concurrent ReplicationSlotsComputeRequiredLSN() after we've > written > + * the new restart_lsn above, so normally we should never need > to loop > + * more than twice. > + */ > + XLByteToSeg(slot->data.restart_lsn, segno); > + if (XLogGetLastRemovedSegno() < segno) > + break; > + } > +} The way you added the check for logical vs. physical slots in there looks wrong to me. For a physical slot created outside recovery (!RecoveryInProgress()) we'll possibly return an xlog position from the future (it's the insert position *and* not flushed to disk), which then cannot be received. > +/* > * Flush all replication slots to disk. > * > * This needn't actually be part of a checkpoint, but it's a convenient > @@ -876,7 +942,7 @@ StartupReplicationSlots(void) > } > > /* ---- > - * Manipulation of ondisk state of replication slots > + * Manipulation of on-disk state of replication slots > * > * NB: none of the routines below should take any notice whether a slot is > the > * current one or not, that's all handled a layer above.
> diff --git a/src/backend/replication/slotfuncs.c > b/src/backend/replication/slotfuncs.c > index 9a2793f..01b376a 100644 > --- a/src/backend/replication/slotfuncs.c > +++ b/src/backend/replication/slotfuncs.c > @@ -40,6 +40,7 @@ Datum > pg_create_physical_replication_slot(PG_FUNCTION_ARGS) > { > Name name = PG_GETARG_NAME(0); > + bool immediately_reserve = PG_GETARG_BOOL(1); > Datum values[2]; > bool nulls[2]; > TupleDesc tupdesc; > @@ -58,10 +59,28 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) > /* acquire replication slot, this will check for conflicting names */ > ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); > > - values[0] = NameGetDatum(&MyReplicationSlot->data.name); > + if (immediately_reserve) > + { > + /* Allocate restart-LSN, if the user asked for it */ > + ReplicationSlotRegisterRestartLSN(); > + > + /* Write this slot to disk */ > + ReplicationSlotMarkDirty(); > + ReplicationSlotSave(); > > - nulls[0] = false; > - nulls[1] = true; > + values[0] = NameGetDatum(&MyReplicationSlot->data.name); > + values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn); > + > + nulls[0] = false; > + nulls[1] = false; > + } > + else > + { > + values[0] = NameGetDatum(&MyReplicationSlot->data.name); > + > + nulls[0] = false; > + nulls[1] = true; > + } I moved values[0] = NameGetDatum(&MyReplicationSlot->data.name); nulls[0] = false; to outside the conditional block, there seems no reason to have it in there? I also removed a bunch of unrelated minor cleanups that I plan to commit & backpatch separately. What do you think? Andres
>From c9dad57f297dd5b904668ffd9fd6c0e69211a555 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Mon, 10 Aug 2015 13:52:48 +0200 Subject: [PATCH 1/2] Introduce macros determining if a replication slot is physical or logical. These make the code a bit easier to read, and make it easier to add a more explicit notion of a slot's type at some point in the future. Author: Gurjeet Singh Discussion: CABwTF4Wh_dBCzTU=49pFXR6coR4NW1ynb+vBqT+Po=7fuq5...@mail.gmail.com --- src/backend/replication/logical/logical.c | 4 ++-- src/backend/replication/slot.c | 6 +++--- src/backend/replication/walsender.c | 4 ++-- src/include/replication/slot.h | 3 +++ 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 7643add..d737288 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -231,7 +231,7 @@ CreateInitDecodingContext(char *plugin, elog(ERROR, "cannot initialize logical decoding without a specified plugin"); /* Make sure the passed slot is suitable. These are user facing errors. 
*/ - if (slot->data.database == InvalidOid) + if (SlotIsPhysical(slot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot use physical replication slot for logical decoding"))); @@ -380,7 +380,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, elog(ERROR, "cannot perform logical decoding without an acquired slot"); /* make sure the passed slot is suitable, these are user facing errors */ - if (slot->data.database == InvalidOid) + if (SlotIsPhysical(slot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errmsg("cannot use physical replication slot for logical decoding")))); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index f8544de..4d33629 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -693,7 +693,7 @@ ReplicationSlotsComputeLogicalRestartLSN(void) continue; /* we're only interested in logical slots */ - if (s->data.database == InvalidOid) + if (!SlotIsLogical(s)) continue; /* read once, it's ok if it increases while we're checking */ @@ -740,8 +740,8 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) if (!s->in_use) continue; - /* not database specific, skip */ - if (s->data.database == InvalidOid) + /* only logical slots are database specific, skip */ + if (!SlotIsLogical(s)) continue; /* not our database, skip */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index eb1b89b..e1bab07 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -514,7 +514,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { ReplicationSlotAcquire(cmd->slotname); - if (MyReplicationSlot->data.database != InvalidOid) + if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errmsg("cannot use a logical replication slot for physical replication")))); @@ -1564,7 +1564,7 @@ ProcessStandbyReplyMessage(void) */ if 
(MyReplicationSlot && flushPtr != InvalidXLogRecPtr) { - if (MyReplicationSlot->data.database != InvalidOid) + if (SlotIsLogical(MyReplicationSlot)) LogicalConfirmReceivedLocation(flushPtr); else PhysicalConfirmReceivedLocation(flushPtr); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 78cff07..367ef0a 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -125,6 +125,9 @@ typedef struct ReplicationSlot XLogRecPtr candidate_restart_lsn; } ReplicationSlot; +#define SlotIsPhysical(slot) (slot->data.database == InvalidOid) +#define SlotIsLogical(slot) (slot->data.database != InvalidOid) + /* * Shared memory control area for all of replication slots. */ -- 2.3.0.149.gf3f4077.dirty
>From 91900b87f099aaf736df6f6b5a6465ae5b334405 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Mon, 10 Aug 2015 16:10:13 +0200 Subject: [PATCH 2/2] Allow pg_create_physical_replication_slot() to reserve WAL. When creating a physical slot it's often useful to immediately reserve the current WAL position instead of only doing so after the first feedback message arrives. That e.g. allows slots to guarantee that all the WAL for a base backup will be available afterwards. Logical slots already have to reserve WAL during creation, so generalize that logic into being usable for both physical and logical slots. Author: Gurjeet Singh Reviewed-By: Andres Freund Discussion: CABwTF4Wh_dBCzTU=49pFXR6coR4NW1ynb+vBqT+Po=7fuq5...@mail.gmail.com --- doc/src/sgml/func.sgml | 8 +++- src/backend/catalog/system_views.sql | 7 +++ src/backend/replication/logical/logical.c | 47 +------------------- src/backend/replication/slot.c | 71 +++++++++++++++++++++++++++++++ src/backend/replication/slotfuncs.c | 21 ++++++++- src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_proc.h | 2 +- src/include/replication/slot.h | 1 + 8 files changed, 107 insertions(+), 52 deletions(-) diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 4fcc4fe..b1df55d 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -17211,7 +17211,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); <indexterm> <primary>pg_create_physical_replication_slot</primary> </indexterm> - <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type>)</function></literal> + <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type><optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal> </entry> <entry> (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>) @@ -17221,7 +17221,11
@@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); <parameter>slot_name</parameter>. Streaming changes from a physical slot is only possible with the streaming-replication protocol - see <xref linkend="protocol-replication">. Corresponds to the replication protocol - command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>. + command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>. The optional + second parameter, when <literal>true</>, specifies that the <acronym>LSN</> + for this replication slot be reserved immediately; the <acronym>LSN</> + is otherwise reserved on first connection from a streaming replication + client. </entry> </row> <row> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 3190c7f..ccc030f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -917,6 +917,13 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_peek_binary_changes'; +CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( + IN slot_name name, IN immediately_reserve boolean DEFAULT false, + OUT slot_name name, OUT xlog_position pg_lsn) +RETURNS RECORD +LANGUAGE INTERNAL +AS 'pg_create_physical_replication_slot'; + CREATE OR REPLACE FUNCTION make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0, days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0, diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index d737288..3e84116 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -253,52 +253,7 @@ CreateInitDecodingContext(char *plugin, StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN); SpinLockRelease(&slot->mutex); - /* - * The replication slot mechanism is used to prevent removal of required - * WAL.
As there is no interlock between this and checkpoints required WAL - * could be removed before ReplicationSlotsComputeRequiredLSN() has been - * called to prevent that. In the very unlikely case that this happens - * we'll just retry. - */ - while (true) - { - XLogSegNo segno; - - /* - * Let's start with enough information if we can, so log a standby - * snapshot and start decoding at exactly that position. - */ - if (!RecoveryInProgress()) - { - XLogRecPtr flushptr; - - /* start at current insert position */ - slot->data.restart_lsn = GetXLogInsertRecPtr(); - - /* make sure we have enough information to start */ - flushptr = LogStandbySnapshot(); - - /* and make sure it's fsynced to disk */ - XLogFlush(flushptr); - } - else - slot->data.restart_lsn = GetRedoRecPtr(); - - /* prevent WAL removal as fast as possible */ - ReplicationSlotsComputeRequiredLSN(); - - /* - * If all required WAL is still there, great, otherwise retry. The - * slot should prevent further removal of WAL, unless there's a - * concurrent ReplicationSlotsComputeRequiredLSN() after we've written - * the new restart_lsn above, so normally we should never need to loop - * more than twice. - */ - XLByteToSeg(slot->data.restart_lsn, segno); - if (XLogGetLastRemovedSegno() < segno) - break; - } - + ReplicationSlotReserveWal(); /* ---- * This is a bit tricky: We need to determine a safe xmin horizon to start diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 4d33629..6bd6505 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -40,6 +40,7 @@ #include <sys/stat.h> #include "access/transam.h" +#include "access/xlog_internal.h" #include "common/string.h" #include "miscadmin.h" #include "replication/slot.h" @@ -782,6 +783,76 @@ CheckSlotRequirements(void) } /* + * Reserve WAL for the currently active slot. + * + * Compute and set restart_lsn in a manner that's appropriate for the type of + * the slot and concurrency safe. 
+ */ +void +ReplicationSlotReserveWal(void) +{ + ReplicationSlot *slot = MyReplicationSlot; + + Assert(slot != NULL); + Assert(slot->data.restart_lsn == InvalidXLogRecPtr); + + /* + * The replication slot mechanism is used to prevent removal of required + * WAL. As there is no interlock between this routine and checkpoints, WAL + * segments could concurrently be removed when a now stale return value of + * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that + * this happens we'll just retry. + */ + while (true) + { + XLogSegNo segno; + + /* + * For logical slots log a standby snapshot and start logical decoding + * at exactly that position. That allows the slot to start up more + * quickly. + * + * That's not needed (or indeed helpful) for physical slots as they'll + * start replay at the last logged checkpoint anyway. Instead return + * the location of the last redo LSN. While that slightly increases + * the chance that we have to retry, it's where a base backup has to + * start replay at. + */ + if (!RecoveryInProgress() && SlotIsLogical(slot)) + { + XLogRecPtr flushptr; + + /* start at current insert position */ + slot->data.restart_lsn = GetXLogInsertRecPtr(); + + /* make sure we have enough information to start */ + flushptr = LogStandbySnapshot(); + + /* and make sure it's fsynced to disk */ + XLogFlush(flushptr); + } + else + { + slot->data.restart_lsn = GetRedoRecPtr(); + } + + /* prevent WAL removal as fast as possible */ + ReplicationSlotsComputeRequiredLSN(); + + /* + * If all required WAL is still there, great, otherwise retry. The + * slot should prevent further removal of WAL, unless there's a + * concurrent ReplicationSlotsComputeRequiredLSN() after we've written + * the new restart_lsn above, so normally we should never need to loop + * more than twice. + */ + XLByteToSeg(slot->data.restart_lsn, segno); + if (XLogGetLastRemovedSegno() < segno) + break; + } +} + +/* * Flush all replication slots to disk. 
* * This needn't actually be part of a checkpoint, but it's a convenient diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ecfcb07..2dc6827 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -40,6 +40,7 @@ Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS) { Name name = PG_GETARG_NAME(0); + bool immediately_reserve = PG_GETARG_BOOL(1); Datum values[2]; bool nulls[2]; TupleDesc tupdesc; @@ -59,9 +60,25 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); values[0] = NameGetDatum(&MyReplicationSlot->data.name); - nulls[0] = false; - nulls[1] = true; + + if (immediately_reserve) + { + /* Reserve WAL as the user asked for it */ + ReplicationSlotReserveWal(); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + + values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn); + nulls[1] = false; + } + else + { + values[0] = NameGetDatum(&MyReplicationSlot->data.name); + nulls[1] = true; + } tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 8cd6772..16e9c14 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201508101 +#define CATALOG_VERSION_NO 201508102 #endif diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 5163962..ddf7c67 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5193,7 +5193,7 @@ DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 DESCR("SP-GiST support for quad tree over range"); /* replication slots */ -DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" 
"{slot_name,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 367ef0a..20dd7a2 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -166,6 +166,7 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern bool ReplicationSlotValidateName(const char *name, int elevel); +extern void ReplicationSlotReserveWal(void); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); -- 2.3.0.149.gf3f4077.dirty
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers