On 04/11/16 13:07, Andres Freund wrote:
> 
> Hm. I think I have to agree a bit with Peter here.  Overloading
> MyReplicationSlot this way seems ugly, and I think there's a bunch of
> bugs around it too.
> 
> 
> Sounds what we really want is a) two different lifetimes for ephemeral
> slots, session and "command" b) have a number of slots that are released
> either after a failed transaction / command or at session end.   The
> easiest way for that appears to have a list of slots to be checked at
> end-of-xact and backend shutdown. 
> 

OK, so how about the attached? It adds temporary slots as a new type of
persistence. It does not really touch the behavior of any of the existing
API or persistence settings.

Temporary slots are cleaned up on backend exit or on error; other than
that they are not special. I don't use a backend-local list to track
them. Instead they always have active_pid set, and at the end of the
session I just clean up everything that has active_pid set to our PID.
This has the nice property that it prevents other backends from
acquiring them.
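
To make the ownership rule concrete, here is a minimal standalone C sketch
of it. It is not code from the patch: ModelSlot, model_acquire and
model_release are hypothetical names, and spinlocks, shared memory and the
on-disk state are all left out. It only models the idea that a temporary
slot keeps its active_pid across release, so only the creating backend can
acquire it again.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Illustrative stand-ins for the real slot structures (assumed names). */
typedef enum
{
    SLOT_PERSISTENT,
    SLOT_EPHEMERAL,
    SLOT_TEMPORARY
} ModelPersistency;

typedef struct
{
    bool             in_use;
    ModelPersistency persistency;
    int              active_pid;    /* 0 means nobody holds the slot */
} ModelSlot;

/* Try to acquire the slot for backend "pid"; returns true on success. */
static bool
model_acquire(ModelSlot *s, int pid)
{
    assert(s != NULL);

    if (!s->in_use)
        return false;           /* slot does not exist */
    if (s->active_pid == 0)
        s->active_pid = pid;    /* free slot: take it */

    /* Succeeds if we just took it or if we already owned it. */
    return s->active_pid == pid;
}

/* Release the slot; what happens depends on its persistency. */
static void
model_release(ModelSlot *s)
{
    assert(s != NULL);

    if (s->persistency == SLOT_EPHEMERAL)
    {
        /* Ephemeral slots are dropped outright on release. */
        s->in_use = false;
        s->active_pid = 0;
    }
    else if (s->persistency == SLOT_PERSISTENT)
    {
        /* Persistent slots are only disconnected. */
        s->active_pid = 0;
    }
    /* Temporary slots keep active_pid, so they stay tied to their owner. */
}
```

For example, after a backend with PID 100 acquires and releases a temporary
slot, model_acquire(&slot, 200) returns false while model_acquire(&slot, 100)
still returns true. For a persistent slot the release resets active_pid to 0
and either backend can acquire it afterwards.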

It does not do any locking while searching for the slots to clean up (see
ReplicationSlotCleanup), mainly because locking complicates the
interaction with ReplicationSlotDropPtr. It also seems to me that locking
is not really needed there: other backends will never change active_pid
to our backend PID, and ReplicationSlotDropPtr takes an exclusive lock
when resetting it.
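
The cleanup scan itself can be sketched in the same simplified way. Again
this is only an illustration under assumptions: SketchSlot, sketch_drop and
sketch_cleanup are made-up names, the array is ordinary process memory, and
the sketch is single-threaded, so it shows the shape of the scan rather than
proving the no-locking argument.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_MAX_SLOTS 4

/* Illustrative slot entry (assumed layout, not the real ReplicationSlot). */
typedef struct
{
    bool in_use;
    bool temporary;
    int  active_pid;    /* 0 means nobody holds the slot */
} SketchSlot;

/* Drop one slot: clear ownership and free the array entry. */
static void
sketch_drop(SketchSlot *s)
{
    s->active_pid = 0;
    s->in_use = false;
}

/*
 * Drop every slot still owned by my_pid and return how many were dropped.
 * Entries owned by other PIDs, or by nobody, are never touched.
 */
static int
sketch_cleanup(SketchSlot *slots, size_t nslots, int my_pid)
{
    int dropped = 0;

    for (size_t i = 0; i < nslots; i++)
    {
        if (slots[i].active_pid == my_pid)
        {
            /* Anything we still own at this point must be temporary. */
            assert(slots[i].in_use && slots[i].temporary);
            sketch_drop(&slots[i]);
            dropped++;
        }
    }
    return dropped;
}
```

The scan only ever acts on entries whose active_pid equals the caller's PID,
which is the property the no-locking reasoning relies on: another backend
may set active_pid from 0 to its own PID or back, but never to ours.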

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 1cf0aca7f1405f31229ab679c9451b51a8cc18de Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Wed, 28 Sep 2016 23:36:58 +0200
Subject: [PATCH 1/7] Add support for TEMPORARY replication slots

This patch allows creating temporary replication slots that are removed
automatically at the end of the session or on error.
---
 contrib/test_decoding/Makefile          |  2 +-
 contrib/test_decoding/expected/ddl.out  |  4 +-
 contrib/test_decoding/expected/slot.out | 35 ++++++++++++++++
 contrib/test_decoding/sql/slot.sql      | 13 ++++++
 doc/src/sgml/func.sgml                  | 16 ++++++--
 doc/src/sgml/protocol.sgml              | 13 +++++-
 src/backend/catalog/system_views.sql    | 11 +++++
 src/backend/replication/repl_gram.y     | 22 ++++++----
 src/backend/replication/repl_scanner.l  |  1 +
 src/backend/replication/slot.c          | 72 ++++++++++++++++++++++++++-------
 src/backend/replication/slotfuncs.c     | 24 +++++++----
 src/backend/replication/walsender.c     | 28 ++++++++-----
 src/backend/storage/lmgr/proc.c         |  3 ++
 src/backend/tcop/postgres.c             |  3 ++
 src/include/catalog/pg_proc.h           |  6 +--
 src/include/nodes/replnodes.h           |  1 +
 src/include/replication/slot.h          |  4 +-
 src/test/regress/expected/rules.out     |  3 +-
 18 files changed, 209 insertions(+), 52 deletions(-)
 create mode 100644 contrib/test_decoding/expected/slot.out
 create mode 100644 contrib/test_decoding/sql/slot.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a6641f5..d2bc8b8 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -39,7 +39,7 @@ submake-test_decoding:
 
 REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
-	spill
+	spill slot
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 7fbeafd..84ab7d3 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -702,7 +702,7 @@ SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
------------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------+---------------------
+ slot_name | plugin | slot_type | datoid | database | persistent | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
+-----------+--------+-----------+--------+----------+------------+--------+------------+------+--------------+-------------+---------------------
 (0 rows)
 
diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
new file mode 100644
index 0000000..28b2f89
--- /dev/null
+++ b/contrib/test_decoding/expected/slot.out
@@ -0,0 +1,35 @@
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slotp', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slote', 'test_decoding', true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT pg_drop_replication_slot('regression_slotp');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slotp', 'test_decoding', false);
+ ?column? 
+----------
+ init
+(1 row)
+
+-- reconnect to clean temp slots
+\c
+SELECT pg_drop_replication_slot('regression_slotp');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+-- should fail because the slot was dropped automatically
+SELECT pg_drop_replication_slot('regression_slote');
+ERROR:  replication slot "regression_slote" does not exist
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
new file mode 100644
index 0000000..839a440
--- /dev/null
+++ b/contrib/test_decoding/sql/slot.sql
@@ -0,0 +1,13 @@
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slotp', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slote', 'test_decoding', true);
+
+SELECT pg_drop_replication_slot('regression_slotp');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slotp', 'test_decoding', false);
+
+-- reconnect to clean temp slots
+\c
+
+SELECT pg_drop_replication_slot('regression_slotp');
+
+-- should fail because the slot was dropped automatically
+SELECT pg_drop_replication_slot('regression_slote');
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 2e64cc4..0f37ddc 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -18464,7 +18464,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <indexterm>
          <primary>pg_create_physical_replication_slot</primary>
         </indexterm>
-        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type> <optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal>
+        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type> <optional>, <parameter>immediately_reserve</> <type>boolean</>, <parameter>temporary</> <type>boolean</></optional>)</function></literal>
        </entry>
        <entry>
         (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
@@ -18477,7 +18477,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         the <acronym>LSN</> is reserved on first connection from a streaming
         replication client. Streaming changes from a physical slot is only
         possible with the streaming-replication protocol &mdash;
-        see <xref linkend="protocol-replication">. This function corresponds
+        see <xref linkend="protocol-replication">. The optional third
+        parameter, <parameter>temporary</>, when set to true, specifies that
+        the slot should not be permanently stored to disk and is only meant
+        for use by the current session. Note that temporary slots are also
+        released upon any error. This function corresponds
         to the replication protocol command <literal>CREATE_REPLICATION_SLOT
         ... PHYSICAL</literal>.
        </entry>
@@ -18504,7 +18508,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <indexterm>
          <primary>pg_create_logical_replication_slot</primary>
         </indexterm>
-        <literal><function>pg_create_logical_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type>)</function></literal>
+        <literal><function>pg_create_logical_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</> <type>boolean</></optional>)</function></literal>
        </entry>
        <entry>
         (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
@@ -18512,7 +18516,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
        <entry>
         Creates a new logical (decoding) replication slot named
         <parameter>slot_name</parameter> using the output plugin
-        <parameter>plugin</parameter>.  A call to this function has the same
+        <parameter>plugin</parameter>. The optional third
+        parameter, <parameter>temporary</>, when set to true, specifies that
+        the slot should not be permanently stored to disk and is only meant
+        for use by the current session. Note that temporary slots are also
+        released upon any error. A call to this function has the same
         effect as the replication protocol command
         <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
        </entry>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 50cf527..8d9f628 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1434,7 +1434,7 @@ The commands accepted in walsender mode are:
   </varlistentry>
 
   <varlistentry>
-   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> }
+   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> }
      <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
     </term>
     <listitem>
@@ -1465,6 +1465,17 @@ The commands accepted in walsender mode are:
       </varlistentry>
 
       <varlistentry>
+       <term><literal>TEMPORARY</></term>
+       <listitem>
+        <para>
+         Specify that this replication slot is a temporary one. Temporary
+         slots are not saved to disk and are automatically dropped on error,
+         or when the session has finished.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
        <term><literal>RESERVE_WAL</></term>
        <listitem>
         <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ada2142..03e51e0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -703,6 +703,7 @@ CREATE VIEW pg_replication_slots AS
             L.slot_type,
             L.datoid,
             D.datname AS database,
+            L.persistent,
             L.active,
             L.active_pid,
             L.xmin,
@@ -968,12 +969,22 @@ AS 'pg_logical_slot_peek_binary_changes';
 
 CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
     IN slot_name name, IN immediately_reserve boolean DEFAULT false,
+	IN temporary boolean DEFAULT false,
     OUT slot_name name, OUT xlog_position pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
 STRICT VOLATILE
 AS 'pg_create_physical_replication_slot';
 
+CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
+    IN slot_name name, IN plugin name,
+	IN temporary boolean DEFAULT false,
+    OUT slot_name text, OUT xlog_position pg_lsn)
+RETURNS RECORD
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_create_logical_replication_slot';
+
 CREATE OR REPLACE FUNCTION
   make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
                 days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index fd0fa6d..e75516c 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -77,6 +77,7 @@ Node *replication_parse_result;
 %token K_LOGICAL
 %token K_SLOT
 %token K_RESERVE_WAL
+%token K_TEMPORARY
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -89,7 +90,7 @@ Node *replication_parse_result;
 %type <defelt>	plugin_opt_elem
 %type <node>	plugin_opt_arg
 %type <str>		opt_slot
-%type <boolval>	opt_reserve_wal
+%type <boolval>	opt_reserve_wal opt_temporary
 
 %%
 
@@ -183,24 +184,26 @@ base_backup_opt:
 			;
 
 create_replication_slot:
-			/* CREATE_REPLICATION_SLOT slot PHYSICAL RESERVE_WAL */
-			K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal
+			/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
+			K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_PHYSICAL;
 					cmd->slotname = $2;
-					cmd->reserve_wal = $4;
+					cmd->temporary = $3;
+					cmd->reserve_wal = $5;
 					$$ = (Node *) cmd;
 				}
-			/* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
-			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
+			/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
+			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_LOGICAL;
 					cmd->slotname = $2;
-					cmd->plugin = $4;
+					cmd->temporary = $3;
+					cmd->plugin = $5;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -276,6 +279,11 @@ opt_reserve_wal:
 			| /* EMPTY */					{ $$ = false; }
 			;
 
+opt_temporary:
+			K_TEMPORARY						{ $$ = true; }
+			| /* EMPTY */					{ $$ = false; }
+			;
+
 opt_slot:
 			K_SLOT IDENT
 				{ $$ = $2; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index f83ec53..9f50ce6 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -98,6 +98,7 @@ PHYSICAL			{ return K_PHYSICAL; }
 RESERVE_WAL			{ return K_RESERVE_WAL; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
+TEMPORARY			{ return K_TEMPORARY; }
 
 ","				{ return ','; }
 ";"				{ return ';'; }
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 0b2575e..9226539 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -47,6 +47,7 @@
 #include "storage/fd.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "utils/builtins.h"
 
 /*
  * Replication slot on-disk data structure.
@@ -98,7 +99,9 @@ int			max_replication_slots = 0;	/* the maximum number of replication
 										 * slots */
 
 static LWLockTranche ReplSlotIOLWLockTranche;
+
 static void ReplicationSlotDropAcquired(void);
+static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
@@ -329,7 +332,7 @@ ReplicationSlotAcquire(const char *name)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
-	int			active_pid = 0;
+	int			active_pid = 0; /* Keep compiler quiet */
 
 	Assert(MyReplicationSlot == NULL);
 
@@ -346,7 +349,7 @@ ReplicationSlotAcquire(const char *name)
 			SpinLockAcquire(&s->mutex);
 			active_pid = s->active_pid;
 			if (active_pid == 0)
-				s->active_pid = MyProcPid;
+				active_pid = s->active_pid = MyProcPid;
 			SpinLockRelease(&s->mutex);
 			slot = s;
 			break;
@@ -359,7 +362,7 @@ ReplicationSlotAcquire(const char *name)
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
-	if (active_pid != 0)
+	if (active_pid != MyProcPid)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
 				 errmsg("replication slot \"%s\" is active for PID %d",
@@ -389,9 +392,12 @@ ReplicationSlotRelease(void)
 		 */
 		ReplicationSlotDropAcquired();
 	}
-	else
+	else if (slot->data.persistency == RS_PERSISTENT)
 	{
-		/* Mark slot inactive.  We're not freeing it, just disconnecting. */
+		/*
+		 * Mark persistent slot inactive.  We're not freeing it, just
+		 * disconnecting.
+		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
 		SpinLockRelease(&slot->mutex);
@@ -406,11 +412,39 @@ ReplicationSlotRelease(void)
 }
 
 /*
+ * Clean up all temporary slots created in the current session.
+ */
+void
+ReplicationSlotCleanup(void)
+{
+	int			i;
+
+	Assert(MyReplicationSlot == NULL);
+
+	/*
+	 * No need for locking as we are only interested in slots active in
+	 * current process and those are not touched by other processes.
+	 */
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->active_pid == MyProcPid)
+		{
+			Assert(s->in_use && s->data.persistency == RS_TEMPORARY);
+
+			ReplicationSlotDropPtr(s);
+		}
+	}
+}
+
+/*
  * Permanently drop replication slot identified by the passed in name.
  */
 void
 ReplicationSlotDrop(const char *name)
 {
+
 	Assert(MyReplicationSlot == NULL);
 
 	ReplicationSlotAcquire(name);
@@ -419,21 +453,31 @@ ReplicationSlotDrop(const char *name)
 }
 
 /*
- * Permanently drop the currently acquired replication slot which will be
- * released by the point this function returns.
+ * Permanently drop the currently acquired replication slot.
  */
 static void
 ReplicationSlotDropAcquired(void)
 {
-	char		path[MAXPGPATH];
-	char		tmppath[MAXPGPATH];
 	ReplicationSlot *slot = MyReplicationSlot;
 
-	Assert(MyReplicationSlot != NULL);
+	Assert(MyReplicationSlot);
 
 	/* slot isn't acquired anymore */
 	MyReplicationSlot = NULL;
 
+	ReplicationSlotDropPtr(slot);
+}
+
+/*
+ * Permanently drop the replication slot which will be released by the point
+ * this function returns.
+ */
+static void
+ReplicationSlotDropPtr(ReplicationSlot *slot)
+{
+	char		path[MAXPGPATH];
+	char		tmppath[MAXPGPATH];
+
 	/*
 	 * If some other backend ran this code concurrently with us, we might try
 	 * to delete a slot with a certain name while someone else was trying to
@@ -448,9 +492,9 @@ ReplicationSlotDropAcquired(void)
 	/*
 	 * Rename the slot directory on disk, so that we'll no longer recognize
 	 * this as a valid slot.  Note that if this fails, we've got to mark the
-	 * slot inactive before bailing out.  If we're dropping an ephemeral slot,
-	 * we better never fail hard as the caller won't expect the slot to
-	 * survive and this might get called during error handling.
+	 * slot inactive before bailing out.  If we're dropping an ephemeral or
+	 * a temporary slot, we better never fail hard as the caller won't expect
+	 * the slot to survive and this might get called during error handling.
 	 */
 	if (rename(path, tmppath) == 0)
 	{
@@ -469,7 +513,7 @@ ReplicationSlotDropAcquired(void)
 	}
 	else
 	{
-		bool		fail_softly = slot->data.persistency == RS_EPHEMERAL;
+		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;
 
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f908761..912bfb7 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -41,6 +41,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 {
 	Name		name = PG_GETARG_NAME(0);
 	bool		immediately_reserve = PG_GETARG_BOOL(1);
+	bool		temporary = PG_GETARG_BOOL(2);
 	Datum		values[2];
 	bool		nulls[2];
 	TupleDesc	tupdesc;
@@ -57,7 +58,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	CheckSlotRequirements();
 
 	/* acquire replication slot, this will check for conflicting names */
-	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
+	ReplicationSlotCreate(NameStr(*name), false,
+						  temporary ? RS_TEMPORARY : RS_PERSISTENT);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
@@ -96,6 +98,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 {
 	Name		name = PG_GETARG_NAME(0);
 	Name		plugin = PG_GETARG_NAME(1);
+	bool		temporary = PG_GETARG_BOOL(2);
 
 	LogicalDecodingContext *ctx = NULL;
 
@@ -116,11 +119,14 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 
 	/*
 	 * Acquire a logical decoding slot, this will check for conflicting names.
-	 * Initially create it as ephemeral - that allows us to nicely handle
-	 * errors during initialization because it'll get dropped if this
+	 * Initially create persistent slot as ephemeral - that allows us to nicely
+	 * handle errors during initialization because it'll get dropped if this
 	 * transaction fails. We'll make it persistent at the end.
+	 * Temporary slots can be created as temporary from beginning as they get
+	 * dropped on error as well.
 	 */
-	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
+	ReplicationSlotCreate(NameStr(*name), true,
+						  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
 	/*
 	 * Create logical decoding context, to build the initial snapshot.
@@ -143,8 +149,9 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	tuple = heap_form_tuple(tupdesc, values, nulls);
 	result = HeapTupleGetDatum(tuple);
 
-	/* ok, slot is now fully created, mark it as persistent */
-	ReplicationSlotPersist();
+	/* ok, slot is now fully created, mark it as persistent if needed */
+	if (!temporary)
+		ReplicationSlotPersist();
 	ReplicationSlotRelease();
 
 	PG_RETURN_DATUM(result);
@@ -174,7 +181,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 10
+#define PG_GET_REPLICATION_SLOTS_COLS 11
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -219,6 +226,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		Datum		values[PG_GET_REPLICATION_SLOTS_COLS];
 		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
 
+		ReplicationSlotPersistency	persistency;
 		TransactionId xmin;
 		TransactionId catalog_xmin;
 		XLogRecPtr	restart_lsn;
@@ -246,6 +254,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			namecpy(&plugin, &slot->data.plugin);
 
 			active_pid = slot->active_pid;
+			persistency = slot->data.persistency;
 		}
 		SpinLockRelease(&slot->mutex);
 
@@ -269,6 +278,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			values[i++] = database;
 
+		values[i++] = BoolGetDatum(persistency == RS_PERSISTENT);
 		values[i++] = BoolGetDatum(active_pid != 0);
 
 		if (active_pid != 0)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc5e508..c9b9db1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -264,6 +264,8 @@ WalSndErrorCleanup(void)
 	if (MyReplicationSlot != NULL)
 		ReplicationSlotRelease();
 
+	ReplicationSlotCleanup();
+
 	replication_active = false;
 	if (walsender_ready_to_stop)
 		proc_exit(0);
@@ -794,18 +796,22 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 	{
-		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT);
+		ReplicationSlotCreate(cmd->slotname, false,
+							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
 	}
 	else
 	{
 		CheckLogicalDecodingRequirements();
 
 		/*
-		 * Initially create the slot as ephemeral - that allows us to nicely
-		 * handle errors during initialization because it'll get dropped if
-		 * this transaction fails. We'll make it persistent at the end.
+		 * Initially create persistent slot as ephemeral - that allows us to
+		 * nicely handle errors during initialization because it'll get
+		 * dropped if this transaction fails. We'll make it persistent at the
+		 * end. Temporary slots can be created as temporary from beginning as
+		 * they get dropped on error as well.
 		 */
-		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
+		ReplicationSlotCreate(cmd->slotname, true,
+							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 	}
 
 	initStringInfo(&output_message);
@@ -839,15 +845,18 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		/* don't need the decoding context anymore */
 		FreeDecodingContext(ctx);
 
-		ReplicationSlotPersist();
+		if (!cmd->temporary)
+			ReplicationSlotPersist();
 	}
 	else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
 	{
 		ReplicationSlotReserveWal();
 
-		/* Write this slot to disk */
 		ReplicationSlotMarkDirty();
-		ReplicationSlotSave();
+
+		/* Write this slot to disk if it's a permanent one. */
+		if (!cmd->temporary)
+			ReplicationSlotSave();
 	}
 
 	snprintf(xpos, sizeof(xpos), "%X/%X",
@@ -931,9 +940,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	pq_endmessage(&buf);
 
-	/*
-	 * release active status again, START_REPLICATION will reacquire it
-	 */
 	ReplicationSlotRelease();
 }
 
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index b201631..c9eef79 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -806,6 +806,9 @@ ProcKill(int code, Datum arg)
 	if (MyReplicationSlot != NULL)
 		ReplicationSlotRelease();
 
+	/* Also cleanup all the temporary slots. */
+	ReplicationSlotCleanup();
+
 	/*
 	 * Detach from any lock group of which we are a member.  If the leader
 	 * exist before all other group members, it's PGPROC will remain allocated
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 599874e..d453ec4 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3884,6 +3884,9 @@ PostgresMain(int argc, char *argv[],
 		if (MyReplicationSlot != NULL)
 			ReplicationSlotRelease();
 
+		/* We also want to cleanup temporary slots on error. */
+		ReplicationSlotCleanup();
+
 		/*
 		 * Now return to normal top-level context and clear ErrorContext for
 		 * next time.
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 17ec71d..e2a6585 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5174,13 +5174,13 @@ DATA(insert OID = 5016 (  spg_box_quad_leaf_consistent	PGNSP PGUID 12 1 0 0 0 f
 DESCR("SP-GiST support for quad tree over box");
 
 /* replication slots */
-DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 16 16" "{19,16,16,19,3220}" "{i,i,i,o,o}" "{slot_name,immediately_reserve,temporary,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,persistent,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
-DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
 DATA(insert OID = 3782 (  pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_get_changes _null_ _null_ _null_ ));
 DESCR("get changes from replication slot");
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index d2f1edb..024b965 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -55,6 +55,7 @@ typedef struct CreateReplicationSlotCmd
 	char	   *slotname;
 	ReplicationKind kind;
 	char	   *plugin;
+	bool		temporary;
 	bool		reserve_wal;
 } CreateReplicationSlotCmd;
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index e00562d..b653e5c 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -28,7 +28,8 @@
 typedef enum ReplicationSlotPersistency
 {
 	RS_PERSISTENT,
-	RS_EPHEMERAL
+	RS_EPHEMERAL,
+	RS_TEMPORARY
 } ReplicationSlotPersistency;
 
 /*
@@ -165,6 +166,7 @@ extern void ReplicationSlotDrop(const char *name);
 
 extern void ReplicationSlotAcquire(const char *name);
 extern void ReplicationSlotRelease(void);
+extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
 extern void ReplicationSlotMarkDirty(void);
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 00700f2..cff6d9b 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1423,13 +1423,14 @@ pg_replication_slots| SELECT l.slot_name,
     l.slot_type,
     l.datoid,
     d.datname AS database,
+    l.persistent,
     l.active,
     l.active_pid,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
     l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, persistent, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.7.4

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers