On 04/11/16 13:07, Andres Freund wrote: > > Hm. I think I have to agree a bit with Peter here. Overloading > MyReplicationSlot this way seems ugly, and I think there's a bunch of > bugs around it too. > > > Sounds what we really want is a) two different lifetimes for ephemeral > slots, session and "command" b) have a number of slots that are released > either after a failed transaction / command or at session end. The > easiest way for that appears to have a list of slots to be checked at > end-of-xact and backend shutdown. >
Ok so how about attached? It adds temp slots as new type of persistence. It does not really touch the behavior of any of the existing API or persistence settings. The temp slots are just cleaned up on backend exit or error, other than that they are not special. I don't use any specific backend local list to track them, instead they have active_pid always set and just cleanup everything that has that set at the end of the session. This has nice property that it forbids other backends for acquiring them. It does not do any locking while searching for the slots to cleanup (see ReplicationSlotCleanup), mainly because it complicates the interaction with ReplicationSlotDropPtr and it seems to me that locking there is not really needed there as other backends will never change active_pid to our backend pid and then the ReplicationSlotDropPtr does exclusive lock when resetting it. -- Petr Jelinek http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
From 1cf0aca7f1405f31229ab679c9451b51a8cc18de Mon Sep 17 00:00:00 2001 From: Petr Jelinek <pjmo...@pjmodos.net> Date: Wed, 28 Sep 2016 23:36:58 +0200 Subject: [PATCH 1/7] Add support for TEMPORARY replication slots This patch allows creating temporary replication slots that are removed automatically at the end of the session or on error. --- contrib/test_decoding/Makefile | 2 +- contrib/test_decoding/expected/ddl.out | 4 +- contrib/test_decoding/expected/slot.out | 35 ++++++++++++++++ contrib/test_decoding/sql/slot.sql | 13 ++++++ doc/src/sgml/func.sgml | 16 ++++++-- doc/src/sgml/protocol.sgml | 13 +++++- src/backend/catalog/system_views.sql | 11 +++++ src/backend/replication/repl_gram.y | 22 ++++++---- src/backend/replication/repl_scanner.l | 1 + src/backend/replication/slot.c | 72 ++++++++++++++++++++++++++------- src/backend/replication/slotfuncs.c | 24 +++++++---- src/backend/replication/walsender.c | 28 ++++++++----- src/backend/storage/lmgr/proc.c | 3 ++ src/backend/tcop/postgres.c | 3 ++ src/include/catalog/pg_proc.h | 6 +-- src/include/nodes/replnodes.h | 1 + src/include/replication/slot.h | 4 +- src/test/regress/expected/rules.out | 3 +- 18 files changed, 209 insertions(+), 52 deletions(-) create mode 100644 contrib/test_decoding/expected/slot.out create mode 100644 contrib/test_decoding/sql/slot.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index a6641f5..d2bc8b8 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -39,7 +39,7 @@ submake-test_decoding: REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill + spill slot regresscheck: | submake-regress submake-test_decoding temp-install $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 7fbeafd..84ab7d3 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -702,7 +702,7 @@ SELECT pg_drop_replication_slot('regression_slot'); /* check that the slot is gone */ SELECT * FROM pg_replication_slots; - slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn ------------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------+--------------------- + slot_name | plugin | slot_type | datoid | database | persistent | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn +-----------+--------+-----------+--------+----------+------------+--------+------------+------+--------------+-------------+--------------------- (0 rows) diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out new file mode 100644 index 0000000..28b2f89 --- /dev/null +++ b/contrib/test_decoding/expected/slot.out @@ -0,0 +1,35 @@ +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slotp', 'test_decoding'); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slote', 'test_decoding', true); + ?column? +---------- + init +(1 row) + +SELECT pg_drop_replication_slot('regression_slotp'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slotp', 'test_decoding', false); + ?column? +---------- + init +(1 row) + +-- reconnect to clean temp slots +\c +SELECT pg_drop_replication_slot('regression_slotp'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +-- should fail because the slot was dropped automatically +SELECT pg_drop_replication_slot('regression_slote'); +ERROR: replication slot "regression_slote" does not exist diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql new file mode 100644 index 0000000..839a440 --- /dev/null +++ b/contrib/test_decoding/sql/slot.sql @@ -0,0 +1,13 @@ +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slotp', 'test_decoding'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slote', 'test_decoding', true); + +SELECT pg_drop_replication_slot('regression_slotp'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slotp', 'test_decoding', false); + +-- reconnect to clean temp slots +\c + +SELECT pg_drop_replication_slot('regression_slotp'); + +-- should fail because the slot was dropped automatically +SELECT pg_drop_replication_slot('regression_slote'); diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 2e64cc4..0f37ddc 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -18464,7 +18464,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); <indexterm> <primary>pg_create_physical_replication_slot</primary> </indexterm> - <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type> <optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal> + <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type> <optional>, <parameter>immediately_reserve</> <type>boolean</>, <parameter>temporary</> <type>boolean</></optional>)</function></literal> </entry> <entry> (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>) @@ -18477,7 +18477,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); the <acronym>LSN</> is reserved on first connection from a streaming replication client. Streaming changes from a physical slot is only possible with the streaming-replication protocol — - see <xref linkend="protocol-replication">. This function corresponds + see <xref linkend="protocol-replication">. The optional third + parameter, <parameter>temporary</>, when set to true, specifies that + the slot should not be permanently stored to disk and is only meant + for use by current session. Note that temporary slots are also + released upon any error. This function corresponds to the replication protocol command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>. </entry> @@ -18504,7 +18508,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); <indexterm> <primary>pg_create_logical_replication_slot</primary> </indexterm> - <literal><function>pg_create_logical_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type>)</function></literal> + <literal><function>pg_create_logical_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</> <type>boolean</></optional>)</function></literal> </entry> <entry> (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>) @@ -18512,7 +18516,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); <entry> Creates a new logical (decoding) replication slot named <parameter>slot_name</parameter> using the output plugin - <parameter>plugin</parameter>. A call to this function has the same + <parameter>plugin</parameter>. The optional third + parameter, <parameter>temporary</>, when set to true, specifies that + the slot should not be permanently stored to disk and is only meant + for use by current session. Note that temporary slots are also + released upon any error. A call to this function has the same effect as the replication protocol command <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>. </entry> diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 50cf527..8d9f628 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1434,7 +1434,7 @@ The commands accepted in walsender mode are: </varlistentry> <varlistentry> - <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> } + <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> } <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm> </term> <listitem> @@ -1465,6 +1465,17 @@ The commands accepted in walsender mode are: </varlistentry> <varlistentry> + <term><literal>TEMPORARY</></term> + <listitem> + <para> + Specify that this replication slot is a temporary one. Temporary + slots are not saved to disk and are automatically dropped on error, + or when the session has finished. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><literal>RESERVE_WAL</></term> <listitem> <para> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index ada2142..03e51e0 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -703,6 +703,7 @@ CREATE VIEW pg_replication_slots AS L.slot_type, L.datoid, D.datname AS database, + L.persistent, L.active, L.active_pid, L.xmin, @@ -968,12 +969,22 @@ AS 'pg_logical_slot_peek_binary_changes'; CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( IN slot_name name, IN immediately_reserve boolean DEFAULT false, + IN temporary boolean DEFAULT false, OUT slot_name name, OUT xlog_position pg_lsn) RETURNS RECORD LANGUAGE INTERNAL STRICT VOLATILE AS 'pg_create_physical_replication_slot'; +CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( + IN slot_name name, IN plugin name, + IN temporary boolean DEFAULT false, + OUT slot_name text, OUT xlog_position pg_lsn) +RETURNS RECORD +LANGUAGE INTERNAL +STRICT VOLATILE +AS 'pg_create_logical_replication_slot'; + CREATE OR REPLACE FUNCTION make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0, days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0, diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index fd0fa6d..e75516c 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -77,6 +77,7 @@ Node *replication_parse_result; %token K_LOGICAL %token K_SLOT %token K_RESERVE_WAL +%token K_TEMPORARY %type <node> command %type <node> base_backup start_replication start_logical_replication @@ -89,7 +90,7 @@ Node *replication_parse_result; %type <defelt> plugin_opt_elem %type <node> plugin_opt_arg %type <str> opt_slot -%type <boolval> opt_reserve_wal +%type <boolval> opt_reserve_wal opt_temporary %% @@ -183,24 +184,26 @@ base_backup_opt: ; create_replication_slot: - /* CREATE_REPLICATION_SLOT slot PHYSICAL RESERVE_WAL */ - K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal + /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */ + K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); cmd->kind = REPLICATION_KIND_PHYSICAL; cmd->slotname = $2; - cmd->reserve_wal = $4; + cmd->temporary = $3; + cmd->reserve_wal = $5; $$ = (Node *) cmd; } - /* CREATE_REPLICATION_SLOT slot LOGICAL plugin */ - | K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT + /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ + | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); cmd->kind = REPLICATION_KIND_LOGICAL; cmd->slotname = $2; - cmd->plugin = $4; + cmd->temporary = $3; + cmd->plugin = $5; $$ = (Node *) cmd; } ; @@ -276,6 +279,11 @@ opt_reserve_wal: | /* EMPTY */ { $$ = false; } ; +opt_temporary: + K_TEMPORARY { $$ = true; } + | /* EMPTY */ { $$ = false; } + ; + opt_slot: K_SLOT IDENT { $$ = $2; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index f83ec53..9f50ce6 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -98,6 +98,7 @@ PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } +TEMPORARY { return K_TEMPORARY; } "," { return ','; } ";" { return ';'; } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 0b2575e..9226539 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "storage/fd.h" #include "storage/proc.h" #include "storage/procarray.h" +#include "utils/builtins.h" /* * Replication slot on-disk data structure. @@ -98,7 +99,9 @@ int max_replication_slots = 0; /* the maximum number of replication * slots */ static LWLockTranche ReplSlotIOLWLockTranche; + static void ReplicationSlotDropAcquired(void); +static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); @@ -329,7 +332,7 @@ ReplicationSlotAcquire(const char *name) { ReplicationSlot *slot = NULL; int i; - int active_pid = 0; + int active_pid = 0; /* Keep compiler quiet */ Assert(MyReplicationSlot == NULL); @@ -346,7 +349,7 @@ ReplicationSlotAcquire(const char *name) SpinLockAcquire(&s->mutex); active_pid = s->active_pid; if (active_pid == 0) - s->active_pid = MyProcPid; + active_pid = s->active_pid = MyProcPid; SpinLockRelease(&s->mutex); slot = s; break; @@ -359,7 +362,7 @@ ReplicationSlotAcquire(const char *name) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", name))); - if (active_pid != 0) + if (active_pid != MyProcPid) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is active for PID %d", @@ -389,9 +392,12 @@ ReplicationSlotRelease(void) */ ReplicationSlotDropAcquired(); } - else + else if (slot->data.persistency == RS_PERSISTENT) { - /* Mark slot inactive. We're not freeing it, just disconnecting. */ + /* + * Mark persistent slot inactive. We're not freeing it, just + * disconnecting. + */ SpinLockAcquire(&slot->mutex); slot->active_pid = 0; SpinLockRelease(&slot->mutex); @@ -406,11 +412,39 @@ ReplicationSlotRelease(void) } /* + * Cleanup all temporary slots created in current session. + */ +void +ReplicationSlotCleanup() +{ + int i; + + Assert(MyReplicationSlot == NULL); + + /* + * No need for locking as we are only interested in slots active in + * current process and those are not touched by other processes. + */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->active_pid == MyProcPid) + { + Assert(s->in_use && s->data.persistency == RS_TEMPORARY); + + ReplicationSlotDropPtr(s); + } + } +} + +/* * Permanently drop replication slot identified by the passed in name. */ void ReplicationSlotDrop(const char *name) { + Assert(MyReplicationSlot == NULL); ReplicationSlotAcquire(name); @@ -419,21 +453,31 @@ ReplicationSlotDrop(const char *name) } /* - * Permanently drop the currently acquired replication slot which will be - * released by the point this function returns. + * Permanently drop the currently acquired replication slot. */ static void ReplicationSlotDropAcquired(void) { - char path[MAXPGPATH]; - char tmppath[MAXPGPATH]; ReplicationSlot *slot = MyReplicationSlot; - Assert(MyReplicationSlot != NULL); + Assert(MyReplicationSlot); /* slot isn't acquired anymore */ MyReplicationSlot = NULL; + ReplicationSlotDropPtr(slot); +} + +/* + * Permanently drop the replication slot which will be released by the point + * this function returns. + */ +static void +ReplicationSlotDropPtr(ReplicationSlot *slot) +{ + char path[MAXPGPATH]; + char tmppath[MAXPGPATH]; + /* * If some other backend ran this code concurrently with us, we might try * to delete a slot with a certain name while someone else was trying to @@ -448,9 +492,9 @@ ReplicationSlotDropAcquired(void) /* * Rename the slot directory on disk, so that we'll no longer recognize * this as a valid slot. Note that if this fails, we've got to mark the - * slot inactive before bailing out. If we're dropping an ephemeral slot, - * we better never fail hard as the caller won't expect the slot to - * survive and this might get called during error handling. + * slot inactive before bailing out. If we're dropping an ephemeral or + * a temporary slot, we better never fail hard as the caller won't expect + * the slot to survive and this might get called during error handling. */ if (rename(path, tmppath) == 0) { @@ -469,7 +513,7 @@ ReplicationSlotDropAcquired(void) } else { - bool fail_softly = slot->data.persistency == RS_EPHEMERAL; + bool fail_softly = slot->data.persistency != RS_PERSISTENT; SpinLockAcquire(&slot->mutex); slot->active_pid = 0; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index f908761..912bfb7 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -41,6 +41,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) { Name name = PG_GETARG_NAME(0); bool immediately_reserve = PG_GETARG_BOOL(1); + bool temporary = PG_GETARG_BOOL(2); Datum values[2]; bool nulls[2]; TupleDesc tupdesc; @@ -57,7 +58,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); /* acquire replication slot, this will check for conflicting names */ - ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); + ReplicationSlotCreate(NameStr(*name), false, + temporary ? RS_TEMPORARY : RS_PERSISTENT); values[0] = NameGetDatum(&MyReplicationSlot->data.name); nulls[0] = false; @@ -96,6 +98,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) { Name name = PG_GETARG_NAME(0); Name plugin = PG_GETARG_NAME(1); + bool temporary = PG_GETARG_BOOL(2); LogicalDecodingContext *ctx = NULL; @@ -116,11 +119,14 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) /* * Acquire a logical decoding slot, this will check for conflicting names. - * Initially create it as ephemeral - that allows us to nicely handle - * errors during initialization because it'll get dropped if this + * Initially create persisent slot as ephemeral - that allows us to nicely + * handle errors during initialization because it'll get dropped if this * transaction fails. We'll make it persistent at the end. + * Temporary slots can be created as temporary from beginning as they get + * dropped on error as well. */ - ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL); + ReplicationSlotCreate(NameStr(*name), true, + temporary ? RS_TEMPORARY : RS_EPHEMERAL); /* * Create logical decoding context, to build the initial snapshot. @@ -143,8 +149,9 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); - /* ok, slot is now fully created, mark it as persistent */ - ReplicationSlotPersist(); + /* ok, slot is now fully created, mark it as persistent if needed */ + if (!temporary) + ReplicationSlotPersist(); ReplicationSlotRelease(); PG_RETURN_DATUM(result); @@ -174,7 +181,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 10 +#define PG_GET_REPLICATION_SLOTS_COLS 11 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -219,6 +226,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) Datum values[PG_GET_REPLICATION_SLOTS_COLS]; bool nulls[PG_GET_REPLICATION_SLOTS_COLS]; + ReplicationSlotPersistency persistency; TransactionId xmin; TransactionId catalog_xmin; XLogRecPtr restart_lsn; @@ -246,6 +254,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) namecpy(&plugin, &slot->data.plugin); active_pid = slot->active_pid; + persistency = slot->data.persistency; } SpinLockRelease(&slot->mutex); @@ -269,6 +278,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else values[i++] = database; + values[i++] = BoolGetDatum(persistency == RS_PERSISTENT); values[i++] = BoolGetDatum(active_pid != 0); if (active_pid != 0) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index bc5e508..c9b9db1 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -264,6 +264,8 @@ WalSndErrorCleanup(void) if (MyReplicationSlot != NULL) ReplicationSlotRelease(); + ReplicationSlotCleanup(); + replication_active = false; if (walsender_ready_to_stop) proc_exit(0); @@ -794,18 +796,22 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) if (cmd->kind == REPLICATION_KIND_PHYSICAL) { - ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT); + ReplicationSlotCreate(cmd->slotname, false, + cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT); } else { CheckLogicalDecodingRequirements(); /* - * Initially create the slot as ephemeral - that allows us to nicely - * handle errors during initialization because it'll get dropped if - * this transaction fails. We'll make it persistent at the end. + * Initially create persisent slot as ephemeral - that allows us to + * nicely handle errors during initialization because it'll get + * dropped if this transaction fails. We'll make it persistent at the + * end. Temporary slots can be created as temporary from beginning as + * they get dropped on error as well. */ - ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL); + ReplicationSlotCreate(cmd->slotname, true, + cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL); } initStringInfo(&output_message); @@ -839,15 +845,18 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) /* don't need the decoding context anymore */ FreeDecodingContext(ctx); - ReplicationSlotPersist(); + if (!cmd->temporary) + ReplicationSlotPersist(); } else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal) { ReplicationSlotReserveWal(); - /* Write this slot to disk */ ReplicationSlotMarkDirty(); - ReplicationSlotSave(); + + /* Write this slot to disk if it's permanent one. */ + if (!cmd->temporary) + ReplicationSlotSave(); } snprintf(xpos, sizeof(xpos), "%X/%X", @@ -931,9 +940,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) pq_endmessage(&buf); - /* - * release active status again, START_REPLICATION will reacquire it - */ ReplicationSlotRelease(); } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index b201631..c9eef79 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -806,6 +806,9 @@ ProcKill(int code, Datum arg) if (MyReplicationSlot != NULL) ReplicationSlotRelease(); + /* Also cleanup all the temporary slots. */ + ReplicationSlotCleanup(); + /* * Detach from any lock group of which we are a member. If the leader * exist before all other group members, it's PGPROC will remain allocated diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 599874e..d453ec4 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3884,6 +3884,9 @@ PostgresMain(int argc, char *argv[], if (MyReplicationSlot != NULL) ReplicationSlotRelease(); + /* We also want to cleanup temporary slots on error. */ + ReplicationSlotCleanup(); + /* * Now return to normal top-level context and clear ErrorContext for * next time. diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 17ec71d..e2a6585 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5174,13 +5174,13 @@ DATA(insert OID = 5016 ( spg_box_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 f DESCR("SP-GiST support for quad tree over box"); /* replication slots */ -DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 16 16" "{19,16,16,19,3220}" "{i,i,i,o,o}" "{slot_name,immediately_reserve,temporary,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); -DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ )); +DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,persistent,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); -DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); DESCR("set up a logical replication slot"); DATA(insert OID = 3782 ( pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_get_changes _null_ _null_ _null_ )); DESCR("get changes from replication slot"); diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index d2f1edb..024b965 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -55,6 +55,7 @@ typedef struct CreateReplicationSlotCmd char *slotname; ReplicationKind kind; char *plugin; + bool temporary; bool reserve_wal; } CreateReplicationSlotCmd; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index e00562d..b653e5c 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -28,7 +28,8 @@ typedef enum ReplicationSlotPersistency { RS_PERSISTENT, - RS_EPHEMERAL + RS_EPHEMERAL, + RS_TEMPORARY } ReplicationSlotPersistency; /* @@ -165,6 +166,7 @@ extern void ReplicationSlotDrop(const char *name); extern void ReplicationSlotAcquire(const char *name); extern void ReplicationSlotRelease(void); +extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void); extern void ReplicationSlotMarkDirty(void); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 00700f2..cff6d9b 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1423,13 +1423,14 @@ pg_replication_slots| SELECT l.slot_name, l.slot_type, l.datoid, d.datname AS database, + l.persistent, l.active, l.active_pid, l.xmin, l.catalog_xmin, l.restart_lsn, l.confirmed_flush_lsn - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn) + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, persistent, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.7.4
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers