On Thu, Jul 26, 2018 at 11:14:31PM +0000, Bossart, Nathan wrote:
> On 7/26/18, 3:42 PM, "Michael Paquier" <mich...@paquier.xyz> wrote:
> Minus a possible documentation update, 0003 seems almost ready, too.

The docs mentioned that shared catalogs are processed, so I did not
bother, but visibly your comment is that we could be more precise about
the ownership in this case?  An attempt is attached.

> For 0002, would adding vacuum_skip_rel() before and after
> try_relation_open() in vacuum_rel() be enough?  That way, we could
> avoid emitting an ERROR for only the VACUUM FULL case (and skip it
> with a WARNING like everything else).

Er, we need a lock on it while looking at its data in vacuum_rel(), no?
So the call to vacuum_skip_rel() needs to happen after
try_relation_open(), once we are sure that the relation is opened.
Having two set of checks is actually better as the operation can involve
multiple operations.

The error messages generated by vacuum_skip_rel are not especially smart
when elevel >= ERROR.  As we need a proper errcode for that case as well
just using a separate error message is less confusing.  I have
implemented my idea in the updated set attached.  Another issue I have
found is that when doing for example a system-wide analyze, we would
finish with spurious warnings, as toast relations need to be discarded
from the first set of relations built.

Anyway, I have done more work on the patches, mainly I have fixed the
calls to RangeVarGetRelidExtended using booleans.  I have added
isolation tests for cases which are cheap, aka those not involving a
system-wide operation.  Running those tests on HEAD, it is easy to see
that TRUNCATE or VACUUM complete after a session doing a catalog lookup
commits its transaction.  VACUUM skips a relation and VACUUM FULL issues
an error.

Regarding those patches, I am pretty happy how things turn out for
TRUNCATE and REINDEX, way less for VACUUM, so getting 0001 and 0003
committed first makes the most sense to me as their logic is rather
straight-forward (well way of speaking ;p ).
--
Michael
From 4bb42ed30aae37c7a230ebe64b27a965b275f375 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Fri, 27 Jul 2018 09:50:05 +0900
Subject: [PATCH 1/3] Refactor TRUNCATE execution to avoid early lock lookups

A caller of TRUNCATE can perform early lookup obtention which can cause
other sessions to block on the request done, causing potentially DOS
attacks as even a non-privileged user can attempt a truncation of a
critical catalog table to block even all incoming connection attempts.

This commit fixes the case of TRUNCATE so as RangeVarGetRelidExtended is
used with a callback doing the necessary checks at an earlier stage,
avoiding locking issues at early stages.

Author: Michael Paquier
Discussion: https://postgr.es/m/152512087100.19803.12733865831237526...@wrigleys.postgresql.org
---
 src/backend/commands/tablecmds.c              | 83 +++++++++++++++----
 .../isolation/expected/truncate-conflict.out  | 45 ++++++++++
 src/test/isolation/isolation_schedule         |  1 +
 .../isolation/specs/truncate-conflict.spec    | 27 ++++++
 4 files changed, 138 insertions(+), 18 deletions(-)
 create mode 100644 src/test/isolation/expected/truncate-conflict.out
 create mode 100644 src/test/isolation/specs/truncate-conflict.spec

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index eb2d33dd86..82bb07049d 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -300,7 +300,10 @@ struct DropRelationCallbackState
 #define child_dependency_type(child_is_partition)	\
 	((child_is_partition) ? DEPENDENCY_AUTO : DEPENDENCY_NORMAL)
 
-static void truncate_check_rel(Relation rel);
+static void truncate_check_rel(Oid relid, Form_pg_class reltuple);
+static void truncate_check_session(Relation rel);
+static void RangeVarCallbackForTruncate(const RangeVar *relation,
+							Oid relId, Oid oldRelId, void *arg);
 static List *MergeAttributes(List *schema, List *supers, char relpersistence,
 				bool is_partition, List **supOids, List **supconstr,
 				int *supOidCount);
@@ -1336,15 +1339,25 @@ ExecuteTruncate(TruncateStmt *stmt)
 		bool		recurse = rv->inh;
 		Oid			myrelid;
 
