On 2021-Mar-23, Alvaro Herrera wrote:

> I think a possible solution to this problem is that the "detach" flag in
> pg_inherits is not a boolean anymore, but an Xid (or maybe two Xids).
> Not sure exactly which Xid(s) yet, and I'm not sure what are the exact
> rules, but the Xid becomes a marker that indicates a horizon past which
> the partition is no longer visible.  Then, REPEATABLE READ can see the
> partition, but only if its snapshot is older than the Xid.

So a solution to this problem seems similar to (but not quite the same
as) pg_index.indcheckxmin: the partition is included in the partition
directory, or not, depending on the pg_inherits tuple visibility for the
active snapshot.  This solves the problem because the RI query uses a
fresh snapshot, for which the partition has already been detached, while
the normal REPEATABLE READ query is using the old snapshot for which the
'detach-pending' row is still seen as in progress.  With this, the weird
hack in a couple of places that needed to check the isolation level is
gone, which makes me a bit more comfortable.

So attached is v9 with this problem solved.

I'll add one more torture test, and if it works correctly I'll push it:
have a cursor in the repeatable read transaction, which can read the
referenced partition table and see the row in the detached partition,
but the RI query must not see that row.  Bonus: the RI query is run from
another cursor that is doing UPDATE WHERE CURRENT OF that cursor.

-- 
Álvaro Herrera                            39°49'30"S 73°17'W
"Estoy de acuerdo contigo en que la verdad absoluta no existe...
El problema es que la mentira sí existe y tu estás mintiendo" (G. Lama)
>From 61ff1c133bb4fa7e81f119e806af9760b2faf0e4 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 13 Jul 2020 20:15:30 -0400
Subject: [PATCH v9 1/2] Let ALTER TABLE exec routines deal with the relation

This means that ATExecCmd relies on AlteredTableInfo->rel instead of
keeping the relation as a local variable; this is useful if the
subcommand needs to modify the relation internally.  For example, a
subcommand that internally commits its transaction needs this.
---
 src/backend/commands/tablecmds.c | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3349bcfaa7..bf7fd6e8ae 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -157,6 +157,8 @@ typedef struct AlteredTableInfo
 	Oid			relid;			/* Relation to work on */
 	char		relkind;		/* Its relkind */
 	TupleDesc	oldDesc;		/* Pre-modification tuple descriptor */
+	/* Transiently set during Phase 2, normally set to NULL */
+	Relation	rel;
 	/* Information saved by Phase 1 for Phase 2: */
 	List	   *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */
 	/* Information saved by Phases 1/2 for Phase 3: */
@@ -354,7 +356,7 @@ static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 					  AlterTableUtilityContext *context);
 static void ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
 							  AlterTableUtilityContext *context);
-static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
+static void ATExecCmd(List **wqueue, AlteredTableInfo *tab,
 					  AlterTableCmd *cmd, LOCKMODE lockmode, int cur_pass,
 					  AlterTableUtilityContext *context);
 static AlterTableCmd *ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab,
@@ -4569,7 +4571,6 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
 		{
 			AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
 			List	   *subcmds = tab->subcmds[pass];
-			Relation	rel;
 			ListCell   *lcmd;
 
 			if (subcmds == NIL)
@@ -4578,10 +4579,10 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
 			/*
 			 * Appropriate lock was obtained by phase 1, needn't get it again
 			 */
-			rel = relation_open(tab->relid, NoLock);
+			tab->rel = relation_open(tab->relid, NoLock);
 
 			foreach(lcmd, subcmds)
-				ATExecCmd(wqueue, tab, rel,
+				ATExecCmd(wqueue, tab,
 						  castNode(AlterTableCmd, lfirst(lcmd)),
 						  lockmode, pass, context);
 
@@ -4593,7 +4594,11 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
 			if (pass == AT_PASS_ALTER_TYPE)
 				ATPostAlterTypeCleanup(wqueue, tab, lockmode);
 
-			relation_close(rel, NoLock);
+			if (tab->rel)
+			{
+				relation_close(tab->rel, NoLock);
+				tab->rel = NULL;
+			}
 		}
 	}
 
@@ -4619,11 +4624,12 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
  * ATExecCmd: dispatch a subcommand to appropriate execution routine
  */
 static void
-ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ATExecCmd(List **wqueue, AlteredTableInfo *tab,
 		  AlterTableCmd *cmd, LOCKMODE lockmode, int cur_pass,
 		  AlterTableUtilityContext *context)
 {
 	ObjectAddress address = InvalidObjectAddress;
+	Relation	rel = tab->rel;
 
 	switch (cmd->subtype)
 	{
@@ -5730,6 +5736,7 @@ ATGetQueueEntry(List **wqueue, Relation rel)
 	 */
 	tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
 	tab->relid = relid;
+	tab->rel = NULL;			/* set later */
 	tab->relkind = rel->rd_rel->relkind;
 	tab->oldDesc = CreateTupleDescCopyConstr(RelationGetDescr(rel));
 	tab->newrelpersistence = RELPERSISTENCE_PERMANENT;
-- 
2.20.1

>From 9529835de0b3c1bde861a83d52e2741e231934a5 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 25 Mar 2021 12:44:08 -0300
Subject: [PATCH v9 2/2] ALTER TABLE ... DETACH CONCURRENTLY

---
 doc/src/sgml/catalogs.sgml                    |  10 +
 doc/src/sgml/ref/alter_table.sgml             |  28 +-
 src/backend/catalog/heap.c                    |  20 +-
 src/backend/catalog/index.c                   |   4 +-
 src/backend/catalog/partition.c               |  38 +-
 src/backend/catalog/pg_inherits.c             | 114 +++-
 src/backend/commands/indexcmds.c              |   6 +-
 src/backend/commands/tablecmds.c              | 569 ++++++++++++++----
 src/backend/commands/trigger.c                |   5 +-
 src/backend/executor/execPartition.c          |  29 +-
 src/backend/nodes/copyfuncs.c                 |   1 +
 src/backend/nodes/equalfuncs.c                |   1 +
 src/backend/optimizer/util/plancat.c          |   8 +-
 src/backend/parser/gram.y                     |  23 +-
 src/backend/partitioning/partbounds.c         |   6 +-
 src/backend/partitioning/partdesc.c           |  21 +-
 src/backend/tcop/utility.c                    |  19 +
 src/backend/utils/adt/ri_triggers.c           |   6 +-
 src/backend/utils/cache/partcache.c           |  12 +-
 src/bin/psql/describe.c                       |  42 +-
 src/include/catalog/partition.h               |   2 +-
 src/include/catalog/pg_inherits.h             |   8 +-
 src/include/nodes/parsenodes.h                |   2 +
 src/include/parser/kwlist.h                   |   1 +
 src/include/partitioning/partdesc.h           |   5 +-
 src/include/utils/snapmgr.h                   |   1 +
 .../detach-partition-concurrently-1.out       | 234 +++++++
 .../detach-partition-concurrently-2.out       |  66 ++
 .../detach-partition-concurrently-3.out       | 282 +++++++++
 .../detach-partition-concurrently-4.out       |  69 +++
 src/test/isolation/isolation_schedule         |   4 +
 .../detach-partition-concurrently-1.spec      |  69 +++
 .../detach-partition-concurrently-2.spec      |  41 ++
 .../detach-partition-concurrently-3.spec      |  66 ++
 .../detach-partition-concurrently-4.spec      |  36 ++
 src/test/modules/delay_execution/Makefile     |   3 +-
 .../expected/partition-removal-1.out          | 175 ++++++
 .../specs/partition-removal-1.spec            |  58 ++
 src/test/regress/expected/alter_table.out     |  29 +
 src/test/regress/sql/alter_table.sql          |  20 +
 40 files changed, 1959 insertions(+), 174 deletions(-)
 create mode 100644 src/test/isolation/expected/detach-partition-concurrently-1.out
 create mode 100644 src/test/isolation/expected/detach-partition-concurrently-2.out
 create mode 100644 src/test/isolation/expected/detach-partition-concurrently-3.out
 create mode 100644 src/test/isolation/expected/detach-partition-concurrently-4.out
 create mode 100644 src/test/isolation/specs/detach-partition-concurrently-1.spec
 create mode 100644 src/test/isolation/specs/detach-partition-concurrently-2.spec
 create mode 100644 src/test/isolation/specs/detach-partition-concurrently-3.spec
 create mode 100644 src/test/isolation/specs/detach-partition-concurrently-4.spec
 create mode 100644 src/test/modules/delay_execution/expected/partition-removal-1.out
 create mode 100644 src/test/modules/delay_execution/specs/partition-removal-1.spec

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index bae4d8cdd3..cd00d9e3bb 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -4497,6 +4497,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        when using declarative partitioning.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>inhdetachpending</structfield> <type>bool</type>
+      </para>
+      <para>
+       <literal>true</literal> for a partition that is in the process of
+       being detached; <literal>false</literal> otherwise.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml
index 0bd0c1a503..6db505af45 100644
--- a/doc/src/sgml/ref/alter_table.sgml
+++ b/doc/src/sgml/ref/alter_table.sgml
@@ -36,7 +36,9 @@ ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable>
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     ATTACH PARTITION <replaceable class="parameter">partition_name</replaceable> { FOR VALUES <replaceable class="parameter">partition_bound_spec</replaceable> | DEFAULT }
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
-    DETACH PARTITION <replaceable class="parameter">partition_name</replaceable>
+    DETACH PARTITION <replaceable class="parameter">partition_name</replaceable> [ CONCURRENTLY ]
+ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
+    DETACH PARTITION <replaceable class="parameter">partition_name</replaceable> FINALIZE
 
 <phrase>where <replaceable class="parameter">action</replaceable> is one of:</phrase>
 
@@ -954,7 +956,8 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
    </varlistentry>
 
    <varlistentry>
-    <term><literal>DETACH PARTITION</literal> <replaceable class="parameter">partition_name</replaceable></term>
+    <term><literal>DETACH PARTITION</literal> <replaceable class="parameter">partition_name</replaceable> [ { CONCURRENTLY | FINALIZE } ]</term>
+
     <listitem>
      <para>
       This form detaches the specified partition of the target table.  The detached
@@ -962,6 +965,27 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       ties to the table from which it was detached.  Any indexes that were
       attached to the target table's indexes are detached.  Any triggers that
       were created as clones of those in the target table are removed.
+      <literal>SHARE</literal> lock is obtained on any tables that reference
+      this partitioned table in foreign key constraints.
+     </para>
+     <para>
+      If <literal>CONCURRENTLY</literal> is specified, this process runs in two
+      transactions in order to avoid blocking other sessions that might be accessing
+      the partitioned table.  During the first transaction, a
+      <literal>SHARE UPDATE EXCLUSIVE</literal> lock is taken on both parent table and
+      partition, and the partition is marked as undergoing detach;
+      at that point, the transaction is committed and all other transactions using
+      the partitioned table are waited for.
+      Once all those transactions have completed, the second transaction acquires
+      <literal>ACCESS EXCLUSIVE</literal> on the partition, and the detach process
+      completes.
+      <literal>CONCURRENTLY</literal> is not allowed if the
+      partitioned table contains a default partition.
+     </para>
+     <para>
+      If <literal>FINALIZE</literal> is specified, a previous
+      <literal>DETACH CONCURRENTLY</literal> invocation that was cancelled or
+      interrupted is completed.
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index d0ec44bb40..9f6303266f 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -1923,7 +1923,12 @@ heap_drop_with_catalog(Oid relid)
 		elog(ERROR, "cache lookup failed for relation %u", relid);
 	if (((Form_pg_class) GETSTRUCT(tuple))->relispartition)
 	{
-		parentOid = get_partition_parent(relid);
+		/*
+		 * We have to lock the parent if the partition is being detached,
+		 * because it's possible that some query still has a partition
+		 * descriptor that includes this partition.
+		 */
+		parentOid = get_partition_parent(relid, true);
 		LockRelationOid(parentOid, AccessExclusiveLock);
 
 		/*
@@ -2559,10 +2564,12 @@ StoreConstraints(Relation rel, List *cooked_constraints, bool is_internal)
  * Returns a list of CookedConstraint nodes that shows the cooked form of
  * the default and constraint expressions added to the relation.
  *
- * NB: caller should have opened rel with AccessExclusiveLock, and should
- * hold that lock till end of transaction.  Also, we assume the caller has
- * done a CommandCounterIncrement if necessary to make the relation's catalog
- * tuples visible.
+ * NB: caller should have opened rel with some self-conflicting lock mode,
+ * and should hold that lock till end of transaction; for normal cases that'll
+ * be AccessExclusiveLock, but if caller knows that the constraint is already
+ * enforced by some other means, it can be ShareUpdateExclusiveLock.  Also, we
+ * assume the caller has done a CommandCounterIncrement if necessary to make
+ * the relation's catalog tuples visible.
  */
 List *
 AddRelationNewConstraints(Relation rel,
@@ -3831,7 +3838,8 @@ StorePartitionBound(Relation rel, Relation parent, PartitionBoundSpec *bound)
 	 * relcache entry for that partition every time a partition is added or
 	 * removed.
 	 */
-	defaultPartOid = get_default_oid_from_partdesc(RelationGetPartitionDesc(parent));
+	defaultPartOid =
+		get_default_oid_from_partdesc(RelationGetPartitionDesc(parent, false));
 	if (OidIsValid(defaultPartOid))
 		CacheInvalidateRelcacheByRelid(defaultPartOid);
 
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 397d70d226..af30840c04 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -1837,7 +1837,7 @@ index_concurrently_swap(Oid newIndexId, Oid oldIndexId, const char *oldName)
 		List	   *ancestors = get_partition_ancestors(oldIndexId);
 		Oid			parentIndexRelid = linitial_oid(ancestors);
 
-		DeleteInheritsTuple(oldIndexId, parentIndexRelid);
+		DeleteInheritsTuple(oldIndexId, parentIndexRelid, false, NULL);
 		StoreSingleInheritance(newIndexId, parentIndexRelid, 1);
 
 		list_free(ancestors);
@@ -2487,7 +2487,7 @@ index_drop(Oid indexId, bool concurrent, bool concurrent_lock_mode)
 	/*
 	 * fix INHERITS relation
 	 */
-	DeleteInheritsTuple(indexId, InvalidOid);
+	DeleteInheritsTuple(indexId, InvalidOid, false, NULL);
 
 	/*
 	 * We are presently too lazy to attempt to compute the new correct value
diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index af7754d6ab..790f4ccb92 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -32,7 +32,8 @@
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
-static Oid	get_partition_parent_worker(Relation inhRel, Oid relid);
+static Oid	get_partition_parent_worker(Relation inhRel, Oid relid,
+										bool *detach_pending);
 static void get_partition_ancestors_worker(Relation inhRel, Oid relid,
 										   List **ancestors);
 
@@ -42,23 +43,32 @@ static void get_partition_ancestors_worker(Relation inhRel, Oid relid,
  *
  * Returns inheritance parent of a partition by scanning pg_inherits
  *
+ * If the partition is in the process of being detached, an error is thrown,
+ * unless even_if_detached is passed as true.
+ *
  * Note: Because this function assumes that the relation whose OID is passed
  * as an argument will have precisely one parent, it should only be called
  * when it is known that the relation is a partition.
  */
 Oid
-get_partition_parent(Oid relid)
+get_partition_parent(Oid relid, bool even_if_detached)
 {
 	Relation	catalogRelation;
 	Oid			result;
+	bool		detach_pending;
 
 	catalogRelation = table_open(InheritsRelationId, AccessShareLock);
 
-	result = get_partition_parent_worker(catalogRelation, relid);
+	result = get_partition_parent_worker(catalogRelation, relid,
+										 &detach_pending);
 
 	if (!OidIsValid(result))
 		elog(ERROR, "could not find tuple for parent of relation %u", relid);
 
+	if (detach_pending && !even_if_detached)
+		elog(ERROR, "relation %u has no parent because it's being detached",
+			 relid);
+
 	table_close(catalogRelation, AccessShareLock);
 
 	return result;
@@ -68,15 +78,20 @@ get_partition_parent(Oid relid)
  * get_partition_parent_worker
  *		Scan the pg_inherits relation to return the OID of the parent of the
  *		given relation
+ *
+ * If the partition is being detached, *detach_pending is set true (but the
+ * original parent is still returned.)
  */
 static Oid
-get_partition_parent_worker(Relation inhRel, Oid relid)
+get_partition_parent_worker(Relation inhRel, Oid relid, bool *detach_pending)
 {
 	SysScanDesc scan;
 	ScanKeyData key[2];
 	Oid			result = InvalidOid;
 	HeapTuple	tuple;
 
+	*detach_pending = false;
+
 	ScanKeyInit(&key[0],
 				Anum_pg_inherits_inhrelid,
 				BTEqualStrategyNumber, F_OIDEQ,
@@ -93,6 +108,9 @@ get_partition_parent_worker(Relation inhRel, Oid relid)
 	{
 		Form_pg_inherits form = (Form_pg_inherits) GETSTRUCT(tuple);
 
+		/* Let caller know of partition being detached */
+		if (form->inhdetachpending)
+			*detach_pending = true;
 		result = form->inhparent;
 	}
 
@@ -134,10 +152,14 @@ static void
 get_partition_ancestors_worker(Relation inhRel, Oid relid, List **ancestors)
 {
 	Oid			parentOid;
+	bool		detach_pending;
 
-	/* Recursion ends at the topmost level, ie., when there's no parent */
-	parentOid = get_partition_parent_worker(inhRel, relid);
-	if (parentOid == InvalidOid)
+	/*
+	 * Recursion ends at the topmost level, ie., when there's no parent; also
+	 * when the partition is being detached.
+	 */
+	parentOid = get_partition_parent_worker(inhRel, relid, &detach_pending);
+	if (parentOid == InvalidOid || detach_pending)
 		return;
 
 	*ancestors = lappend_oid(*ancestors, parentOid);
@@ -170,7 +192,7 @@ index_get_partition(Relation partition, Oid indexId)
 		ReleaseSysCache(tup);
 		if (!ispartition)
 			continue;
-		if (get_partition_parent(partIdx) == indexId)
+		if (get_partition_parent(partIdx, false) == indexId)
 		{
 			list_free(idxlist);
 			return partIdx;
diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c
index 5ab7902827..bedee069be 100644
--- a/src/backend/catalog/pg_inherits.c
+++ b/src/backend/catalog/pg_inherits.c
@@ -29,6 +29,7 @@
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
 /*
@@ -50,9 +51,14 @@ typedef struct SeenRelsEntry
  * given rel; caller should already have locked it).  If lockmode is NoLock
  * then no locks are acquired, but caller must beware of race conditions
  * against possible DROPs of child relations.
+ *
+ * include_detached says to include all partitions, even if they're marked
+ * detached.  Passing it as false means they might or might not be included,
+ * depending on the visibility of the pg_inherits row for the active snapshot.
  */
 List *
-find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
+find_inheritance_children(Oid parentrelId, bool include_detached,
+						  LOCKMODE lockmode)
 {
 	List	   *list = NIL;
 	Relation	relation;
@@ -91,6 +97,30 @@ find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
 
 	while ((inheritsTuple = systable_getnext(scan)) != NULL)
 	{
+		/*
+		 * Cope with partitions concurrently being detached.  When we see a
+		 * partition marked "detach pending", we only include it in the set of
+		 * visible partitions if caller requested all detached partitions, or
+		 * if its pg_inherits tuple's xmin is still in progress for the active
+		 * snapshot.
+		 *
+		 * The reason for this check is that we want to avoid seeing the
+		 * partition as alive in RI queries during REPEATABLE READ or
+		 * SERIALIZABLE transactions.
+		 */
+		if (((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhdetachpending &&
+			!include_detached)
+		{
+			TransactionId xmin;
+			Snapshot snap;
+
+			xmin = HeapTupleHeaderGetXmin(inheritsTuple->t_data);
+			snap = GetActiveSnapshot();
+
+			if (!XidInMVCCSnapshot(xmin, snap))
+				continue;
+		}
+
 		inhrelid = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhrelid;
 		if (numoids >= maxoids)
 		{
@@ -160,6 +190,9 @@ find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
  * given rel; caller should already have locked it).  If lockmode is NoLock
  * then no locks are acquired, but caller must beware of race conditions
  * against possible DROPs of child relations.
+ *
+ * NB - No current callers of this routine are interested in children being
+ * concurrently detached, so there's no provision to include them.
  */
 List *
 find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
@@ -199,7 +232,8 @@ find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
 		ListCell   *lc;
 
 		/* Get the direct children of this rel */
-		currentchildren = find_inheritance_children(currentrel, lockmode);
+		currentchildren = find_inheritance_children(currentrel, false,
+													lockmode);
 
 		/*
 		 * Add to the queue only those children not already seen. This avoids
@@ -430,6 +464,7 @@ StoreSingleInheritance(Oid relationId, Oid parentOid, int32 seqNumber)
 	values[Anum_pg_inherits_inhrelid - 1] = ObjectIdGetDatum(relationId);
 	values[Anum_pg_inherits_inhparent - 1] = ObjectIdGetDatum(parentOid);
 	values[Anum_pg_inherits_inhseqno - 1] = Int32GetDatum(seqNumber);
+	values[Anum_pg_inherits_inhdetachpending - 1] = BoolGetDatum(false);
 
 	memset(nulls, 0, sizeof(nulls));
 
@@ -449,10 +484,17 @@ StoreSingleInheritance(Oid relationId, Oid parentOid, int32 seqNumber)
  * as InvalidOid, in which case all tuples matching inhrelid are deleted;
  * otherwise only delete tuples with the specified inhparent.
  *
+ * expect_detach_pending is the expected state of the inhdetachpending flag.
+ * If the catalog row does not match that state, an error is raised.
+ *
+ * childname is the partition name, if a table; pass NULL for regular
+ * inheritance or when working with other relation kinds.
+ *
  * Returns whether at least one row was deleted.
  */
 bool
-DeleteInheritsTuple(Oid inhrelid, Oid inhparent)
+DeleteInheritsTuple(Oid inhrelid, Oid inhparent, bool expect_detach_pending,
+					const char *childname)
 {
 	bool		found = false;
 	Relation	catalogRelation;
@@ -479,6 +521,29 @@ DeleteInheritsTuple(Oid inhrelid, Oid inhparent)
 		parent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
 		if (!OidIsValid(inhparent) || parent == inhparent)
 		{
+			bool	detach_pending;
+
+			detach_pending =
+				((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhdetachpending;
+
+			/*
+			 * Raise error depending on state.  This should only happen for
+			 * partitions, but we have no way to cross-check.
+			 */
+			if (detach_pending && !expect_detach_pending)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("cannot detach partition \"%s\"",
+								childname ? childname : "unknown relation"),
+						 errdetail("The partition is being detached concurrently or has an unfinished detach."),
+						 errhint("Use ALTER TABLE ... DETACH PARTITION ... FINALIZE to complete the pending detach operation")));
+			if (!detach_pending && expect_detach_pending)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("cannot complete detaching partition \"%s\"",
+								childname ? childname : "unknown relation"),
+						 errdetail("There's no pending concurrent detach.")));
+
 			CatalogTupleDelete(catalogRelation, &inheritsTuple->t_self);
 			found = true;
 		}
@@ -490,3 +555,46 @@ DeleteInheritsTuple(Oid inhrelid, Oid inhparent)
 
 	return found;
 }
+
+/*
+ * Return whether the pg_inherits tuple for a partition has the "detach
+ * pending" flag set.
+ */
+bool
+PartitionHasPendingDetach(Oid partoid)
+{
+	Relation	catalogRelation;
+	ScanKeyData key;
+	SysScanDesc scan;
+	HeapTuple	inheritsTuple;
+
+	/* We don't have a good way to verify it is in fact a partition */
+
+	/*
+	 * Find the pg_inherits entry by inhrelid.  (There should only be one.)
+	 */
+	catalogRelation = table_open(InheritsRelationId, RowExclusiveLock);
+	ScanKeyInit(&key,
+				Anum_pg_inherits_inhrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(partoid));
+	scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
+							  true, NULL, 1, &key);
+
+	while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
+	{
+		bool	detached;
+
+		detached =
+			((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhdetachpending;
+
+		/* Done */
+		systable_endscan(scan);
+		table_close(catalogRelation, RowExclusiveLock);
+
+		return detached;
+	}
+
+	elog(ERROR, "relation %u is not a partition", partoid);
+	return false;				/* keep compiler quiet */
+}
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index e4e1bbb7e0..166374cc0c 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -410,7 +410,7 @@ CompareOpclassOptions(Datum *opts1, Datum *opts2, int natts)
  * GetCurrentVirtualXIDs.  If, during any iteration, a particular vxid
  * doesn't show up in the output, we know we can forget about it.
  */
-static void
+void
 WaitForOlderSnapshots(TransactionId limitXmin, bool progress)
 {
 	int			n_old_snapshots;
@@ -1123,7 +1123,7 @@ DefineIndex(Oid relationId,
 	 */
 	if (partitioned && stmt->relation && !stmt->relation->inh)
 	{
-		PartitionDesc pd = RelationGetPartitionDesc(rel);
+		PartitionDesc pd = RelationGetPartitionDesc(rel, false);
 
 		if (pd->nparts != 0)
 			flags |= INDEX_CREATE_INVALID;
@@ -1180,7 +1180,7 @@ DefineIndex(Oid relationId,
 		 *
 		 * If we're called internally (no stmt->relation), recurse always.
 		 */
-		partdesc = RelationGetPartitionDesc(rel);
+		partdesc = RelationGetPartitionDesc(rel, false);
 		if ((!stmt->relation || stmt->relation->inh) && partdesc->nparts > 0)
 		{
 			int			nparts = partdesc->nparts;
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index bf7fd6e8ae..0966636fb2 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -544,7 +544,8 @@ static PartitionSpec *transformPartitionSpec(Relation rel, PartitionSpec *partsp
 static void ComputePartitionAttrs(ParseState *pstate, Relation rel, List *partParams, AttrNumber *partattrs,
 								  List **partexprs, Oid *partopclass, Oid *partcollation, char strategy);
 static void CreateInheritance(Relation child_rel, Relation parent_rel);
-static void RemoveInheritance(Relation child_rel, Relation parent_rel);
+static void RemoveInheritance(Relation child_rel, Relation parent_rel,
+							  bool allow_detached);
 static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel,
 										   PartitionCmd *cmd,
 										   AlterTableUtilityContext *context);
@@ -553,8 +554,14 @@ static void QueuePartitionConstraintValidation(List **wqueue, Relation scanrel,
 											   List *partConstraint,
 											   bool validate_default);
 static void CloneRowTriggersToPartition(Relation parent, Relation partition);
+static void DetachAddConstraintIfNeeded(List **wqueue, Relation partRel);
 static void DropClonedTriggersFromPartition(Oid partitionId);
-static ObjectAddress ATExecDetachPartition(Relation rel, RangeVar *name);
+static ObjectAddress ATExecDetachPartition(List **wqueue, AlteredTableInfo *tab,
+										   Relation rel, RangeVar *name,
+										   bool concurrent);
+static void DetachPartitionFinalize(Relation rel, Relation partRel,
+									bool concurrent, Oid defaultPartOid, bool wait);
+static ObjectAddress ATExecDetachPartitionFinalize(Relation rel, RangeVar *name);
 static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation rel,
 											  RangeVar *name);
 static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl);
@@ -1002,7 +1009,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 		 * lock the partition so as to avoid a deadlock.
 		 */
 		defaultPartOid =
-			get_default_oid_from_partdesc(RelationGetPartitionDesc(parent));
+			get_default_oid_from_partdesc(RelationGetPartitionDesc(parent,
+																   false));
 		if (OidIsValid(defaultPartOid))
 			defaultRel = table_open(defaultPartOid, AccessExclusiveLock);
 
@@ -1555,7 +1563,7 @@ RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid,
 	 */
 	if (is_partition && relOid != oldRelOid)
 	{
-		state->partParentOid = get_partition_parent(relOid);
+		state->partParentOid = get_partition_parent(relOid, true);
 		if (OidIsValid(state->partParentOid))
 			LockRelationOid(state->partParentOid, AccessExclusiveLock);
 	}
@@ -3315,7 +3323,7 @@ renameatt_internal(Oid myrelid,
 		 * expected_parents will only be 0 if we are not already recursing.
 		 */
 		if (expected_parents == 0 &&
-			find_inheritance_children(myrelid, NoLock) != NIL)
+			find_inheritance_children(myrelid, false, NoLock) != NIL)
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 					 errmsg("inherited column \"%s\" must be renamed in child tables too",
@@ -3514,7 +3522,7 @@ rename_constraint_internal(Oid myrelid,
 		else
 		{
 			if (expected_parents == 0 &&
-				find_inheritance_children(myrelid, NoLock) != NIL)
+				find_inheritance_children(myrelid, false, NoLock) != NIL)
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 						 errmsg("inherited constraint \"%s\" must be renamed in child tables too",
@@ -4134,7 +4142,14 @@ AlterTableGetLockLevel(List *cmds)
 				break;
 
 			case AT_DetachPartition:
-				cmd_lockmode = AccessExclusiveLock;
+				if (((PartitionCmd *) cmd->def)->concurrent)
+					cmd_lockmode = ShareUpdateExclusiveLock;
+				else
+					cmd_lockmode = AccessExclusiveLock;
+				break;
+
+			case AT_DetachPartitionFinalize:
+				cmd_lockmode = ShareUpdateExclusiveLock;
 				break;
 
 			case AT_CheckNotNull:
@@ -4218,6 +4233,19 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 	/* Find or create work queue entry for this table */
 	tab = ATGetQueueEntry(wqueue, rel);
 
+	/*
+	 * Disallow any ALTER TABLE other than ALTER TABLE DETACH FINALIZE on
+	 * partitions that are pending detach.
+	 */
+	if (rel->rd_rel->relispartition &&
+		cmd->subtype != AT_DetachPartitionFinalize &&
+		PartitionHasPendingDetach(RelationGetRelid(rel)))
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("cannot alter partition \"%s\" with an incomplete detach",
+					   RelationGetRelationName(rel)),
+				errhint("Use ALTER TABLE ... DETACH PARTITION ... FINALIZE to complete the pending detach operation."));
+
 	/*
 	 * Copy the original subcommand for each table.  This avoids conflicts
 	 * when different child tables need to make different parse
@@ -4531,6 +4559,11 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 			/* No command-specific prep needed */
 			pass = AT_PASS_MISC;
 			break;
+		case AT_DetachPartitionFinalize:
+			ATSimplePermissions(rel, ATT_TABLE);
+			/* No command-specific prep needed */
+			pass = AT_PASS_MISC;
+			break;
 		default:				/* oops */
 			elog(ERROR, "unrecognized alter table type: %d",
 				 (int) cmd->subtype);
@@ -4920,7 +4953,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
 			Assert(cmd != NULL);
 			/* ATPrepCmd ensures it must be a table */
 			Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
-			ATExecDetachPartition(rel, ((PartitionCmd *) cmd->def)->name);
+			ATExecDetachPartition(wqueue, tab, rel,
+								  ((PartitionCmd *) cmd->def)->name,
+								  ((PartitionCmd *) cmd->def)->concurrent);
+			break;
+		case AT_DetachPartitionFinalize:
+			ATExecDetachPartitionFinalize(rel, ((PartitionCmd *) cmd->def)->name);
 			break;
 		case AT_AlterCollationRefreshVersion:
 			/* ATPrepCmd ensured it must be an index */
@@ -6352,7 +6390,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	 */
 	if (colDef->identity &&
 		recurse &&
-		find_inheritance_children(myrelid, NoLock) != NIL)
+		find_inheritance_children(myrelid, false, NoLock) != NIL)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 				 errmsg("cannot recursively add identity column to table that has child tables")));
@@ -6597,7 +6635,8 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	 * routines, we have to do this one level of recursion at a time; we can't
 	 * use find_all_inheritors to do it in one pass.
 	 */
-	children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+	children =
+		find_inheritance_children(RelationGetRelid(rel), false, lockmode);
 
 	/*
 	 * If we are told not to recurse, there had better not be any child
@@ -6751,7 +6790,7 @@ ATPrepDropNotNull(Relation rel, bool recurse, bool recursing)
 	 */
 	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionDesc partdesc = RelationGetPartitionDesc(rel);
+		PartitionDesc partdesc = RelationGetPartitionDesc(rel, false);
 
 		Assert(partdesc != NULL);
 		if (partdesc->nparts > 0 && !recurse && !recursing)
@@ -6850,7 +6889,7 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
 	/* If rel is partition, shouldn't drop NOT NULL if parent has the same */
 	if (rel->rd_rel->relispartition)
 	{
-		Oid			parentId = get_partition_parent(RelationGetRelid(rel));
+		Oid			parentId = get_partition_parent(RelationGetRelid(rel), false);
 		Relation	parent = table_open(parentId, AccessShareLock);
 		TupleDesc	tupDesc = RelationGetDescr(parent);
 		AttrNumber	parent_attnum;
@@ -7460,7 +7499,7 @@ ATPrepDropExpression(Relation rel, AlterTableCmd *cmd, bool recurse, bool recurs
 	 * resulting state can be properly dumped and restored.
 	 */
 	if (!recurse &&
-		find_inheritance_children(RelationGetRelid(rel), lockmode))
+		find_inheritance_children(RelationGetRelid(rel), false, lockmode))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("ALTER TABLE / DROP EXPRESSION must be applied to child tables too")));
@@ -8067,7 +8106,8 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 	 * routines, we have to do this one level of recursion at a time; we can't
 	 * use find_all_inheritors to do it in one pass.
 	 */
-	children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+	children =
+		find_inheritance_children(RelationGetRelid(rel), false, lockmode);
 
 	if (children)
 	{
@@ -8531,7 +8571,8 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	 * routines, we have to do this one level of recursion at a time; we can't
 	 * use find_all_inheritors to do it in one pass.
 	 */
-	children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+	children =
+		find_inheritance_children(RelationGetRelid(rel), false, lockmode);
 
 	/*
 	 * Check if ONLY was specified with ALTER TABLE.  If so, allow the
@@ -9146,7 +9187,7 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
 	 */
 	if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionDesc pd = RelationGetPartitionDesc(pkrel);
+		PartitionDesc pd = RelationGetPartitionDesc(pkrel, false);
 
 		for (int i = 0; i < pd->nparts; i++)
 		{
@@ -9280,7 +9321,7 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
 	}
 	else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionDesc pd = RelationGetPartitionDesc(rel);
+		PartitionDesc pd = RelationGetPartitionDesc(rel, false);
 
 		/*
 		 * Recurse to take appropriate action on each partition; either we
@@ -11064,7 +11105,8 @@ ATExecDropConstraint(Relation rel, const char *constrName,
 	 * use find_all_inheritors to do it in one pass.
 	 */
 	if (!is_no_inherit_constraint)
-		children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+		children =
+			find_inheritance_children(RelationGetRelid(rel), false, lockmode);
 	else
 		children = NIL;
 
@@ -11448,7 +11490,8 @@ ATPrepAlterColumnType(List **wqueue,
 		}
 	}
 	else if (!recursing &&
-			 find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL)
+			 find_inheritance_children(RelationGetRelid(rel), false,
+									   NoLock) != NIL)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 				 errmsg("type of inherited column \"%s\" must be changed in child tables too",
@@ -14286,7 +14329,7 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
 	 */
 
 	/* Off to RemoveInheritance() where most of the work happens */
-	RemoveInheritance(rel, parent_rel);
+	RemoveInheritance(rel, parent_rel, false);
 
 	ObjectAddressSet(address, RelationRelationId,
 					 RelationGetRelid(parent_rel));
@@ -14297,12 +14340,72 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
 	return address;
 }
 
+/*
+ * MarkInheritDetached
+ *
+ * Set inhdetachpending for a partition, for ATExecDetachPartition
+ * in concurrent mode.
+ */
+static void
+MarkInheritDetached(Relation child_rel, Relation parent_rel)
+{
+	Relation	catalogRelation;
+	SysScanDesc	scan;
+	ScanKeyData key;
+	HeapTuple	inheritsTuple;
+	bool		found = false;
+
+	Assert(child_rel->rd_rel->relkind == RELKIND_RELATION ||
+		   child_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+	Assert(parent_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+
+	/*
+	 * Find pg_inherits entries by inhrelid.
+	 */
+	catalogRelation = table_open(InheritsRelationId, RowExclusiveLock);
+	ScanKeyInit(&key,
+				Anum_pg_inherits_inhrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(RelationGetRelid(child_rel)));
+	scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
+							  true, NULL, 1, &key);
+
+	while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
+	{
+		HeapTuple	newtup;
+
+		if (((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent !=
+			RelationGetRelid(parent_rel))
+			elog(ERROR, "bad parent tuple found for partition %u",
+				 RelationGetRelid(child_rel));
+
+		newtup = heap_copytuple(inheritsTuple);
+		((Form_pg_inherits) GETSTRUCT(newtup))->inhdetachpending = true;
+
+		CatalogTupleUpdate(catalogRelation,
+						   &inheritsTuple->t_self,
+						   newtup);
+		found = true;
+	}
+
+	/* Done */
+	systable_endscan(scan);
+	table_close(catalogRelation, RowExclusiveLock);
+
+	if (!found)
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_TABLE),
+				 errmsg("relation \"%s\" is not a partition of relation \"%s\"",
+						RelationGetRelationName(child_rel),
+						RelationGetRelationName(parent_rel))));
+}
+
 /*
  * RemoveInheritance
  *
  * Drop a parent from the child's parents. This just adjusts the attinhcount
  * and attislocal of the columns and removes the pg_inherit and pg_depend
- * entries.
+ * entries.  expect_detached is passed down to DeleteInheritsTuple, q.v.
  *
  * If attinhcount goes to 0 then attislocal gets set to true. If it goes back
  * up attislocal stays true, which means if a child is ever removed from a
@@ -14316,7 +14419,7 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
  * Common to ATExecDropInherit() and ATExecDetachPartition().
  */
 static void
-RemoveInheritance(Relation child_rel, Relation parent_rel)
+RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
 {
 	Relation	catalogRelation;
 	SysScanDesc scan;
@@ -14332,7 +14435,9 @@ RemoveInheritance(Relation child_rel, Relation parent_rel)
 		child_is_partition = true;
 
 	found = DeleteInheritsTuple(RelationGetRelid(child_rel),
-								RelationGetRelid(parent_rel));
+								RelationGetRelid(parent_rel),
+								expect_detached,
+								RelationGetRelationName(child_rel));
 	if (!found)
 	{
 		if (child_is_partition)
@@ -16498,7 +16603,7 @@ QueuePartitionConstraintValidation(List **wqueue, Relation scanrel,
 	}
 	else if (scanrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionDesc partdesc = RelationGetPartitionDesc(scanrel);
+		PartitionDesc partdesc = RelationGetPartitionDesc(scanrel, false);
 		int			i;
 
 		for (i = 0; i < partdesc->nparts; i++)
@@ -16558,7 +16663,7 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd,
 	 * new partition will change its partition constraint.
 	 */
 	defaultPartOid =
-		get_default_oid_from_partdesc(RelationGetPartitionDesc(rel));
+		get_default_oid_from_partdesc(RelationGetPartitionDesc(rel, false));
 	if (OidIsValid(defaultPartOid))
 		LockRelationOid(defaultPartOid, AccessExclusiveLock);
 
@@ -17147,105 +17252,229 @@ CloneRowTriggersToPartition(Relation parent, Relation partition)
  * ALTER TABLE DETACH PARTITION
  *
  * Return the address of the relation that is no longer a partition of rel.
+ *
+ * If concurrent mode is requested, we run in two transactions.  A side-
+ * effect is that this command cannot run in a multi-part ALTER TABLE.
+ * Currently, that's enforced by the grammar.
+ *
+ * The strategy for concurrency is to first modify the partition's
+ * pg_inherits catalog row to make it visible to everyone that the
+ * partition is detached, lock the partition against writes, and commit
+ * the transaction; anyone who requests the partition descriptor from
+ * that point onwards has to ignore such a partition.  In a second
+ * transaction, we wait until all transactions that could have seen the
+ * partition as attached are gone, then we remove the rest of partition
+ * metadata (pg_inherits and pg_class.relpartbound).
  */
 static ObjectAddress
-ATExecDetachPartition(Relation rel, RangeVar *name)
+ATExecDetachPartition(List **wqueue, AlteredTableInfo *tab, Relation rel,
+					  RangeVar *name, bool concurrent)
 {
-	Relation	partRel,
-				classRel;
-	HeapTuple	tuple,
-				newtuple;
-	Datum		new_val[Natts_pg_class];
-	bool		new_null[Natts_pg_class],
-				new_repl[Natts_pg_class];
+	Relation	partRel;
 	ObjectAddress address;
 	Oid			defaultPartOid;
-	List	   *indexes;
-	List	   *fks;
-	ListCell   *cell;
 
 	/*
 	 * We must lock the default partition, because detaching this partition
 	 * will change its partition constraint.
 	 */
 	defaultPartOid =
-		get_default_oid_from_partdesc(RelationGetPartitionDesc(rel));
-	if (OidIsValid(defaultPartOid))
-		LockRelationOid(defaultPartOid, AccessExclusiveLock);
-
-	partRel = table_openrv(name, ShareUpdateExclusiveLock);
-
-	/* Ensure that foreign keys still hold after this detach */
-	ATDetachCheckNoForeignKeyRefs(partRel);
-
-	/* All inheritance related checks are performed within the function */
-	RemoveInheritance(partRel, rel);
-
-	/* Update pg_class tuple */
-	classRel = table_open(RelationRelationId, RowExclusiveLock);
-	tuple = SearchSysCacheCopy1(RELOID,
-								ObjectIdGetDatum(RelationGetRelid(partRel)));
-	if (!HeapTupleIsValid(tuple))
-		elog(ERROR, "cache lookup failed for relation %u",
-			 RelationGetRelid(partRel));
-	Assert(((Form_pg_class) GETSTRUCT(tuple))->relispartition);
-
-	/* Clear relpartbound and reset relispartition */
-	memset(new_val, 0, sizeof(new_val));
-	memset(new_null, false, sizeof(new_null));
-	memset(new_repl, false, sizeof(new_repl));
-	new_val[Anum_pg_class_relpartbound - 1] = (Datum) 0;
-	new_null[Anum_pg_class_relpartbound - 1] = true;
-	new_repl[Anum_pg_class_relpartbound - 1] = true;
-	newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
-								 new_val, new_null, new_repl);
-
-	((Form_pg_class) GETSTRUCT(newtuple))->relispartition = false;
-	CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
-	heap_freetuple(newtuple);
-
+		get_default_oid_from_partdesc(RelationGetPartitionDesc(rel, false));
 	if (OidIsValid(defaultPartOid))
 	{
 		/*
-		 * If the relation being detached is the default partition itself,
-		 * remove it from the parent's pg_partitioned_table entry.
+		 * Concurrent detaching when a default partition exists is not
+		 * supported. The main problem is that the default partition
+		 * constraint would change.  And there's a definitional problem: what
+		 * should happen to the tuples that are being inserted that belong to
+		 * the partition being detached?  Putting them on the partition being
+		 * detached would be wrong, since they'd become "lost" after the
+		 * detach completes; but we cannot put them in the default partition
+		 * either until we alter its partition constraint.
 		 *
-		 * If not, we must invalidate default partition's relcache entry, as
-		 * in StorePartitionBound: its partition constraint depends on every
-		 * other partition's partition constraint.
+		 * I think we could solve this problem if we effected the constraint
+		 * change before committing the first transaction.  But the lock would
+		 * have to remain AEL and it would cause concurrent query planning to
+		 * be blocked, so changing it that way would be even worse.
 		 */
-		if (RelationGetRelid(partRel) == defaultPartOid)
-			update_default_partition_oid(RelationGetRelid(rel), InvalidOid);
-		else
-			CacheInvalidateRelcacheByRelid(defaultPartOid);
+		if (concurrent)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("cannot detach partitions concurrently when a default partition exists")));
+		LockRelationOid(defaultPartOid, AccessExclusiveLock);
 	}
 
-	/* detach indexes too */
-	indexes = RelationGetIndexList(partRel);
-	foreach(cell, indexes)
+	/*
+	 * In concurrent mode, the partition is locked with share-update-exclusive
+	 * in the first transaction.  This allows concurrent transactions to be
+	 * doing DML to the partition.
+	 */
+	partRel = table_openrv(name, concurrent ? ShareUpdateExclusiveLock :
+						   AccessExclusiveLock);
+
+	/*
+	 * Check inheritance conditions and either delete the pg_inherits row
+	 * (in non-concurrent mode) or just set the inhdetachpending flag.
+	 */
+	if (!concurrent)
+		RemoveInheritance(partRel, rel, false);
+	else
+		MarkInheritDetached(partRel, rel);
+
+	/*
+	 * Ensure that foreign keys still hold after this detach.  This keeps
+	 * locks on the referencing tables, which prevents concurrent transactions
+	 * from adding rows that we wouldn't see.  For this to work in concurrent
+	 * mode, it is critical that the partition appears as no longer attached
+	 * for the RI queries as soon as the first transaction commits.
+	 */
+	ATDetachCheckNoForeignKeyRefs(partRel);
+
+	/*
+	 * Concurrent mode has to work harder; first we add a new constraint to
+	 * the partition that matches the partition constraint.  Then we close our
+	 * existing transaction, and in a new one wait for all processes to catch
+	 * up on the catalog updates we've done so far; at that point we can
+	 * complete the operation.
+	 */
+	if (concurrent)
 	{
-		Oid			idxid = lfirst_oid(cell);
-		Relation	idx;
-		Oid			constrOid;
+		Oid		partrelid,
+				parentrelid;
+		LOCKTAG		tag;
+		char   *parentrelname;
+		char   *partrelname;
 
-		if (!has_superclass(idxid))
-			continue;
+		/*
+		 * Add a new constraint to the partition being detached, which
+		 * supplants the partition constraint (unless there is one already).
+		 */
+		DetachAddConstraintIfNeeded(wqueue, partRel);
 
-		Assert((IndexGetRelation(get_partition_parent(idxid), false) ==
-				RelationGetRelid(rel)));
+		/*
+		 * We're almost done now; the only traces that remain are the
+		 * pg_inherits tuple and the partition's relpartbound.  Before we can
+		 * remove those, we need to wait until all transactions that know that
+		 * this is a partition are gone.
+		 */
 
-		idx = index_open(idxid, AccessExclusiveLock);
-		IndexSetParentIndex(idx, InvalidOid);
+		/*
+		 * Remember relation OIDs to re-acquire them later; and relation names
+		 * too, for error messages if something is dropped in between.
+		 */
+		partrelid = RelationGetRelid(partRel);
+		parentrelid = RelationGetRelid(rel);
+		parentrelname = MemoryContextStrdup(PortalContext,
+											RelationGetRelationName(rel));
+		partrelname = MemoryContextStrdup(PortalContext,
+										  RelationGetRelationName(partRel));
 
-		/* If there's a constraint associated with the index, detach it too */
-		constrOid = get_relation_idx_constraint_oid(RelationGetRelid(partRel),
-													idxid);
-		if (OidIsValid(constrOid))
-			ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid);
+		/* Invalidate relcache entries for the parent -- must be before close */
+		CacheInvalidateRelcache(rel);
+
+		table_close(partRel, NoLock);
+		table_close(rel, NoLock);
+		tab->rel = NULL;
+
+		/* Make updated catalog entry visible */
+		PopActiveSnapshot();
+		CommitTransactionCommand();
+
+		StartTransactionCommand();
+
+		/*
+		 * Now wait.  This ensures that all queries that were planned including
+		 * the partition are finished before we remove the rest of catalog
+		 * entries.  We don't need or indeed want to acquire this lock, though
+		 * -- that would block later queries.
+		 *
+		 * We don't need to concern ourselves with waiting for a lock on the
+		 * partition itself, since we will acquire AccessExclusiveLock below.
+		 */
+		SET_LOCKTAG_RELATION(tag, MyDatabaseId, parentrelid);
+		WaitForLockersMultiple(list_make1(&tag), AccessExclusiveLock, false);
+
+		/*
+		 * Now acquire locks on both relations again.  Note they may have been
+		 * removed in the meantime, so care is required.
+		 */
+		rel = try_relation_open(parentrelid, ShareUpdateExclusiveLock);
+		partRel = try_relation_open(partrelid, AccessExclusiveLock);
+
+		/* If the relations aren't there, something bad happened; bail out */
+		if (rel == NULL)
+		{
+			if (partRel != NULL)	/* shouldn't happen */
+				elog(WARNING, "dangling partition \"%s\" remains, can't fix",
+					 partrelname);
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("partitioned table \"%s\" was removed concurrently",
+							parentrelname)));
+		}
+		if (partRel == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("partition \"%s\" was removed concurrently", partrelname)));
+
+		tab->rel = rel;
 
-		index_close(idx, NoLock);
 	}
-	table_close(classRel, RowExclusiveLock);
+
+	/* Do the final part of detaching */
+	DetachPartitionFinalize(rel, partRel, concurrent, defaultPartOid, false);
+
+	ObjectAddressSet(address, RelationRelationId, RelationGetRelid(partRel));
+
+	/* keep our lock until commit */
+	table_close(partRel, NoLock);
+
+	return address;
+}
+
+/*
+ * Second part of ALTER TABLE .. DETACH.
+ *
+ * This is separate so that it can be run independently when the second
+ * transaction of the concurrent algorithm fails (crash or abort).
+ */
+static void
+DetachPartitionFinalize(Relation rel, Relation partRel, bool concurrent,
+						Oid defaultPartOid, bool wait)
+{
+	Relation	classRel;
+	List	   *fks;
+	ListCell   *cell;
+	List	   *indexes;
+	Datum		new_val[Natts_pg_class];
+	bool		new_null[Natts_pg_class],
+				new_repl[Natts_pg_class];
+	HeapTuple	tuple,
+				newtuple;
+
+	/*
+	 * If asked to, wait until existing snapshots are gone.  This is important
+	 * if the second transaction of DETACH PARTITION CONCURRENTLY is canceled:
+	 * the user could immediately run DETACH FINALIZE without actually waiting
+	 * for existing transactions.  We must not complete the detach action until
+	 * all such queries are complete (otherwise we would present them with an
+	 * inconsistent view of catalogs).
+	 */
+	if (wait)
+	{
+		Snapshot	snap = GetActiveSnapshot();
+
+		WaitForOlderSnapshots(snap->xmin, false);
+	}
+
+	if (concurrent)
+	{
+		/*
+		 * We can remove the pg_inherits row now. (In the non-concurrent case,
+		 * this was already done).
+		 */
+		RemoveInheritance(partRel, rel, true);
+	}
 
 	/* Drop any triggers that were cloned on creation/attach. */
 	DropClonedTriggersFromPartition(RelationGetRelid(partRel));
@@ -17318,22 +17547,150 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
 		ObjectAddressSet(constraint, ConstraintRelationId, constrOid);
 		performDeletion(&constraint, DROP_RESTRICT, 0);
 	}
-	CommandCounterIncrement();
+
+	/* Now we can detach indexes */
+	indexes = RelationGetIndexList(partRel);
+	foreach(cell, indexes)
+	{
+		Oid			idxid = lfirst_oid(cell);
+		Relation	idx;
+		Oid			constrOid;
+
+		if (!has_superclass(idxid))
+			continue;
+
+		Assert((IndexGetRelation(get_partition_parent(idxid, false), false) ==
+				RelationGetRelid(rel)));
+
+		idx = index_open(idxid, AccessExclusiveLock);
+		IndexSetParentIndex(idx, InvalidOid);
+
+		/* If there's a constraint associated with the index, detach it too */
+		constrOid = get_relation_idx_constraint_oid(RelationGetRelid(partRel),
+													idxid);
+		if (OidIsValid(constrOid))
+			ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid);
+
+		index_close(idx, NoLock);
+	}
+
+	/* Update pg_class tuple */
+	classRel = table_open(RelationRelationId, RowExclusiveLock);
+	tuple = SearchSysCacheCopy1(RELOID,
+								ObjectIdGetDatum(RelationGetRelid(partRel)));
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "cache lookup failed for relation %u",
+			 RelationGetRelid(partRel));
+	Assert(((Form_pg_class) GETSTRUCT(tuple))->relispartition);
+
+	/* Clear relpartbound and reset relispartition */
+	memset(new_val, 0, sizeof(new_val));
+	memset(new_null, false, sizeof(new_null));
+	memset(new_repl, false, sizeof(new_repl));
+	new_val[Anum_pg_class_relpartbound - 1] = (Datum) 0;
+	new_null[Anum_pg_class_relpartbound - 1] = true;
+	new_repl[Anum_pg_class_relpartbound - 1] = true;
+	newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+								 new_val, new_null, new_repl);
+
+	((Form_pg_class) GETSTRUCT(newtuple))->relispartition = false;
+	CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+	heap_freetuple(newtuple);
+	table_close(classRel, RowExclusiveLock);
+
+	if (OidIsValid(defaultPartOid))
+	{
+		/*
+		 * If the relation being detached is the default partition itself,
+		 * remove it from the parent's pg_partitioned_table entry.
+		 *
+		 * If not, we must invalidate default partition's relcache entry, as
+		 * in StorePartitionBound: its partition constraint depends on every
+		 * other partition's partition constraint.
+		 */
+		if (RelationGetRelid(partRel) == defaultPartOid)
+			update_default_partition_oid(RelationGetRelid(rel), InvalidOid);
+		else
+			CacheInvalidateRelcacheByRelid(defaultPartOid);
+	}
 
 	/*
 	 * Invalidate the parent's relcache so that the partition is no longer
 	 * included in its partition descriptor.
 	 */
 	CacheInvalidateRelcache(rel);
+}
+
+/*
+ * ALTER TABLE ... DETACH PARTITION ... FINALIZE
+ *
+ * To use when a DETACH PARTITION command previously did not run to
+ * completion; this completes the detaching process.
+ */
+static ObjectAddress
+ATExecDetachPartitionFinalize(Relation rel, RangeVar *name)
+{
+	Relation    partRel;
+	ObjectAddress address;
+
+	partRel = table_openrv(name, AccessExclusiveLock);
+
+	DetachPartitionFinalize(rel, partRel, true, InvalidOid, true);
 
 	ObjectAddressSet(address, RelationRelationId, RelationGetRelid(partRel));
 
-	/* keep our lock until commit */
 	table_close(partRel, NoLock);
 
 	return address;
 }
 
+/*
+ * DetachAddConstraintIfNeeded
+ *		Subroutine for ATExecDetachPartition.  Create a constraint that
+ *		takes the place of the partition constraint, but avoid creating
+ *		a dupe if an equivalent constraint already exists.
+ */
+static void
+DetachAddConstraintIfNeeded(List **wqueue, Relation partRel)
+{
+	AlteredTableInfo *tab;
+	Expr	   *constraintExpr;
+	TupleDesc	td = RelationGetDescr(partRel);
+	Constraint *n;
+
+	constraintExpr = make_ands_explicit(RelationGetPartitionQual(partRel));
+
+	/* If an identical constraint exists, we don't need to create one */
+	if (td->constr && td->constr->num_check > 0)
+	{
+		for (int i = 0; i < td->constr->num_check; i++)
+		{
+			Node	*thisconstr;
+
+			thisconstr = stringToNode(td->constr->check[i].ccbin);
+
+			if (equal(constraintExpr, thisconstr))
+				return;
+		}
+	}
+
+	tab = ATGetQueueEntry(wqueue, partRel);
+
+	/* Add constraint on partition, equivalent to the partition constraint */
+	n = makeNode(Constraint);
+	n->contype = CONSTR_CHECK;
+	n->conname = NULL;
+	n->location = -1;
+	n->is_no_inherit = false;
+	n->raw_expr = NULL;
+	n->cooked_expr = nodeToString(constraintExpr);
+	n->initially_valid = true;
+	n->skip_validation = true;
+	/* It's a re-add, since it nominally already exists */
+	ATAddCheckConstraint(wqueue, tab, partRel, n,
+						 true, false, true, ShareUpdateExclusiveLock);
+}
+
 /*
  * DropClonedTriggersFromPartition
  *		subroutine for ATExecDetachPartition to remove any triggers that were
@@ -17501,7 +17858,7 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
 
 	/* Silently do nothing if already in the right state */
 	currParent = partIdx->rd_rel->relispartition ?
-		get_partition_parent(partIdxId) : InvalidOid;
+		get_partition_parent(partIdxId, false) : InvalidOid;
 	if (currParent != RelationGetRelid(parentIdx))
 	{
 		IndexInfo  *childInfo;
@@ -17529,7 +17886,7 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
 							   RelationGetRelationName(partIdx))));
 
 		/* Make sure it indexes a partition of the other index's table */
-		partDesc = RelationGetPartitionDesc(parentTbl);
+		partDesc = RelationGetPartitionDesc(parentTbl, false);
 		found = false;
 		for (i = 0; i < partDesc->nparts; i++)
 		{
@@ -17683,7 +18040,7 @@ validatePartitionedIndex(Relation partedIdx, Relation partedTbl)
 	 * If we found as many inherited indexes as the partitioned table has
 	 * partitions, we're good; update pg_index to set indisvalid.
 	 */
-	if (tuples == RelationGetPartitionDesc(partedTbl)->nparts)
+	if (tuples == RelationGetPartitionDesc(partedTbl, false)->nparts)
 	{
 		Relation	idxRel;
 		HeapTuple	newtup;
@@ -17713,8 +18070,8 @@ validatePartitionedIndex(Relation partedIdx, Relation partedTbl)
 		/* make sure we see the validation we just did */
 		CommandCounterIncrement();
 
-		parentIdxId = get_partition_parent(RelationGetRelid(partedIdx));
-		parentTblId = get_partition_parent(RelationGetRelid(partedTbl));
+		parentIdxId = get_partition_parent(RelationGetRelid(partedIdx), false);
+		parentTblId = get_partition_parent(RelationGetRelid(partedTbl), false);
 		parentIdx = relation_open(parentIdxId, AccessExclusiveLock);
 		parentTbl = relation_open(parentTblId, AccessExclusiveLock);
 		Assert(!parentIdx->rd_index->indisvalid);
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 4e4e05844c..7383d5994e 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -1119,7 +1119,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
 	 */
 	if (partition_recurse)
 	{
-		PartitionDesc partdesc = RelationGetPartitionDesc(rel);
+		PartitionDesc partdesc = RelationGetPartitionDesc(rel, false);
 		List	   *idxs = NIL;
 		List	   *childTbls = NIL;
 		ListCell   *l;
@@ -1141,7 +1141,8 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
 			ListCell   *l;
 			List	   *idxs = NIL;
 
-			idxs = find_inheritance_children(indexOid, ShareRowExclusiveLock);
+			idxs = find_inheritance_children(indexOid, false,
+											 ShareRowExclusiveLock);
 			foreach(l, idxs)
 				childTbls = lappend_oid(childTbls,
 										IndexGetRelation(lfirst_oid(l),
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index b8da4c5967..619aaffae4 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -569,6 +569,7 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
 					  int partidx)
 {
 	ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
+	Oid			partOid = dispatch->partdesc->oids[partidx];
 	Relation	partrel;
 	int			firstVarno = mtstate->resultRelInfo[0].ri_RangeTableIndex;
 	Relation	firstResultRel = mtstate->resultRelInfo[0].ri_RelationDesc;
@@ -579,7 +580,7 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
 
 	oldcxt = MemoryContextSwitchTo(proute->memcxt);
 
-	partrel = table_open(dispatch->partdesc->oids[partidx], RowExclusiveLock);
+	partrel = table_open(partOid, RowExclusiveLock);
 
 	leaf_part_rri = makeNode(ResultRelInfo);
 	InitResultRelInfo(leaf_part_rri,
@@ -1065,9 +1066,21 @@ ExecInitPartitionDispatchInfo(EState *estate,
 	int			dispatchidx;
 	MemoryContext oldcxt;
 
+	/*
+	 * For data modification, it is better that executor does not include
+	 * partitions being detached, except in snapshot-isolation mode.  This
+	 * means that a read-committed transaction immediately gets a "no
+	 * partition for tuple" error when a tuple is inserted into a partition
+	 * that's being detached concurrently, but a transaction in repeatable-
+	 * read mode can still use the partition.  Note that because partition
+	 * detach finishes under AccessExclusiveLock on the partition (which
+	 * conflicts with DML), we're certain that the detach won't be able to
+	 * complete until any inserting transaction is done.
+	 */
 	if (estate->es_partition_directory == NULL)
 		estate->es_partition_directory =
-			CreatePartitionDirectory(estate->es_query_cxt);
+			CreatePartitionDirectory(estate->es_query_cxt,
+									 IsolationUsesXactSnapshot());
 
 	oldcxt = MemoryContextSwitchTo(proute->memcxt);
 
@@ -1645,9 +1658,10 @@ ExecCreatePartitionPruneState(PlanState *planstate,
 	ListCell   *lc;
 	int			i;
 
+	/* Executor must always include detached partitions */
 	if (estate->es_partition_directory == NULL)
 		estate->es_partition_directory =
-			CreatePartitionDirectory(estate->es_query_cxt);
+			CreatePartitionDirectory(estate->es_query_cxt, true);
 
 	n_part_hierarchies = list_length(partitionpruneinfo->prune_infos);
 	Assert(n_part_hierarchies > 0);
@@ -1713,9 +1727,12 @@ ExecCreatePartitionPruneState(PlanState *planstate,
 												partrel);
 
 			/*
-			 * Initialize the subplan_map and subpart_map.  Since detaching a
-			 * partition requires AccessExclusiveLock, no partitions can have
-			 * disappeared, nor can the bounds for any partition have changed.
+			 * Initialize the subplan_map and subpart_map.
+			 *
+			 * Because we request detached partitions to be included, and
+			 * detaching waits for old transactions, it is safe to assume that
+			 * no partitions have disappeared since this query was planned.
+			 *
 			 * However, new partitions may have been added.
 			 */
 			Assert(partdesc->nparts >= pinfo->nparts);
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 82d7cce5d5..38b56231b7 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4737,6 +4737,7 @@ _copyPartitionCmd(const PartitionCmd *from)
 
 	COPY_NODE_FIELD(name);
 	COPY_NODE_FIELD(bound);
+	COPY_SCALAR_FIELD(concurrent);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3e980c457c..3292dda342 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2975,6 +2975,7 @@ _equalPartitionCmd(const PartitionCmd *a, const PartitionCmd *b)
 {
 	COMPARE_NODE_FIELD(name);
 	COMPARE_NODE_FIELD(bound);
+	COMPARE_SCALAR_FIELD(concurrent);
 
 	return true;
 }
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index 7f2e40ae39..6c39bf893f 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -2141,10 +2141,14 @@ set_relation_partition_info(PlannerInfo *root, RelOptInfo *rel,
 {
 	PartitionDesc partdesc;
 
-	/* Create the PartitionDirectory infrastructure if we didn't already */
+	/*
+	 * Create the PartitionDirectory infrastructure if we didn't already.
+	 */
 	if (root->glob->partition_directory == NULL)
+	{
 		root->glob->partition_directory =
-			CreatePartitionDirectory(CurrentMemoryContext);
+			CreatePartitionDirectory(CurrentMemoryContext, false);
+	}
 
 	partdesc = PartitionDirectoryLookup(root->glob->partition_directory,
 										relation);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 4843ff99e2..2132cf4d82 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -658,7 +658,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXPRESSION
 	EXTENSION EXTERNAL EXTRACT
 
-	FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+	FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FOLLOWING FOR
 	FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
 
 	GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -2108,12 +2108,13 @@ partition_cmd:
 					n->subtype = AT_AttachPartition;
 					cmd->name = $3;
 					cmd->bound = $4;
+					cmd->concurrent = false;
 					n->def = (Node *) cmd;
 
 					$$ = (Node *) n;
 				}
-			/* ALTER TABLE <name> DETACH PARTITION <partition_name> */
-			| DETACH PARTITION qualified_name
+			/* ALTER TABLE <name> DETACH PARTITION <partition_name> [CONCURRENTLY] */
+			| DETACH PARTITION qualified_name opt_concurrently
 				{
 					AlterTableCmd *n = makeNode(AlterTableCmd);
 					PartitionCmd *cmd = makeNode(PartitionCmd);
@@ -2121,8 +2122,21 @@ partition_cmd:
 					n->subtype = AT_DetachPartition;
 					cmd->name = $3;
 					cmd->bound = NULL;
+					cmd->concurrent = $4;
 					n->def = (Node *) cmd;
 
+					$$ = (Node *) n;
+				}
+			| DETACH PARTITION qualified_name FINALIZE
+				{
+					AlterTableCmd *n = makeNode(AlterTableCmd);
+					PartitionCmd *cmd = makeNode(PartitionCmd);
+
+					n->subtype = AT_DetachPartitionFinalize;
+					cmd->name = $3;
+					cmd->bound = NULL;
+					cmd->concurrent = false;
+					n->def = (Node *) cmd;
 					$$ = (Node *) n;
 				}
 		;
@@ -2137,6 +2151,7 @@ index_partition_cmd:
 					n->subtype = AT_AttachPartition;
 					cmd->name = $3;
 					cmd->bound = NULL;
+					cmd->concurrent = false;
 					n->def = (Node *) cmd;
 
 					$$ = (Node *) n;
@@ -15395,6 +15410,7 @@ unreserved_keyword:
 			| EXTERNAL
 			| FAMILY
 			| FILTER
+			| FINALIZE
 			| FIRST_P
 			| FOLLOWING
 			| FORCE
@@ -15936,6 +15952,7 @@ bare_label_keyword:
 			| EXTRACT
 			| FALSE_P
 			| FAMILY
+			| FINALIZE
 			| FIRST_P
 			| FLOAT_P
 			| FOLLOWING
diff --git a/src/backend/partitioning/partbounds.c b/src/backend/partitioning/partbounds.c
index e5f3482d52..2b2b1dc1ad 100644
--- a/src/backend/partitioning/partbounds.c
+++ b/src/backend/partitioning/partbounds.c
@@ -2798,7 +2798,7 @@ check_new_partition_bound(char *relname, Relation parent,
 						  PartitionBoundSpec *spec, ParseState *pstate)
 {
 	PartitionKey key = RelationGetPartitionKey(parent);
-	PartitionDesc partdesc = RelationGetPartitionDesc(parent);
+	PartitionDesc partdesc = RelationGetPartitionDesc(parent, true);
 	PartitionBoundInfo boundinfo = partdesc->boundinfo;
 	int			with = -1;
 	bool		overlap = false;
@@ -3990,7 +3990,7 @@ get_qual_for_list(Relation parent, PartitionBoundSpec *spec)
 	{
 		int			i;
 		int			ndatums = 0;
-		PartitionDesc pdesc = RelationGetPartitionDesc(parent);
+		PartitionDesc pdesc = RelationGetPartitionDesc(parent, true);	/* XXX correct? */
 		PartitionBoundInfo boundinfo = pdesc->boundinfo;
 
 		if (boundinfo)
@@ -4190,7 +4190,7 @@ get_qual_for_range(Relation parent, PartitionBoundSpec *spec,
 	if (spec->is_default)
 	{
 		List	   *or_expr_args = NIL;
-		PartitionDesc pdesc = RelationGetPartitionDesc(parent);
+		PartitionDesc pdesc = RelationGetPartitionDesc(parent, true);	/* XXX correct? */
 		Oid		   *inhoids = pdesc->oids;
 		int			nparts = pdesc->nparts,
 					i;
diff --git a/src/backend/partitioning/partdesc.c b/src/backend/partitioning/partdesc.c
index f852b6e99d..58570fecfd 100644
--- a/src/backend/partitioning/partdesc.c
+++ b/src/backend/partitioning/partdesc.c
@@ -37,6 +37,7 @@ typedef struct PartitionDirectoryData
 {
 	MemoryContext pdir_mcxt;
 	HTAB	   *pdir_hash;
+	bool		include_detached;
 }			PartitionDirectoryData;
 
 typedef struct PartitionDirectoryEntry
@@ -46,7 +47,7 @@ typedef struct PartitionDirectoryEntry
 	PartitionDesc pd;
 } PartitionDirectoryEntry;
 
-static void RelationBuildPartitionDesc(Relation rel);
+static void RelationBuildPartitionDesc(Relation rel, bool include_detached);
 
 
 /*
@@ -61,13 +62,14 @@ static void RelationBuildPartitionDesc(Relation rel);
  * that the data doesn't become stale.
  */
 PartitionDesc
-RelationGetPartitionDesc(Relation rel)
+RelationGetPartitionDesc(Relation rel, bool include_detached)
 {
 	if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
 		return NULL;
 
-	if (unlikely(rel->rd_partdesc == NULL))
-		RelationBuildPartitionDesc(rel);
+	if (unlikely(rel->rd_partdesc == NULL ||
+				 rel->rd_partdesc->includes_detached != include_detached))
+		RelationBuildPartitionDesc(rel, include_detached);
 
 	return rel->rd_partdesc;
 }
@@ -88,7 +90,7 @@ RelationGetPartitionDesc(Relation rel)
  * permanently.
  */
 static void
-RelationBuildPartitionDesc(Relation rel)
+RelationBuildPartitionDesc(Relation rel, bool include_detached)
 {
 	PartitionDesc partdesc;
 	PartitionBoundInfo boundinfo = NULL;
@@ -110,7 +112,8 @@ RelationBuildPartitionDesc(Relation rel)
 	 * concurrently, whatever this function returns will be accurate as of
 	 * some well-defined point in time.
 	 */
-	inhoids = find_inheritance_children(RelationGetRelid(rel), NoLock);
+	inhoids = find_inheritance_children(RelationGetRelid(rel), include_detached,
+										NoLock);
 	nparts = list_length(inhoids);
 
 	/* Allocate working arrays for OIDs, leaf flags, and boundspecs. */
@@ -238,6 +241,7 @@ RelationBuildPartitionDesc(Relation rel)
 		partdesc->boundinfo = partition_bounds_copy(boundinfo, key);
 		partdesc->oids = (Oid *) palloc(nparts * sizeof(Oid));
 		partdesc->is_leaf = (bool *) palloc(nparts * sizeof(bool));
+		partdesc->includes_detached = include_detached;
 
 		/*
 		 * Assign OIDs from the original array into mapped indexes of the
@@ -280,7 +284,7 @@ RelationBuildPartitionDesc(Relation rel)
  *		Create a new partition directory object.
  */
 PartitionDirectory
-CreatePartitionDirectory(MemoryContext mcxt)
+CreatePartitionDirectory(MemoryContext mcxt, bool include_detached)
 {
 	MemoryContext oldcontext = MemoryContextSwitchTo(mcxt);
 	PartitionDirectory pdir;
@@ -295,6 +299,7 @@ CreatePartitionDirectory(MemoryContext mcxt)
 
 	pdir->pdir_hash = hash_create("partition directory", 256, &ctl,
 								  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	pdir->include_detached = include_detached;
 
 	MemoryContextSwitchTo(oldcontext);
 	return pdir;
@@ -327,7 +332,7 @@ PartitionDirectoryLookup(PartitionDirectory pdir, Relation rel)
 		 */
 		RelationIncrementReferenceCount(rel);
 		pde->rel = rel;
-		pde->pd = RelationGetPartitionDesc(rel);
+		pde->pd = RelationGetPartitionDesc(rel, pdir->include_detached);
 		Assert(pde->pd != NULL);
 	}
 	return pde->pd;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 05bb698cf4..729274b330 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1236,6 +1236,25 @@ ProcessUtilitySlow(ParseState *pstate,
 					AlterTableStmt *atstmt = (AlterTableStmt *) parsetree;
 					Oid			relid;
 					LOCKMODE	lockmode;
+					ListCell   *cell;
+
+					/*
+					 * Disallow ALTER TABLE .. DETACH CONCURRENTLY in a
+					 * transaction block or function.  (Perhaps it could be
+					 * allowed in a procedure, but don't hold your breath.)
+					 */
+					foreach(cell, atstmt->cmds)
+					{
+						AlterTableCmd *cmd = (AlterTableCmd *) lfirst(cell);
+
+						/* Disallow DETACH CONCURRENTLY in a transaction block */
+						if (cmd->subtype == AT_DetachPartition)
+						{
+							if (((PartitionCmd *) cmd->def)->concurrent)
+								PreventInTransactionBlock(isTopLevel,
+														  "ALTER TABLE ... DETACH CONCURRENTLY");
+						}
+					}
 
 					/*
 					 * Figure out lock mode, and acquire lock.  This also does
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 09a2ad2881..7c77c338ce 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -392,11 +392,15 @@ RI_FKey_check(TriggerData *trigdata)
 
 	/*
 	 * Now check that foreign key exists in PK table
+	 *
+	 * XXX detectNewRows must be true when a partitioned table is on the
+	 * referenced side.  The reason is that our snapshot must be fresh so
+	 * that find_inheritance_children() skips detach-pending partitions.
 	 */
 	ri_PerformCheck(riinfo, &qkey, qplan,
 					fk_rel, pk_rel,
 					NULL, newslot,
-					false,
+					pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
 					SPI_OK_SELECT);
 
 	if (SPI_finish() != SPI_OK_FINISH)
diff --git a/src/backend/utils/cache/partcache.c b/src/backend/utils/cache/partcache.c
index a6388d980e..21e60f0c5e 100644
--- a/src/backend/utils/cache/partcache.c
+++ b/src/backend/utils/cache/partcache.c
@@ -341,6 +341,7 @@ generate_partition_qual(Relation rel)
 	bool		isnull;
 	List	   *my_qual = NIL,
 			   *result = NIL;
+	Oid			parentrelid;
 	Relation	parent;
 
 	/* Guard against stack overflow due to overly deep partition tree */
@@ -350,9 +351,14 @@ generate_partition_qual(Relation rel)
 	if (rel->rd_partcheckvalid)
 		return copyObject(rel->rd_partcheck);
 
-	/* Grab at least an AccessShareLock on the parent table */
-	parent = relation_open(get_partition_parent(RelationGetRelid(rel)),
-						   AccessShareLock);
+	/*
+	 * Grab at least an AccessShareLock on the parent table.  Must do this
+	 * even if the partition has been partially detached, because transactions
+	 * concurrent with the detach might still be trying to use a partition
+	 * descriptor that includes it.
+	 */
+	parentrelid = get_partition_parent(RelationGetRelid(rel), true);
+	parent = relation_open(parentrelid, AccessShareLock);
 
 	/* Get pg_class.relpartbound */
 	tuple = SearchSysCache1(RELOID, RelationGetRelid(rel));
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index eeac0efc4f..c9f7118a5d 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -2144,7 +2144,12 @@ describeOneTableDetails(const char *schemaname,
 
 		printfPQExpBuffer(&buf,
 						  "SELECT inhparent::pg_catalog.regclass,\n"
-						  "  pg_catalog.pg_get_expr(c.relpartbound, c.oid)");
+						  "  pg_catalog.pg_get_expr(c.relpartbound, c.oid),\n  ");
+
+		appendPQExpBufferStr(&buf,
+							 pset.sversion >= 140000 ? "inhdetachpending" :
+							 "false as inhdetachpending");
+
 		/* If verbose, also request the partition constraint definition */
 		if (verbose)
 			appendPQExpBufferStr(&buf,
@@ -2162,17 +2167,19 @@ describeOneTableDetails(const char *schemaname,
 		{
 			char	   *parent_name = PQgetvalue(result, 0, 0);
 			char	   *partdef = PQgetvalue(result, 0, 1);
+			char	   *detached = PQgetvalue(result, 0, 2);
 
-			printfPQExpBuffer(&tmpbuf, _("Partition of: %s %s"), parent_name,
-							  partdef);
+			printfPQExpBuffer(&tmpbuf, _("Partition of: %s %s%s"), parent_name,
+							  partdef,
+							  strcmp(detached, "t") == 0 ? " DETACH PENDING" : "");
 			printTableAddFooter(&cont, tmpbuf.data);
 
 			if (verbose)
 			{
 				char	   *partconstraintdef = NULL;
 
-				if (!PQgetisnull(result, 0, 2))
-					partconstraintdef = PQgetvalue(result, 0, 2);
+				if (!PQgetisnull(result, 0, 3))
+					partconstraintdef = PQgetvalue(result, 0, 3);
 				/* If there isn't any constraint, show that explicitly */
 				if (partconstraintdef == NULL || partconstraintdef[0] == '\0')
 					printfPQExpBuffer(&tmpbuf, _("No partition constraint"));
@@ -3224,9 +3231,20 @@ describeOneTableDetails(const char *schemaname,
 		}
 
 		/* print child tables (with additional info if partitions) */
-		if (pset.sversion >= 100000)
+		if (pset.sversion >= 140000)
 			printfPQExpBuffer(&buf,
 							  "SELECT c.oid::pg_catalog.regclass, c.relkind,"
+							  " inhdetachpending,"
+							  " pg_catalog.pg_get_expr(c.relpartbound, c.oid)\n"
+							  "FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i\n"
+							  "WHERE c.oid = i.inhrelid AND i.inhparent = '%s'\n"
+							  "ORDER BY pg_catalog.pg_get_expr(c.relpartbound, c.oid) = 'DEFAULT',"
+							  " c.oid::pg_catalog.regclass::pg_catalog.text;",
+							  oid);
+		else if (pset.sversion >= 100000)
+			printfPQExpBuffer(&buf,
+							  "SELECT c.oid::pg_catalog.regclass, c.relkind,"
+							  " false AS inhdetachpending,"
 							  " pg_catalog.pg_get_expr(c.relpartbound, c.oid)\n"
 							  "FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i\n"
 							  "WHERE c.oid = i.inhrelid AND i.inhparent = '%s'\n"
@@ -3235,14 +3253,16 @@ describeOneTableDetails(const char *schemaname,
 							  oid);
 		else if (pset.sversion >= 80300)
 			printfPQExpBuffer(&buf,
-							  "SELECT c.oid::pg_catalog.regclass, c.relkind, NULL\n"
+							  "SELECT c.oid::pg_catalog.regclass, c.relkind,"
+							  " false AS inhdetachpending, NULL\n"
 							  "FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i\n"
 							  "WHERE c.oid = i.inhrelid AND i.inhparent = '%s'\n"
 							  "ORDER BY c.oid::pg_catalog.regclass::pg_catalog.text;",
 							  oid);
 		else
 			printfPQExpBuffer(&buf,
-							  "SELECT c.oid::pg_catalog.regclass, c.relkind, NULL\n"
+							  "SELECT c.oid::pg_catalog.regclass, c.relkind,"
+							  " false AS inhdetachpending, NULL\n"
 							  "FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i\n"
 							  "WHERE c.oid = i.inhrelid AND i.inhparent = '%s'\n"
 							  "ORDER BY c.relname;",
@@ -3292,11 +3312,13 @@ describeOneTableDetails(const char *schemaname,
 				else
 					printfPQExpBuffer(&buf, "%*s  %s",
 									  ctw, "", PQgetvalue(result, i, 0));
-				if (!PQgetisnull(result, i, 2))
-					appendPQExpBuffer(&buf, " %s", PQgetvalue(result, i, 2));
+				if (!PQgetisnull(result, i, 3))
+					appendPQExpBuffer(&buf, " %s", PQgetvalue(result, i, 3));
 				if (child_relkind == RELKIND_PARTITIONED_TABLE ||
 					child_relkind == RELKIND_PARTITIONED_INDEX)
 					appendPQExpBufferStr(&buf, ", PARTITIONED");
+				if (strcmp(PQgetvalue(result, i, 2), "t") == 0)
+					appendPQExpBufferStr(&buf, " (DETACH PENDING)");
 				if (i < tuples - 1)
 					appendPQExpBufferChar(&buf, ',');
 
diff --git a/src/include/catalog/partition.h b/src/include/catalog/partition.h
index fe3f66befa..c8c7bc1d99 100644
--- a/src/include/catalog/partition.h
+++ b/src/include/catalog/partition.h
@@ -19,7 +19,7 @@
 /* Seed for the extended hash function */
 #define HASH_PARTITION_SEED UINT64CONST(0x7A5B22367996DCFD)
 
-extern Oid	get_partition_parent(Oid relid);
+extern Oid	get_partition_parent(Oid relid, bool even_if_detached);
 extern List *get_partition_ancestors(Oid relid);
 extern Oid	index_get_partition(Relation partition, Oid indexId);
 extern List *map_partition_varattnos(List *expr, int fromrel_varno,
diff --git a/src/include/catalog/pg_inherits.h b/src/include/catalog/pg_inherits.h
index 2b71cad9a2..6d07e1b302 100644
--- a/src/include/catalog/pg_inherits.h
+++ b/src/include/catalog/pg_inherits.h
@@ -34,6 +34,7 @@ CATALOG(pg_inherits,2611,InheritsRelationId)
 	Oid			inhrelid BKI_LOOKUP(pg_class);
 	Oid			inhparent BKI_LOOKUP(pg_class);
 	int32		inhseqno;
+	bool		inhdetachpending;
 } FormData_pg_inherits;
 
 /* ----------------
@@ -49,7 +50,8 @@ DECLARE_INDEX(pg_inherits_parent_index, 2187, on pg_inherits using btree(inhpare
 #define InheritsParentIndexId  2187
 
 
-extern List *find_inheritance_children(Oid parentrelId, LOCKMODE lockmode);
+extern List *find_inheritance_children(Oid parentrelId, bool include_detached,
+									   LOCKMODE lockmode);
 extern List *find_all_inheritors(Oid parentrelId, LOCKMODE lockmode,
 								 List **parents);
 extern bool has_subclass(Oid relationId);
@@ -57,6 +59,8 @@ extern bool has_superclass(Oid relationId);
 extern bool typeInheritsFrom(Oid subclassTypeId, Oid superclassTypeId);
 extern void StoreSingleInheritance(Oid relationId, Oid parentOid,
 								   int32 seqNumber);
-extern bool DeleteInheritsTuple(Oid inhrelid, Oid inhparent);
+extern bool DeleteInheritsTuple(Oid inhrelid, Oid inhparent, bool allow_detached,
+								const char *childname);
+extern bool PartitionHasPendingDetach(Oid partoid);
 
 #endif							/* PG_INHERITS_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 68425eb2c0..0ce19d98ec 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -871,6 +871,7 @@ typedef struct PartitionCmd
 	NodeTag		type;
 	RangeVar   *name;			/* name of partition to attach/detach */
 	PartitionBoundSpec *bound;	/* FOR VALUES, if attaching */
+	bool		concurrent;
 } PartitionCmd;
 
 /****************************************************************************
@@ -1909,6 +1910,7 @@ typedef enum AlterTableType
 	AT_GenericOptions,			/* OPTIONS (...) */
 	AT_AttachPartition,			/* ATTACH PARTITION */
 	AT_DetachPartition,			/* DETACH PARTITION */
+	AT_DetachPartitionFinalize,	/* DETACH PARTITION FINALIZE */
 	AT_AddIdentity,				/* ADD IDENTITY */
 	AT_SetIdentity,				/* SET identity column options */
 	AT_DropIdentity,			/* DROP IDENTITY */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index ca1f950cbe..4bbe53e852 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -166,6 +166,7 @@ PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD, AS_LABEL)
+PG_KEYWORD("finalize", FINALIZE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("following", FOLLOWING, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/include/partitioning/partdesc.h b/src/include/partitioning/partdesc.h
index ff113199e5..7f03ff4271 100644
--- a/src/include/partitioning/partdesc.h
+++ b/src/include/partitioning/partdesc.h
@@ -21,6 +21,7 @@
 typedef struct PartitionDescData
 {
 	int			nparts;			/* Number of partitions */
+	bool		includes_detached;	/* Does it include detached partitions */
 	Oid		   *oids;			/* Array of 'nparts' elements containing
 								 * partition OIDs in order of the their bounds */
 	bool	   *is_leaf;		/* Array of 'nparts' elements storing whether
@@ -30,9 +31,9 @@ typedef struct PartitionDescData
 } PartitionDescData;
 
 
-extern PartitionDesc RelationGetPartitionDesc(Relation rel);
+extern PartitionDesc RelationGetPartitionDesc(Relation rel, bool include_detached);
 
-extern PartitionDirectory CreatePartitionDirectory(MemoryContext mcxt);
+extern PartitionDirectory CreatePartitionDirectory(MemoryContext mcxt, bool include_detached);
 extern PartitionDesc PartitionDirectoryLookup(PartitionDirectory, Relation);
 extern void DestroyPartitionDirectory(PartitionDirectory pdir);
 
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index f66ac58188..44539fe15a 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -132,6 +132,7 @@ extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin);
 extern void ImportSnapshot(const char *idstr);
 extern bool XactHasExportedSnapshots(void);
 extern void DeleteAllExportedSnapshotFiles(void);
+extern void WaitForOlderSnapshots(TransactionId limitXmin, bool progress);
 extern bool ThereAreNoPriorRegisteredSnapshots(void);
 extern bool TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
 												Relation relation,
diff --git a/src/test/isolation/expected/detach-partition-concurrently-1.out b/src/test/isolation/expected/detach-partition-concurrently-1.out
new file mode 100644
index 0000000000..9b4526773e
--- /dev/null
+++ b/src/test/isolation/expected/detach-partition-concurrently-1.out
@@ -0,0 +1,234 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s1b s1s s2detach s1s s1c s1s
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+
+starting permutation: s1b s1s s2detach s1s s3s s3i s1c s3i s2drop s1s
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+step s3s: SELECT * FROM d_listp;
+a              
+
+1              
+step s3i: SELECT relpartbound IS NULL FROM pg_class where relname = 'd_listp2';
+?column?       
+
+f              
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s3i: SELECT relpartbound IS NULL FROM pg_class where relname = 'd_listp2';
+?column?       
+
+t              
+step s2drop: DROP TABLE d_listp2;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+
+starting permutation: s1b s1s s2detach s1ins s1s s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1ins: INSERT INTO d_listp VALUES (1);
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+1              
+step s1c: COMMIT;
+step s2detach: <... completed>
+
+starting permutation: s1b s1s s1ins2 s2detach s1ins s1s s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1ins2: INSERT INTO d_listp VALUES (2);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1ins: INSERT INTO d_listp VALUES (1);
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+1              
+step s1c: COMMIT;
+step s2detach: <... completed>
+
+starting permutation: s1brr s1s s2detach s1ins s1s s1c
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1ins: INSERT INTO d_listp VALUES (1);
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+1              
+2              
+step s1c: COMMIT;
+step s2detach: <... completed>
+
+starting permutation: s1brr s1s s2detach s1s s1c
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1c: COMMIT;
+step s2detach: <... completed>
+
+starting permutation: s1b s1ins2 s2detach s3ins2 s1c
+step s1b: BEGIN;
+step s1ins2: INSERT INTO d_listp VALUES (2);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s3ins2: INSERT INTO d_listp VALUES (2);
+ERROR:  no partition of relation "d_listp" found for row
+step s1c: COMMIT;
+step s2detach: <... completed>
+
+starting permutation: s1brr s1prep s1s s2detach s1s s1exec1 s3s s1dealloc s1c
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep: PREPARE f(int) AS INSERT INTO d_listp VALUES ($1);
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1exec1: EXECUTE f(1);
+step s3s: SELECT * FROM d_listp;
+a              
+
+1              
+step s1dealloc: DEALLOCATE f;
+step s1c: COMMIT;
+step s2detach: <... completed>
+
+starting permutation: s1brr s1prep s1exec2 s2detach s1s s1exec2 s3s s1c s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep: PREPARE f(int) AS INSERT INTO d_listp VALUES ($1);
+step s1exec2: EXECUTE f(2);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+2              
+step s1exec2: EXECUTE f(2);
+step s3s: SELECT * FROM d_listp;
+a              
+
+1              
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1dealloc: DEALLOCATE f;
+
+starting permutation: s1brr s1prep s1s s2detach s1s s1exec2 s1c s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep: PREPARE f(int) AS INSERT INTO d_listp VALUES ($1);
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1exec2: EXECUTE f(2);
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1dealloc: DEALLOCATE f;
+
+starting permutation: s1brr s1prep s2detach s1s s1exec2 s1c s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep: PREPARE f(int) AS INSERT INTO d_listp VALUES ($1);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1exec2: EXECUTE f(2);
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1dealloc: DEALLOCATE f;
+
+starting permutation: s1brr s1prep1 s2detach s1s s1exec2 s1c s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep1: PREPARE f(int) AS INSERT INTO d_listp VALUES (1);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1exec2: EXECUTE f(2);
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1dealloc: DEALLOCATE f;
+
+starting permutation: s1brr s1prep2 s2detach s1s s1exec2 s1c s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep2: PREPARE f(int) AS INSERT INTO d_listp VALUES (2);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1exec2: EXECUTE f(2);
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1dealloc: DEALLOCATE f;
diff --git a/src/test/isolation/expected/detach-partition-concurrently-2.out b/src/test/isolation/expected/detach-partition-concurrently-2.out
new file mode 100644
index 0000000000..85be707b40
--- /dev/null
+++ b/src/test/isolation/expected/detach-partition-concurrently-2.out
@@ -0,0 +1,66 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s1b s1s s2d s3i1 s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_lp_fk;
+a              
+
+1              
+2              
+step s2d: ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY; <waiting ...>
+step s3i1: INSERT INTO d_lp_fk_r VALUES (1);
+ERROR:  insert or update on table "d_lp_fk_r" violates foreign key constraint "d_lp_fk_r_a_fkey"
+step s1c: COMMIT;
+step s2d: <... completed>
+
+starting permutation: s1b s1s s2d s3i2 s3i2 s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_lp_fk;
+a              
+
+1              
+2              
+step s2d: ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY; <waiting ...>
+step s3i2: INSERT INTO d_lp_fk_r VALUES (2);
+step s3i2: INSERT INTO d_lp_fk_r VALUES (2);
+step s1c: COMMIT;
+step s2d: <... completed>
+
+starting permutation: s1b s1s s3i1 s2d s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_lp_fk;
+a              
+
+1              
+2              
+step s3i1: INSERT INTO d_lp_fk_r VALUES (1);
+step s2d: ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY;
+ERROR:  removing partition "d_lp_fk_1" violates foreign key constraint "d_lp_fk_r_a_fkey1"
+step s1c: COMMIT;
+
+starting permutation: s1b s1s s3i2 s2d s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_lp_fk;
+a              
+
+1              
+2              
+step s3i2: INSERT INTO d_lp_fk_r VALUES (2);
+step s2d: ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY; <waiting ...>
+step s1c: COMMIT;
+step s2d: <... completed>
+
+starting permutation: s1b s1s s3b s2d s3i1 s1c s3c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_lp_fk;
+a              
+
+1              
+2              
+step s3b: BEGIN;
+step s2d: ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY; <waiting ...>
+step s3i1: INSERT INTO d_lp_fk_r VALUES (1);
+ERROR:  insert or update on table "d_lp_fk_r" violates foreign key constraint "d_lp_fk_r_a_fkey"
+step s1c: COMMIT;
+step s2d: <... completed>
+step s3c: COMMIT;
diff --git a/src/test/isolation/expected/detach-partition-concurrently-3.out b/src/test/isolation/expected/detach-partition-concurrently-3.out
new file mode 100644
index 0000000000..88e83638c7
--- /dev/null
+++ b/src/test/isolation/expected/detach-partition-concurrently-3.out
@@ -0,0 +1,282 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1c s1describe s1alter
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1c: COMMIT;
+step s1describe: SELECT 'd3_listp' AS root, * FROM pg_partition_tree('d3_listp')
+					  UNION ALL SELECT 'd3_listp1', * FROM pg_partition_tree('d3_listp1');
+root           relid          parentrelid    isleaf         level          
+
+d3_listp       d3_listp                      f              0              
+d3_listp1      d3_listp1                     t              0              
+step s1alter: ALTER TABLE d3_listp1 ALTER a DROP NOT NULL;
+ERROR:  cannot alter partition "d3_listp1" with an incomplete detach
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1insert s1c
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1insert: INSERT INTO d3_listp VALUES (1);
+ERROR:  no partition of relation "d3_listp" found for row
+step s1c: COMMIT;
+
+starting permutation: s2snitch s1brr s1s s2detach s1cancel s1insert s1c s1spart
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1insert: INSERT INTO d3_listp VALUES (1);
+step s1c: COMMIT;
+step s1spart: SELECT * FROM d3_listp1;
+a              
+
+1              
+1              
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1c s1insertpart
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1c: COMMIT;
+step s1insertpart: INSERT INTO d3_listp1 VALUES (1);
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1c s1drop s1list
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1c: COMMIT;
+step s1drop: DROP TABLE d3_listp;
+step s1list: SELECT relname FROM pg_catalog.pg_class
+					  WHERE relname LIKE 'd3_listp%' ORDER BY 1;
+relname        
+
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1c s1trunc s1spart
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1c: COMMIT;
+step s1trunc: TRUNCATE TABLE d3_listp;
+step s1spart: SELECT * FROM d3_listp1;
+a              
+
+1              
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1c s2begin s2drop s1s s2commit
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1c: COMMIT;
+step s2begin: BEGIN;
+step s2drop: DROP TABLE d3_listp1;
+step s1s: SELECT * FROM d3_listp; <waiting ...>
+step s2commit: COMMIT;
+step s1s: <... completed>
+a              
+
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1c s1b s1spart s2detachfinal s1c
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1c: COMMIT;
+step s1b: BEGIN;
+step s1spart: SELECT * FROM d3_listp1;
+a              
+
+1              
+step s2detachfinal: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 FINALIZE; <waiting ...>
+step s1c: COMMIT;
+step s2detachfinal: <... completed>
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1c s1b s1s s2detachfinal s1c
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1c: COMMIT;
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+step s2detachfinal: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 FINALIZE;
+step s1c: COMMIT;
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1c s1b s1spart s2detachfinal s1c
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1c: COMMIT;
+step s1b: BEGIN;
+step s1spart: SELECT * FROM d3_listp1;
+a              
+
+1              
+step s2detachfinal: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 FINALIZE; <waiting ...>
+step s1c: COMMIT;
+step s2detachfinal: <... completed>
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1c s2begin s2detachfinal s2commit
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1c: COMMIT;
+step s2begin: BEGIN;
+step s2detachfinal: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 FINALIZE;
+step s2commit: COMMIT;
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1c s2begin s2detachfinal s1spart s2commit
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1c: COMMIT;
+step s2begin: BEGIN;
+step s2detachfinal: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 FINALIZE;
+step s1spart: SELECT * FROM d3_listp1; <waiting ...>
+step s2commit: COMMIT;
+step s1spart: <... completed>
+a              
+
+1              
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1c s2begin s2detachfinal s1insertpart s2commit
+step s2snitch: INSERT INTO d3_pid SELECT pg_backend_pid();
+step s1b: BEGIN;
+step s1s: SELECT * FROM d3_listp;
+a              
+
+1              
+step s2detach: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; <waiting ...>
+step s1cancel: SELECT pg_cancel_backend(pid) FROM d3_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1c: COMMIT;
+step s2begin: BEGIN;
+step s2detachfinal: ALTER TABLE d3_listp DETACH PARTITION d3_listp1 FINALIZE;
+step s1insertpart: INSERT INTO d3_listp1 VALUES (1); <waiting ...>
+step s2commit: COMMIT;
+step s1insertpart: <... completed>
+unused step name: s1droppart
diff --git a/src/test/isolation/expected/detach-partition-concurrently-4.out b/src/test/isolation/expected/detach-partition-concurrently-4.out
new file mode 100644
index 0000000000..e52ed99aa6
--- /dev/null
+++ b/src/test/isolation/expected/detach-partition-concurrently-4.out
@@ -0,0 +1,69 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s2snitch s1b s1s s2detach s1cancel s1insert s1c
+step s2snitch: insert into d4_pid select pg_backend_pid();
+step s1b: begin;
+step s1s: select * from d4_primary;
+a              
+
+1              
+step s2detach: alter table d4_primary detach partition d4_primary1 concurrently; <waiting ...>
+step s1cancel: select pg_cancel_backend(pid) from d4_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1insert: insert into d4_fk values (1);
+ERROR:  insert or update on table "d4_fk" violates foreign key constraint "d4_fk_a_fkey"
+step s1c: commit;
+
+starting permutation: s2snitch s1brr s1s s2detach s1cancel s1insert s1c
+step s2snitch: insert into d4_pid select pg_backend_pid();
+step s1brr: begin isolation level repeatable read;
+step s1s: select * from d4_primary;
+a              
+
+1              
+step s2detach: alter table d4_primary detach partition d4_primary1 concurrently; <waiting ...>
+step s1cancel: select pg_cancel_backend(pid) from d4_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
+step s1insert: insert into d4_fk values (1);
+ERROR:  insert or update on table "d4_fk" violates foreign key constraint "d4_fk_a_fkey"
+step s1c: commit;
+
+starting permutation: s2snitch s1b s1s s2detach s3insert s1c
+step s2snitch: insert into d4_pid select pg_backend_pid();
+step s1b: begin;
+step s1s: select * from d4_primary;
+a              
+
+1              
+step s2detach: alter table d4_primary detach partition d4_primary1 concurrently; <waiting ...>
+step s3insert: insert into d4_fk values (1);
+ERROR:  insert or update on table "d4_fk" violates foreign key constraint "d4_fk_a_fkey"
+step s1c: commit;
+step s2detach: <... completed>
+
+starting permutation: s2snitch s1b s1s s2detach s3brr s3insert s3commit s1cancel
+step s2snitch: insert into d4_pid select pg_backend_pid();
+step s1b: begin;
+step s1s: select * from d4_primary;
+a              
+
+1              
+step s2detach: alter table d4_primary detach partition d4_primary1 concurrently; <waiting ...>
+step s3brr: begin isolation level repeatable read;
+step s3insert: insert into d4_fk values (1);
+ERROR:  insert or update on table "d4_fk" violates foreign key constraint "d4_fk_a_fkey"
+step s3commit: commit;
+step s1cancel: select pg_cancel_backend(pid) from d4_pid;
+pg_cancel_backend
+
+t              
+step s2detach: <... completed>
+error in steps s1cancel s2detach: ERROR:  canceling statement due to user request
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 5d6b79e66e..f4c01006fc 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -24,6 +24,10 @@ test: deadlock-hard
 test: deadlock-soft
 test: deadlock-soft-2
 test: deadlock-parallel
+test: detach-partition-concurrently-1
+test: detach-partition-concurrently-2
+test: detach-partition-concurrently-3
+test: detach-partition-concurrently-4
 test: fk-contention
 test: fk-deadlock
 test: fk-deadlock2
diff --git a/src/test/isolation/specs/detach-partition-concurrently-1.spec b/src/test/isolation/specs/detach-partition-concurrently-1.spec
new file mode 100644
index 0000000000..6ca0adac1d
--- /dev/null
+++ b/src/test/isolation/specs/detach-partition-concurrently-1.spec
@@ -0,0 +1,69 @@
+# Test that detach partition concurrently makes the partition invisible at the
+# correct time.
+
+setup
+{
+  DROP TABLE IF EXISTS d_listp, d_listp1, d_listp2;
+  CREATE TABLE d_listp (a int) PARTITION BY LIST(a);
+  CREATE TABLE d_listp1 PARTITION OF d_listp FOR VALUES IN (1);
+  CREATE TABLE d_listp2 PARTITION OF d_listp FOR VALUES IN (2);
+  INSERT INTO d_listp VALUES (1),(2);
+}
+
+teardown {
+    DROP TABLE IF EXISTS d_listp, d_listp2, d_listp_foobar;
+}
+
+session "s1"
+step "s1b"		{ BEGIN; }
+step "s1brr"	{ BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step "s1s"		{ SELECT * FROM d_listp; }
+step "s1ins"	{ INSERT INTO d_listp VALUES (1); }
+step "s1ins2"	{ INSERT INTO d_listp VALUES (2); }
+step "s1prep"	{ PREPARE f(int) AS INSERT INTO d_listp VALUES ($1); }
+step "s1prep1"	{ PREPARE f(int) AS INSERT INTO d_listp VALUES (1); }
+step "s1prep2"	{ PREPARE f(int) AS INSERT INTO d_listp VALUES (2); }
+step "s1exec1"	{ EXECUTE f(1); }
+step "s1exec2"	{ EXECUTE f(2); }
+step "s1dealloc" { DEALLOCATE f; }
+step "s1c"		{ COMMIT; }
+
+session "s2"
+step "s2detach"		{ ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; }
+step "s2drop"	{ DROP TABLE d_listp2; }
+
+session "s3"
+step "s3s"		{ SELECT * FROM d_listp; }
+step "s3i"		{ SELECT relpartbound IS NULL FROM pg_class where relname = 'd_listp2'; }
+step "s3ins2"		{ INSERT INTO d_listp VALUES (2); }
+
+# The transaction that detaches hangs until it sees any older transaction
+# terminate, as does anybody else.
+permutation "s1b" "s1s" "s2detach" "s1s" "s1c" "s1s"
+
+# relpartbound remains set until s1 commits
+# XXX this could be timing dependent :-(
+permutation "s1b" "s1s" "s2detach" "s1s" "s3s" "s3i" "s1c" "s3i" "s2drop" "s1s"
+
+# In read-committed mode, the partition disappears from view of concurrent
+# transactions immediately.  But if a write lock is held, then the detach
+# has to wait.
+permutation "s1b"   "s1s"          "s2detach" "s1ins" "s1s" "s1c"
+permutation "s1b"   "s1s" "s1ins2" "s2detach" "s1ins" "s1s" "s1c"
+
+# In repeatable-read mode, the partition remains visible until commit even
+# if the to-be-detached partition is not locked for write.
+permutation "s1brr" "s1s"          "s2detach" "s1ins" "s1s" "s1c"
+permutation "s1brr" "s1s"          "s2detach"         "s1s" "s1c"
+
+# Another process trying to acquire a write lock will be blocked behind the
+# detacher
+permutation "s1b" "s1ins2" "s2detach" "s3ins2" "s1c"
+
+# a prepared query is not blocked
+permutation "s1brr" "s1prep" "s1s" "s2detach" "s1s" "s1exec1" "s3s" "s1dealloc" "s1c"
+permutation "s1brr" "s1prep" "s1exec2" "s2detach" "s1s" "s1exec2" "s3s" "s1c" "s1dealloc"
+permutation "s1brr" "s1prep" "s1s" "s2detach" "s1s" "s1exec2" "s1c" "s1dealloc"
+permutation "s1brr" "s1prep" "s2detach" "s1s" "s1exec2" "s1c" "s1dealloc"
+permutation "s1brr" "s1prep1" "s2detach" "s1s" "s1exec2" "s1c" "s1dealloc"
+permutation "s1brr" "s1prep2" "s2detach" "s1s" "s1exec2" "s1c" "s1dealloc"
diff --git a/src/test/isolation/specs/detach-partition-concurrently-2.spec b/src/test/isolation/specs/detach-partition-concurrently-2.spec
new file mode 100644
index 0000000000..9281c80a69
--- /dev/null
+++ b/src/test/isolation/specs/detach-partition-concurrently-2.spec
@@ -0,0 +1,41 @@
+# Test that detach partition concurrently makes the partition safe
+# for foreign keys that reference it.
+
+setup
+{
+  DROP TABLE IF EXISTS d_lp_fk, d_lp_fk_1, d_lp_fk_2, d_lp_fk_r;
+
+  CREATE TABLE d_lp_fk (a int PRIMARY KEY) PARTITION BY LIST(a);
+  CREATE TABLE d_lp_fk_1 PARTITION OF d_lp_fk FOR VALUES IN (1);
+  CREATE TABLE d_lp_fk_2 PARTITION OF d_lp_fk FOR VALUES IN (2);
+  INSERT INTO d_lp_fk VALUES (1), (2);
+
+  CREATE TABLE d_lp_fk_r (a int references d_lp_fk);
+}
+
+teardown { DROP TABLE IF EXISTS d_lp_fk, d_lp_fk_1, d_lp_fk_2, d_lp_fk_r; }
+
+session "s1"
+step "s1b"		{ BEGIN; }
+step "s1s"		{ SELECT * FROM d_lp_fk; }
+step "s1c"		{ COMMIT; }
+
+session "s2"
+step "s2d"		{ ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY; }
+
+session "s3"
+step "s3b"		{ BEGIN; }
+step "s3i1"		{ INSERT INTO d_lp_fk_r VALUES (1); }
+step "s3i2"		{ INSERT INTO d_lp_fk_r VALUES (2); }
+step "s3c"		{ COMMIT; }
+
+# The transaction that detaches hangs until it sees any older transaction
+# terminate.
+permutation "s1b" "s1s" "s2d" "s3i1" "s1c"
+permutation "s1b" "s1s" "s2d" "s3i2" "s3i2" "s1c"
+
+permutation "s1b" "s1s" "s3i1" "s2d" "s1c"
+permutation "s1b" "s1s" "s3i2" "s2d" "s1c"
+
+# what if s3 has an uncommitted insertion?
+permutation "s1b" "s1s" "s3b" "s2d" "s3i1" "s1c" "s3c"
diff --git a/src/test/isolation/specs/detach-partition-concurrently-3.spec b/src/test/isolation/specs/detach-partition-concurrently-3.spec
new file mode 100644
index 0000000000..4b706430e1
--- /dev/null
+++ b/src/test/isolation/specs/detach-partition-concurrently-3.spec
@@ -0,0 +1,66 @@
+# Try various operations on a partition with an incomplete detach.
+
+setup
+{
+  CREATE TABLE d3_listp (a int) PARTITION BY LIST(a);
+  CREATE TABLE d3_listp1 PARTITION OF d3_listp FOR VALUES IN (1);
+  CREATE TABLE d3_pid (pid int);
+  INSERT INTO d3_listp VALUES (1);
+}
+
+teardown {
+    DROP TABLE IF EXISTS d3_listp, d3_listp1, d3_pid;
+}
+
+session "s1"
+step "s1b"			{ BEGIN; }
+step "s1brr"		{ BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step "s1s"			{ SELECT * FROM d3_listp; }
+step "s1spart"		{ SELECT * FROM d3_listp1; }
+step "s1cancel"		{ SELECT pg_cancel_backend(pid) FROM d3_pid; }
+step "s1c"			{ COMMIT; }
+step "s1alter"		{ ALTER TABLE d3_listp1 ALTER a DROP NOT NULL; }
+step "s1insert"		{ INSERT INTO d3_listp VALUES (1); }
+step "s1insertpart"	{ INSERT INTO d3_listp1 VALUES (1); }
+step "s1drop"		{ DROP TABLE d3_listp; }
+step "s1droppart"	{ DROP TABLE d3_listp1; }
+step "s1trunc"		{ TRUNCATE TABLE d3_listp; }
+step "s1list"		{ SELECT relname FROM pg_catalog.pg_class
+					  WHERE relname LIKE 'd3_listp%' ORDER BY 1; }
+step "s1describe"	{ SELECT 'd3_listp' AS root, * FROM pg_partition_tree('d3_listp')
+					  UNION ALL SELECT 'd3_listp1', * FROM pg_partition_tree('d3_listp1'); }
+
+session "s2"
+step "s2begin"		{ BEGIN; }
+step "s2snitch"		{ INSERT INTO d3_pid SELECT pg_backend_pid(); }
+step "s2detach"		{ ALTER TABLE d3_listp DETACH PARTITION d3_listp1 CONCURRENTLY; }
+step "s2detachfinal"	{ ALTER TABLE d3_listp DETACH PARTITION d3_listp1 FINALIZE; }
+step "s2drop"		{ DROP TABLE d3_listp1; }
+step "s2commit"		{ COMMIT; }
+
+# Try various things while the partition is in "being detached" state, with
+# no session waiting.
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1c" "s1describe" "s1alter"
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1insert" "s1c"
+permutation "s2snitch" "s1brr" "s1s" "s2detach" "s1cancel" "s1insert" "s1c" "s1spart"
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1c" "s1insertpart"
+# DROP of the parent here also drops the partially detached partition
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1c" "s1drop" "s1list"
+# TRUNCATE of the parent does not touch the partially detached partition
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1c" "s1trunc" "s1spart"
+
+# When a partition with incomplete detach is dropped, we grab lock on parent too.
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1c" "s2begin" "s2drop" "s1s" "s2commit"
+
+# Partially detach, then select and try to complete the detach.  FINALIZE
+# blocks behind a reader of the partition (it needs AccessExclusiveLock on
+# the partition); it does not block behind a reader of the parent.
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1c" "s1b" "s1spart" "s2detachfinal" "s1c"
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1c" "s1b" "s1s" "s2detachfinal" "s1c"
+
+# DETACH FINALIZE in a transaction block. No insert/select on the partition
+# is allowed concurrently with that.
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1c" "s1b" "s1spart" "s2detachfinal" "s1c"
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1c" "s2begin" "s2detachfinal" "s2commit"
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1c" "s2begin" "s2detachfinal" "s1spart" "s2commit"
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s1cancel" "s1c" "s2begin" "s2detachfinal" "s1insertpart" "s2commit"
diff --git a/src/test/isolation/specs/detach-partition-concurrently-4.spec b/src/test/isolation/specs/detach-partition-concurrently-4.spec
new file mode 100644
index 0000000000..fa72386da7
--- /dev/null
+++ b/src/test/isolation/specs/detach-partition-concurrently-4.spec
@@ -0,0 +1,36 @@
+# This test exercises behavior of foreign keys in the face of concurrent
+# detach of partitions in the referenced table.
+setup {
+	drop table if exists d4_primary, d4_primary1, d4_fk, d4_pid;
+	create table d4_primary (a int primary key) partition by list (a);
+	create table d4_primary1 partition of d4_primary for values in (1);
+	insert into d4_primary values (1);
+	create table d4_fk (a int references d4_primary);
+	create table d4_pid (pid int);
+}
+
+session "s1"
+step "s1b"		{ begin; }
+step "s1brr"	{ begin isolation level repeatable read; }
+step "s1s"		{ select * from d4_primary; }
+step "s1cancel" { select pg_cancel_backend(pid) from d4_pid; }
+step "s1insert"	{ insert into d4_fk values (1); }
+step "s1c"		{ commit; }
+
+session "s2"
+step "s2snitch" { insert into d4_pid select pg_backend_pid(); }
+step "s2detach" { alter table d4_primary detach partition d4_primary1 concurrently; }
+
+session "s3"
+step "s3brr"	{ begin isolation level repeatable read; }
+step "s3insert" { insert into d4_fk values (1); }
+step "s3commit" { commit; }
+
+# Inserting a row that references a partially detached partition is rejected
+permutation "s2snitch" "s1b"   "s1s" "s2detach" "s1cancel" "s1insert" "s1c"
+# ... even under REPEATABLE READ mode.
+permutation "s2snitch" "s1brr" "s1s" "s2detach" "s1cancel" "s1insert" "s1c"
+
+# These other cases are fairly trivial.
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s3insert" "s1c"
+permutation "s2snitch" "s1b" "s1s" "s2detach" "s3brr" "s3insert" "s3commit" "s1cancel"
diff --git a/src/test/modules/delay_execution/Makefile b/src/test/modules/delay_execution/Makefile
index f270aebf3a..70f24e846d 100644
--- a/src/test/modules/delay_execution/Makefile
+++ b/src/test/modules/delay_execution/Makefile
@@ -7,7 +7,8 @@ OBJS = \
 	$(WIN32RES) \
 	delay_execution.o
 
-ISOLATION = partition-addition
+ISOLATION = partition-addition \
+	    partition-removal-1
 
 ifdef USE_PGXS
 PG_CONFIG = pg_config
diff --git a/src/test/modules/delay_execution/expected/partition-removal-1.out b/src/test/modules/delay_execution/expected/partition-removal-1.out
new file mode 100644
index 0000000000..427f41c9aa
--- /dev/null
+++ b/src/test/modules/delay_execution/expected/partition-removal-1.out
@@ -0,0 +1,175 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s3lock s1b s1exec s2remp s3check s3unlock s3check s1c
+step s3lock: SELECT pg_advisory_lock(12543);
+pg_advisory_lock
+
+               
+step s1b: BEGIN;
+step s1exec: SELECT * FROM partrem WHERE a <> 1 AND a <> (SELECT 3); <waiting ...>
+step s2remp: ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; <waiting ...>
+step s3check: SELECT * FROM partrem;
+a              b              
+
+1              ABC            
+3              DEF            
+step s3unlock: SELECT pg_advisory_unlock(12543);
+pg_advisory_unlock
+
+t              
+step s1exec: <... completed>
+a              b              
+
+2              JKL            
+step s3check: SELECT * FROM partrem;
+a              b              
+
+1              ABC            
+3              DEF            
+step s1c: COMMIT;
+step s2remp: <... completed>
+
+starting permutation: s3lock s1brr s1exec s2remp s3check s3unlock s3check s1c
+step s3lock: SELECT pg_advisory_lock(12543);
+pg_advisory_lock
+
+               
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1exec: SELECT * FROM partrem WHERE a <> 1 AND a <> (SELECT 3); <waiting ...>
+step s2remp: ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; <waiting ...>
+step s3check: SELECT * FROM partrem;
+a              b              
+
+1              ABC            
+3              DEF            
+step s3unlock: SELECT pg_advisory_unlock(12543);
+pg_advisory_unlock
+
+t              
+step s1exec: <... completed>
+a              b              
+
+2              JKL            
+step s3check: SELECT * FROM partrem;
+a              b              
+
+1              ABC            
+3              DEF            
+step s1c: COMMIT;
+step s2remp: <... completed>
+
+starting permutation: s3lock s1b s1exec2 s2remp s3unlock s1c
+step s3lock: SELECT pg_advisory_lock(12543);
+pg_advisory_lock
+
+               
+step s1b: BEGIN;
+step s1exec2: SELECT * FROM partrem WHERE a <> (SELECT 2) AND a <> 1; <waiting ...>
+step s2remp: ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; <waiting ...>
+step s3unlock: SELECT pg_advisory_unlock(12543);
+pg_advisory_unlock
+
+t              
+step s1exec2: <... completed>
+a              b              
+
+3              DEF            
+step s1c: COMMIT;
+step s2remp: <... completed>
+
+starting permutation: s3lock s1brr s1exec2 s2remp s3unlock s1c
+step s3lock: SELECT pg_advisory_lock(12543);
+pg_advisory_lock
+
+               
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1exec2: SELECT * FROM partrem WHERE a <> (SELECT 2) AND a <> 1; <waiting ...>
+step s2remp: ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; <waiting ...>
+step s3unlock: SELECT pg_advisory_unlock(12543);
+pg_advisory_unlock
+
+t              
+step s1exec2: <... completed>
+a              b              
+
+3              DEF            
+step s1c: COMMIT;
+step s2remp: <... completed>
+
+starting permutation: s3lock s1brr s1prepare s2remp s1execprep s3unlock s1check s1c s1check s1dealloc
+step s3lock: SELECT pg_advisory_lock(12543);
+pg_advisory_lock
+
+               
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prepare: PREPARE ins AS INSERT INTO partrem VALUES ($1, 'GHI');
+step s2remp: ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; <waiting ...>
+step s1execprep: EXECUTE ins(2); <waiting ...>
+step s3unlock: SELECT pg_advisory_unlock(12543);
+pg_advisory_unlock
+
+t              
+step s1execprep: <... completed>
+step s1check: SELECT * FROM partrem WHERE b = 'GHI';
+a              b              
+
+2              GHI            
+step s1c: COMMIT;
+step s2remp: <... completed>
+step s1check: SELECT * FROM partrem WHERE b = 'GHI';
+a              b              
+
+step s1dealloc: DEALLOCATE ins;
+
+starting permutation: s1brr s1prepare s2remp s3lock s1execprep s3unlock s1check s1c s1check s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prepare: PREPARE ins AS INSERT INTO partrem VALUES ($1, 'GHI');
+step s2remp: ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; <waiting ...>
+step s3lock: SELECT pg_advisory_lock(12543);
+pg_advisory_lock
+
+               
+step s1execprep: EXECUTE ins(2); <waiting ...>
+step s3unlock: SELECT pg_advisory_unlock(12543);
+pg_advisory_unlock
+
+t              
+step s1execprep: <... completed>
+step s1check: SELECT * FROM partrem WHERE b = 'GHI';
+a              b              
+
+2              GHI            
+step s1c: COMMIT;
+step s2remp: <... completed>
+step s1check: SELECT * FROM partrem WHERE b = 'GHI';
+a              b              
+
+step s1dealloc: DEALLOCATE ins;
+
+starting permutation: s1brr s1check s3lock s2remp s1prepare s1execprep s3unlock s1check s1c s1check s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1check: SELECT * FROM partrem WHERE b = 'GHI';
+a              b              
+
+step s3lock: SELECT pg_advisory_lock(12543);
+pg_advisory_lock
+
+               
+step s2remp: ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; <waiting ...>
+step s1prepare: PREPARE ins AS INSERT INTO partrem VALUES ($1, 'GHI');
+step s1execprep: EXECUTE ins(2); <waiting ...>
+step s3unlock: SELECT pg_advisory_unlock(12543);
+pg_advisory_unlock
+
+t              
+step s1execprep: <... completed>
+step s1check: SELECT * FROM partrem WHERE b = 'GHI';
+a              b              
+
+2              GHI            
+step s1c: COMMIT;
+step s2remp: <... completed>
+step s1check: SELECT * FROM partrem WHERE b = 'GHI';
+a              b              
+
+step s1dealloc: DEALLOCATE ins;
diff --git a/src/test/modules/delay_execution/specs/partition-removal-1.spec b/src/test/modules/delay_execution/specs/partition-removal-1.spec
new file mode 100644
index 0000000000..5ee2750129
--- /dev/null
+++ b/src/test/modules/delay_execution/specs/partition-removal-1.spec
@@ -0,0 +1,58 @@
+# Test removal of a partition with less-than-exclusive locking.
+
+setup
+{
+  CREATE TABLE partrem (a int, b text) PARTITION BY LIST(a);
+  CREATE TABLE partrem1 PARTITION OF partrem FOR VALUES IN (1);
+  CREATE TABLE partrem2 PARTITION OF partrem FOR VALUES IN (2);
+  CREATE TABLE partrem3 PARTITION OF partrem FOR VALUES IN (3);
+  INSERT INTO partrem VALUES (1, 'ABC');
+  INSERT INTO partrem VALUES (2, 'JKL');
+  INSERT INTO partrem VALUES (3, 'DEF');
+}
+
+teardown
+{
+  DROP TABLE IF EXISTS partrem, partrem2;
+}
+
+session "s1"
+setup		{ LOAD 'delay_execution';
+		  SET delay_execution.post_planning_lock_id = 12543; }
+step "s1b"	{ BEGIN; }
+step "s1brr"	{ BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step "s1exec"	{ SELECT * FROM partrem WHERE a <> 1 AND a <> (SELECT 3); }
+step "s1exec2"	{ SELECT * FROM partrem WHERE a <> (SELECT 2) AND a <> 1; }
+step "s1prepare" { PREPARE ins AS INSERT INTO partrem VALUES ($1, 'GHI'); }
+step "s1execprep" { EXECUTE ins(2); }
+step "s1check" { SELECT * FROM partrem WHERE b = 'GHI'; }
+step "s1c"	{ COMMIT; }
+step "s1dealloc" { DEALLOCATE ins; }
+
+session "s2"
+step "s2remp"	{ ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; }
+
+session "s3"
+step "s3lock"	{ SELECT pg_advisory_lock(12543); }
+step "s3unlock"	{ SELECT pg_advisory_unlock(12543); }
+step "s3check"	{ SELECT * FROM partrem; }
+
+# The SELECT will be planned with all three partitions shown above,
+# of which we expect partrem1 to be pruned at planning and partrem3 at
+# execution. Then we'll block, and by the time the query is actually
+# executed, detach of partrem2 is already underway (so its row doesn't
+# show up in s3's result), but we expect its row to still appear in the
+# result for s1.
+permutation "s3lock" "s1b" "s1exec" "s2remp" "s3check" "s3unlock" "s3check" "s1c"
+permutation "s3lock" "s1brr" "s1exec" "s2remp" "s3check" "s3unlock" "s3check" "s1c"
+
+# In this case we're testing that after pruning partrem2 at runtime, the
+# query still works correctly.
+permutation "s3lock" "s1b" "s1exec2" "s2remp" "s3unlock" "s1c"
+permutation "s3lock" "s1brr" "s1exec2" "s2remp" "s3unlock" "s1c"
+
+# In this case we test that an insert that's prepared in repeatable read
+# mode still works after detaching.
+permutation "s3lock" "s1brr" "s1prepare" "s2remp" "s1execprep" "s3unlock" "s1check" "s1c" "s1check" "s1dealloc"
+permutation "s1brr" "s1prepare" "s2remp" "s3lock" "s1execprep" "s3unlock" "s1check" "s1c" "s1check" "s1dealloc"
+permutation "s1brr" "s1check" "s3lock" "s2remp" "s1prepare" "s1execprep" "s3unlock" "s1check" "s1c" "s1check" "s1dealloc"
diff --git a/src/test/regress/expected/alter_table.out b/src/test/regress/expected/alter_table.out
index bb3f873f22..ec14b060a6 100644
--- a/src/test/regress/expected/alter_table.out
+++ b/src/test/regress/expected/alter_table.out
@@ -4163,6 +4163,35 @@ SELECT * from part_rp;
 (0 rows)
 
 DROP TABLE part_rp;
+-- concurrent detach
+CREATE TABLE range_parted2 (
+	a int
+) PARTITION BY RANGE(a);
+CREATE TABLE part_rp PARTITION OF range_parted2 FOR VALUES FROM (0) to (100);
+BEGIN;
+-- doesn't work in a transaction block
+ALTER TABLE range_parted2 DETACH PARTITION part_rp CONCURRENTLY;
+ERROR:  ALTER TABLE ... DETACH CONCURRENTLY cannot run inside a transaction block
+COMMIT;
+CREATE TABLE part_rpd PARTITION OF range_parted2 DEFAULT;
+-- doesn't work if there's a default partition
+ALTER TABLE range_parted2 DETACH PARTITION part_rp CONCURRENTLY;
+ERROR:  cannot detach partitions concurrently when a default partition exists
+-- doesn't work for the default partition
+ALTER TABLE range_parted2 DETACH PARTITION part_rpd CONCURRENTLY;
+ERROR:  cannot detach partitions concurrently when a default partition exists
+DROP TABLE part_rpd;
+-- works fine
+ALTER TABLE range_parted2 DETACH PARTITION part_rp CONCURRENTLY;
+\d+ range_parted2
+                         Partitioned table "public.range_parted2"
+ Column |  Type   | Collation | Nullable | Default | Storage | Stats target | Description 
+--------+---------+-----------+----------+---------+---------+--------------+-------------
+ a      | integer |           |          |         | plain   |              | 
+Partition key: RANGE (a)
+Number of partitions: 0
+
+DROP TABLE range_parted2;
 -- Check ALTER TABLE commands for partitioned tables and partitions
 -- cannot add/drop column to/from *only* the parent
 ALTER TABLE ONLY list_parted2 ADD COLUMN c int;
diff --git a/src/test/regress/sql/alter_table.sql b/src/test/regress/sql/alter_table.sql
index 4cc55d8525..7a9c9252dc 100644
--- a/src/test/regress/sql/alter_table.sql
+++ b/src/test/regress/sql/alter_table.sql
@@ -2678,6 +2678,26 @@ DROP TABLE range_parted2;
 SELECT * from part_rp;
 DROP TABLE part_rp;
 
+-- concurrent detach
+CREATE TABLE range_parted2 (
+	a int
+) PARTITION BY RANGE(a);
+CREATE TABLE part_rp PARTITION OF range_parted2 FOR VALUES FROM (0) to (100);
+BEGIN;
+-- doesn't work in a transaction block
+ALTER TABLE range_parted2 DETACH PARTITION part_rp CONCURRENTLY;
+COMMIT;
+CREATE TABLE part_rpd PARTITION OF range_parted2 DEFAULT;
+-- doesn't work if there's a default partition
+ALTER TABLE range_parted2 DETACH PARTITION part_rp CONCURRENTLY;
+-- doesn't work for the default partition
+ALTER TABLE range_parted2 DETACH PARTITION part_rpd CONCURRENTLY;
+DROP TABLE part_rpd;
+-- works fine
+ALTER TABLE range_parted2 DETACH PARTITION part_rp CONCURRENTLY;
+\d+ range_parted2
+DROP TABLE range_parted2;
+
 -- Check ALTER TABLE commands for partitioned tables and partitions
 
 -- cannot add/drop column to/from *only* the parent
-- 
2.20.1

Reply via email to