-		rel = heap_openrv(rv, AccessExclusiveLock);
-		myrelid = RelationGetRelid(rel);
+		myrelid = RangeVarGetRelidExtended(rv, AccessExclusiveLock,
+										   0, RangeVarCallbackForTruncate,
+										   NULL);
+
+		/* open the relation, we already hold a lock on it */
+		rel = heap_open(myrelid, NoLock);
+
 		/* don't throw error for "TRUNCATE foo, foo" */
 		if (list_member_oid(relids, myrelid))
 		{
 			heap_close(rel, AccessExclusiveLock);
 			continue;
 		}
-		truncate_check_rel(rel);
+
+		/*
+		 * RangeVarGetRelidExtended has done some basic checks with its
+		 * callback, and only remain session-level checks.
+		 */
+		truncate_check_session(rel);
 		rels = lappend(rels, rel);
 		relids = lappend_oid(relids, myrelid);
 		/* Log this relation only if needed for logical decoding */
@@ -1367,7 +1380,9 @@ ExecuteTruncate(TruncateStmt *stmt)
 
 				/* find_all_inheritors already got lock */
 				rel = heap_open(childrelid, NoLock);
-				truncate_check_rel(rel);
+				truncate_check_rel(RelationGetRelid(rel), rel->rd_rel);
+				truncate_check_session(rel);
+
 				rels = lappend(rels, rel);
 				relids = lappend_oid(relids, childrelid);
 				/* Log this relation only if needed for logical decoding */
@@ -1450,7 +1465,8 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
 				ereport(NOTICE,
 						(errmsg("truncate cascades to table \"%s\"",
 								RelationGetRelationName(rel))));
-				truncate_check_rel(rel);
+				truncate_check_rel(relid, rel->rd_rel);
+				truncate_check_session(rel);
 				rels = lappend(rels, rel);
 				relids = lappend_oid(relids, relid);
 				/* Log this relation only if needed for logical decoding */
@@ -1700,38 +1716,47 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
 }
 
 /*
- * Check that a given rel is safe to truncate.  Subroutine for ExecuteTruncate
+ * Check that a given relation is safe to truncate.  Subroutine for
+ * ExecuteTruncate and RangeVarCallbackForTruncate.
  */
 static void
-truncate_check_rel(Relation rel)
+truncate_check_rel(Oid relid, Form_pg_class reltuple)
 {
 	AclResult	aclresult;
+	char	   *relname = NameStr(reltuple->relname);
 
 	/*
 	 * Only allow truncate on regular tables and partitioned tables (although,
 	 * the latter are only being included here for the following checks; no
 	 * physical truncation will occur in their case.)
 	 */
-	if (rel->rd_rel->relkind != RELKIND_RELATION &&
-		rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+	if (reltuple->relkind != RELKIND_RELATION &&
+
+		reltuple->relkind != RELKIND_PARTITIONED_TABLE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-				 errmsg("\"%s\" is not a table",
-						RelationGetRelationName(rel))));
+				 errmsg("\"%s\" is not a table", relname)));
 
 	/* Permissions checks */
-	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
-								  ACL_TRUNCATE);
+	aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_TRUNCATE);
 	if (aclresult != ACLCHECK_OK)
-		aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
-					   RelationGetRelationName(rel));
+		aclcheck_error(aclresult, get_relkind_objtype(reltuple->relkind),
+					   relname);
 
-	if (!allowSystemTableMods && IsSystemRelation(rel))
+	if (!allowSystemTableMods && IsSystemClass(relid, reltuple))
 		ereport(ERROR,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 				 errmsg("permission denied: \"%s\" is a system catalog",
-						RelationGetRelationName(rel))));
+						relname)));
+}
 
+/*
+ * Set of sanity checks at session-level to check if a given relation
+ * is safe to truncate.
+ */
+static void
+truncate_check_session(Relation rel)
+{
 	/*
 	 * Don't allow truncate on temp tables of other backends ... their local
 	 * buffer manager is not going to cope.
@@ -13419,6 +13444,28 @@ RangeVarCallbackOwnsTable(const RangeVar *relation,
 		aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relId)), relation->relname);
 }
 
+/*
+ * Callback to RangeVarGetRelidExtended() for TRUNCATE processing.
+ */
+static void
+RangeVarCallbackForTruncate(const RangeVar *relation,
+							Oid relId, Oid oldRelId, void *arg)
+{
+	HeapTuple	tuple;
+
+	/* Nothing to do if the relation was not found. */
+	if (!OidIsValid(relId))
+		return;
+
+	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId));
+	if (!HeapTupleIsValid(tuple))	/* should not happen */
+		elog(ERROR, "cache lookup failed for relation %u", relId);
+
+	truncate_check_rel(relId, (Form_pg_class) GETSTRUCT(tuple));
+
+	ReleaseSysCache(tuple);
+}
+
 /*
  * Callback to RangeVarGetRelidExtended(), similar to
  * RangeVarCallbackOwnsTable() but without checks on the type of the relation.
diff --git a/src/test/isolation/expected/truncate-conflict.out b/src/test/isolation/expected/truncate-conflict.out
new file mode 100644
index 0000000000..6938528380
--- /dev/null
+++ b/src/test/isolation/expected/truncate-conflict.out
@@ -0,0 +1,45 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_begin s1_catalog_lookup s2_auth s2_truncate s1_commit
+step s1_begin: BEGIN;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_truncate: TRUNCATE pg_authid;
+ERROR:  permission denied for table pg_authid
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s2_auth s2_truncate s1_catalog_lookup s1_commit
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_truncate: TRUNCATE pg_authid;
+ERROR:  permission denied for table pg_authid
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s2_auth s1_catalog_lookup s2_truncate s1_commit
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s2_truncate: TRUNCATE pg_authid;
+ERROR:  permission denied for table pg_authid
+step s1_commit: COMMIT;
+
+starting permutation: s2_auth s2_truncate s1_begin s1_catalog_lookup s1_commit
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_truncate: TRUNCATE pg_authid;
+ERROR:  permission denied for table pg_authid
+step s1_begin: BEGIN;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s1_commit: COMMIT;
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index d5594e80e2..48ae740739 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -76,3 +76,4 @@ test: partition-key-update-2
 test: partition-key-update-3
 test: partition-key-update-4
 test: plpgsql-toast
+test: truncate-conflict
diff --git a/src/test/isolation/specs/truncate-conflict.spec b/src/test/isolation/specs/truncate-conflict.spec
new file mode 100644
index 0000000000..f70aca9f69
--- /dev/null
+++ b/src/test/isolation/specs/truncate-conflict.spec
@@ -0,0 +1,27 @@
+# Test for lock lookup conflicts with TRUNCATE when working on unowned
+# relations, particularly catalogs should trigger an ERROR for all the
+# scenarios present here.
+
+setup
+{
+	CREATE ROLE regress_truncate_conflict;
+}
+
+teardown
+{
+	DROP ROLE regress_truncate_conflict;
+}
+
+session "s1"
+step "s1_begin"          { BEGIN; }
+step "s1_catalog_lookup" { SELECT count(*) >= 0 FROM pg_stat_activity; }
+step "s1_commit"         { COMMIT; }
+
+session "s2"
+step "s2_auth"           { SET ROLE regress_truncate_conflict; }
+step "s2_truncate"       { TRUNCATE pg_authid; }
+
+permutation "s1_begin" "s1_catalog_lookup" "s2_auth" "s2_truncate" "s1_commit"
+permutation "s1_begin" "s2_auth" "s2_truncate" "s1_catalog_lookup" "s1_commit"
+permutation "s1_begin" "s2_auth" "s1_catalog_lookup" "s2_truncate" "s1_commit"
+permutation "s2_auth" "s2_truncate" "s1_begin" "s1_catalog_lookup" "s1_commit"
-- 
2.18.0

From b5094ec11384e785dd506430f1f90e56478e8e3d Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Fri, 27 Jul 2018 11:16:11 +0900
Subject: [PATCH 2/3] Refactor VACUUM execution to avoid early lock lookups

A caller of VACUUM can perform early lookup obtention which can cause
other sessions to block on the request done, causing potentially DOS
attacks as even a non-privileged user can attempt a truncation of a
critical catalog table to block even all incoming connection attempts.

Contrary to TRUNCATE, a client could attempt a system-wide VACUUM after
building the list of relations to VACUUM, which can cause vacuum_rel()
to try to lock the relation but the thing would just lock.  When the
client specifies a list of relations and the relation needs to be
skipped, fail hard so as there is no conflict with any relation a user
has no rights to work on.

vacuum_rel() already had the sanity checks needed, except that those
were applied too late.  This commit refactors the code so as relation
skips are checked beforehand, making it safe to lock lookup attacks.

Author: Michael Paquier
Discussion: https://postgr.es/m/152512087100.19803.12733865831237526...@wrigleys.postgresql.org
---
 src/backend/commands/vacuum.c                 | 183 +++++++++++++-----
 .../isolation/expected/vacuum-conflict.out    |  89 +++++++++
 src/test/isolation/isolation_schedule         |   1 +
 src/test/isolation/specs/vacuum-conflict.spec |  33 ++++
 4 files changed, 254 insertions(+), 52 deletions(-)
 create mode 100644 src/test/isolation/expected/vacuum-conflict.out
 create mode 100644 src/test/isolation/specs/vacuum-conflict.spec

diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 5736f12b8f..79dd01874e 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -68,7 +68,11 @@ static BufferAccessStrategy vac_strategy;
 
 
 /* non-export function prototypes */
-static List *expand_vacuum_rel(VacuumRelation *vrel);
+static void RangeVarCallbackForVacuum(const RangeVar *relation,
+				  Oid relId, Oid oldRelId, void *arg);
+static bool vacuum_check_rel(Oid relid, Form_pg_class reltuple, int elevel,
+							 bool include_toast);
+static List *expand_vacuum_rel(VacuumRelation *vrel, int options);
 static List *get_all_vacuum_rels(void);
 static void vac_truncate_clog(TransactionId frozenXID,
 				  MultiXactId minMulti,
@@ -257,7 +261,7 @@ vacuum(int options, List *relations, VacuumParams *params,
 			List	   *sublist;
 			MemoryContext old_context;
 
-			sublist = expand_vacuum_rel(vrel);
+			sublist = expand_vacuum_rel(vrel, options);
 			old_context = MemoryContextSwitchTo(vac_context);
 			newrels = list_concat(newrels, sublist);
 			MemoryContextSwitchTo(old_context);
@@ -408,6 +412,117 @@ vacuum(int options, List *relations, VacuumParams *params,
 	vac_context = NULL;
 }
 
+/*
+ * Callback to RangeVarGetRelidExtended() for VACUUM processing when
+ * the list of relations to work on is provided by caller of a manual
+ * VACUUM.
+ */
+void
+RangeVarCallbackForVacuum(const RangeVar *relation,
+						  Oid relId, Oid oldRelId, void *arg)
+{
+	HeapTuple	tuple;
+	int		   *options = (int *) arg;
+	Form_pg_class reltuple;
+
+	Assert(!IsAutoVacuumWorkerProcess());
+
+	/* Nothing to do if the relation was not found. */
+	if (!OidIsValid(relId))
+		return;
+
+	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId));
+	if (!HeapTupleIsValid(tuple))	/* should not happen */
+		elog(ERROR, "cache lookup failed for relation %u", relId);
+
+	reltuple = (Form_pg_class) GETSTRUCT(tuple);
+
+	/* Check permissions */
+
+	/*
+	 * VACUUM FULL takes an exclusive lock on the relation vacuumed, hence
+	 * restrict its access similarly to what vacuum_rel() would do.  Issuing
+	 * an ERROR makes sure that there is no lock escalation when a
+	 * non-privileged user lists this relation.
+	 */
+	if (((*options) & VACOPT_FULL) != 0)
+	{
+		(void) vacuum_check_rel(relId, reltuple, ERROR, false);
+	}
+
+	ReleaseSysCache(tuple);
+}
+
+/*
+ * Check if a given relation can be safely vacuumed or not.  If the
+ * relation needs to be skipped, issue a log message and return true to
+ * let the caller know what to do with this relation.
+ */
+static bool
+vacuum_check_rel(Oid relid, Form_pg_class reltuple, int elevel,
+				 bool include_toast)
+{
+	const char *errstr;
+	const char *logdetail;
+
+	/*
+	 * gettext_noop makes sure that no translation happens until the log
+	 * is generated.
+	 */
+	if (elevel >= ERROR)
+		errstr = gettext_noop("cannot vacuum relation \"%s\"");
+	else
+		errstr = gettext_noop("skipping vacuum of relation \"%s\"");
+
+	/*
+	 * Check permissions.
+	 *
+	 * We allow the user to vacuum a table if he is superuser, the table
+	 * owner, or the database owner (but in the latter case, only if it's not
+	 * a shared relation).  pg_class_ownercheck includes the superuser case.
+	 *
+	 * Note we may choose to treat permissions failure as a WARNING (depending
+	 * on elevel set by caller) and keep trying to vacuum the rest of the
+	 * database --- is this appropriate?
+	 */
+	if (!(pg_class_ownercheck(relid, GetUserId()) ||
+		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !reltuple->relisshared)))
+	{
+		if (reltuple->relisshared)
+			logdetail = gettext_noop("Only superuser can vacuum it.");
+		else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE)
+			logdetail = gettext_noop("Only superuser or database owner can vacuum it.");
+		else
+			logdetail = gettext_noop("Only table or database owner can vacuum it.");
+
+		ereport(elevel,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg(errstr, NameStr(reltuple->relname)),
+				 errdetail_log("%s", logdetail)));
+		return true;
+	}
+
+	/*
+	 * Check that it's of a vacuumable relkind.  Toast relations are
+	 * discarded if this check is done at early stages when building the
+	 * list of relations to vacuum.
+	 */
+	if (reltuple->relkind != RELKIND_RELATION &&
+		reltuple->relkind != RELKIND_MATVIEW &&
+		reltuple->relkind != RELKIND_PARTITIONED_TABLE &&
+		!(include_toast && reltuple->relkind == RELKIND_TOASTVALUE))
+	{
+		ereport(elevel,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg(errstr, NameStr(reltuple->relname)),
+				 errdetail("Non-tables or special system tables are not allowed.")));
+		return true;
+	}
+
+	return false;
+}
+
+
 /*
  * Given a VacuumRelation, fill in the table OID if it wasn't specified,
  * and optionally add VacuumRelations for partitions of the table.
@@ -423,7 +538,7 @@ vacuum(int options, List *relations, VacuumParams *params,
  * are made in vac_context.
  */
 static List *
-expand_vacuum_rel(VacuumRelation *vrel)
+expand_vacuum_rel(VacuumRelation *vrel, int options)
 {
 	List	   *vacrels = NIL;
 	MemoryContext oldcontext;
@@ -454,7 +569,9 @@ expand_vacuum_rel(VacuumRelation *vrel)
 		 * below, as well as find_all_inheritors's expectation that the caller
 		 * holds some lock on the starting relation.
 		 */
-		relid = RangeVarGetRelid(vrel->relation, AccessShareLock, false);
+		relid = RangeVarGetRelidExtended(vrel->relation, AccessShareLock,
+										 0, RangeVarCallbackForVacuum,
+										 (void *) &options);
 
 		/*
 		 * Make a returnable VacuumRelation for this rel.
@@ -545,15 +662,10 @@ get_all_vacuum_rels(void)
 	{
 		Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
 		MemoryContext oldcontext;
+		Oid			relid = HeapTupleGetOid(tuple);
 
-		/*
-		 * We include partitioned tables here; depending on which operation is
-		 * to be performed, caller will decide whether to process or ignore
-		 * them.
-		 */
-		if (classForm->relkind != RELKIND_RELATION &&
-			classForm->relkind != RELKIND_MATVIEW &&
-			classForm->relkind != RELKIND_PARTITIONED_TABLE)
+		/* check permissions of relation */
+		if (vacuum_check_rel(relid, classForm, DEBUG1, false))
 			continue;
 
 		/*
@@ -563,7 +675,7 @@ get_all_vacuum_rels(void)
 		 */
 		oldcontext = MemoryContextSwitchTo(vac_context);
 		vacrels = lappend(vacrels, makeVacuumRelation(NULL,
-													  HeapTupleGetOid(tuple),
+													  relid,
 													  NIL));
 		MemoryContextSwitchTo(oldcontext);
 	}
@@ -1436,47 +1548,14 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	}
 
 	/*
-	 * Check permissions.
-	 *
-	 * We allow the user to vacuum a table if he is superuser, the table
-	 * owner, or the database owner (but in the latter case, only if it's not
-	 * a shared relation).  pg_class_ownercheck includes the superuser case.
-	 *
-	 * Note we choose to treat permissions failure as a WARNING and keep
-	 * trying to vacuum the rest of the DB --- is this appropriate?
+	 * Check if relation needs to be skipped.  This check happens as well
+	 * when building the relation list to VACUUM for a manual operation,
+	 * and needs to be done here as well as this could be spawned across
+	 * multiple transactions.
 	 */
-	if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
-		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
+	if (vacuum_check_rel(RelationGetRelid(onerel), onerel->rd_rel, WARNING,
+						 true))
 	{
-		if (onerel->rd_rel->relisshared)
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only superuser can vacuum it",
-							RelationGetRelationName(onerel))));
-		else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
-							RelationGetRelationName(onerel))));
-		else
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
-							RelationGetRelationName(onerel))));
-		relation_close(onerel, lmode);
-		PopActiveSnapshot();
-		CommitTransactionCommand();
-		return false;
-	}
-
-	/*
-	 * Check that it's of a vacuumable relkind.
-	 */
-	if (onerel->rd_rel->relkind != RELKIND_RELATION &&
-		onerel->rd_rel->relkind != RELKIND_MATVIEW &&
-		onerel->rd_rel->relkind != RELKIND_TOASTVALUE &&
-		onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-	{
-		ereport(WARNING,
-				(errmsg("skipping \"%s\" --- cannot vacuum non-tables or special system tables",
-						RelationGetRelationName(onerel))));
 		relation_close(onerel, lmode);
 		PopActiveSnapshot();
 		CommitTransactionCommand();
diff --git a/src/test/isolation/expected/vacuum-conflict.out b/src/test/isolation/expected/vacuum-conflict.out
new file mode 100644
index 0000000000..a8b7120dd4
--- /dev/null
+++ b/src/test/isolation/expected/vacuum-conflict.out
@@ -0,0 +1,89 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_begin s1_catalog_lookup s2_auth s2_vacuum s1_commit
+step s1_begin: BEGIN;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s2_auth: SET ROLE regress_truncate_conflict;
+WARNING:  skipping vacuum of relation "pg_authid"
+step s2_vacuum: VACUUM pg_authid;
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s2_auth s2_vacuum s1_catalog_lookup s1_commit
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+WARNING:  skipping vacuum of relation "pg_authid"
+step s2_vacuum: VACUUM pg_authid;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s2_auth s1_catalog_lookup s2_vacuum s1_commit
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+WARNING:  skipping vacuum of relation "pg_authid"
+step s2_vacuum: VACUUM pg_authid;
+step s1_commit: COMMIT;
+
+starting permutation: s2_auth s2_vacuum s1_begin s1_catalog_lookup s1_commit
+step s2_auth: SET ROLE regress_truncate_conflict;
+WARNING:  skipping vacuum of relation "pg_authid"
+step s2_vacuum: VACUUM pg_authid;
+step s1_begin: BEGIN;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s1_catalog_lookup s2_auth s2_full s1_commit
+step s1_begin: BEGIN;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_full: VACUUM FULL pg_authid;
+ERROR:  cannot vacuum relation "pg_authid"
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s2_auth s2_full s1_catalog_lookup s1_commit
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_full: VACUUM FULL pg_authid;
+ERROR:  cannot vacuum relation "pg_authid"
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s1_commit: COMMIT;
+
+starting permutation: s1_begin s2_auth s1_catalog_lookup s2_full s1_commit
+step s1_begin: BEGIN;
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s2_full: VACUUM FULL pg_authid;
+ERROR:  cannot vacuum relation "pg_authid"
+step s1_commit: COMMIT;
+
+starting permutation: s2_auth s2_full s1_begin s1_catalog_lookup s1_commit
+step s2_auth: SET ROLE regress_truncate_conflict;
+step s2_full: VACUUM FULL pg_authid;
+ERROR:  cannot vacuum relation "pg_authid"
+step s1_begin: BEGIN;
+step s1_catalog_lookup: SELECT count(*) >= 0 FROM pg_stat_activity;
+?column?       
+
+t              
+step s1_commit: COMMIT;
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 48ae740739..0ff25888af 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -77,3 +77,4 @@ test: partition-key-update-3
 test: partition-key-update-4
 test: plpgsql-toast
 test: truncate-conflict
+test: vacuum-conflict
diff --git a/src/test/isolation/specs/vacuum-conflict.spec b/src/test/isolation/specs/vacuum-conflict.spec
new file mode 100644
index 0000000000..b9bd174014
--- /dev/null
+++ b/src/test/isolation/specs/vacuum-conflict.spec
@@ -0,0 +1,33 @@
+# Test for lock lookup conflicts with VACUUM when working on unowned
+# relations, particularly catalogs should trigger an ERROR for VACUUM
+# FULL and fall through for a manual VACUUM, skipping the relation.
+
+setup
+{
+	CREATE ROLE regress_truncate_conflict;
+}
+
+teardown
+{
+	DROP ROLE regress_truncate_conflict;
+}
+
+session "s1"
+step "s1_begin"          { BEGIN; }
+step "s1_catalog_lookup" { SELECT count(*) >= 0 FROM pg_stat_activity; }
+step "s1_commit"         { COMMIT; }
+
+session "s2"
+step "s2_auth"           { SET ROLE regress_truncate_conflict; }
+step "s2_vacuum"         { VACUUM pg_authid; }
+step "s2_full"           { VACUUM FULL pg_authid; }
+
+permutation "s1_begin" "s1_catalog_lookup" "s2_auth" "s2_vacuum" "s1_commit"
+permutation "s1_begin" "s2_auth" "s2_vacuum" "s1_catalog_lookup" "s1_commit"
+permutation "s1_begin" "s2_auth" "s1_catalog_lookup" "s2_vacuum" "s1_commit"
+permutation "s2_auth" "s2_vacuum" "s1_begin" "s1_catalog_lookup" "s1_commit"
+
+permutation "s1_begin" "s1_catalog_lookup" "s2_auth" "s2_full" "s1_commit"
+permutation "s1_begin" "s2_auth" "s2_full" "s1_catalog_lookup" "s1_commit"
+permutation "s1_begin" "s2_auth" "s1_catalog_lookup" "s2_full" "s1_commit"
+permutation "s2_auth" "s2_full" "s1_begin" "s1_catalog_lookup" "s1_commit"
-- 
2.18.0

From c788ec02fd2695faeb8dc791335cced849065714 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 26 Jul 2018 14:12:38 +0900
Subject: [PATCH 3/3] Restrict access to system-wide REINDEX for non-privileged
 users

A database owner running a system-wide REINDEX has the possibility to
also do the operation on critical system catalogs, which can cause a
PostgreSQL to go unresponsive or even block authentication.  This commit
makes sure that a user running a REINDEX SYSTEM or DATABASE only works
on the following relations:
- The user is a superuser
- The user is the table owner
- The user is the database owner, only if the relation worked on is not
shared.

Author: Michael Paquier
Discussion: https://postgr.es/m/152512087100.19803.12733865831237526...@wrigleys.postgresql.org
---
 doc/src/sgml/ref/reindex.sgml    |  3 ++-
 src/backend/commands/indexcmds.c | 10 ++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/ref/reindex.sgml b/doc/src/sgml/ref/reindex.sgml
index 1c21fafb80..5cd1c56352 100644
--- a/doc/src/sgml/ref/reindex.sgml
+++ b/doc/src/sgml/ref/reindex.sgml
@@ -227,7 +227,8 @@ REINDEX [ ( VERBOSE ) ] { INDEX | TABLE | SCHEMA | DATABASE | SYSTEM } <replacea
    Reindexing a single index or table requires being the owner of that
    index or table.  Reindexing a database requires being the owner of
    the database (note that the owner can therefore rebuild indexes of
-   tables owned by other users).  Of course, superusers can always
+   tables owned by other users).  Reindexing a shared catalog requires
+   being the owner of that shared catalog.  Of course, superusers can always
    reindex anything.
   </para>
 
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index b9dad9672e..1bfc34bc70 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -2415,6 +2415,16 @@ ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
 			!IsSystemClass(relid, classtuple))
 			continue;
 
+		/*
+		 * We allow the user to reindex a table if he is superuser, the table
+		 * owner, or the database owner (but in the latter case, only if it's not
+		 * a shared relation).
+		 */
+		if (!(pg_class_ownercheck(relid, GetUserId()) ||
+			  (pg_database_ownercheck(MyDatabaseId, GetUserId()) &&
+			   !classtuple->relisshared)))
+			continue;
+
 		/* Save the list of relation OIDs in private context */
 		old = MemoryContextSwitchTo(private_context);
 
-- 
2.18.0

Attachment: signature.asc
Description: PGP signature

Reply via email to