(2010/01/24 12:40), KaiGai Kohei wrote:
Perhaps, it may be a good idea to make two conceptual patches both head of
the ATPrepCmd() and ATExec*(). They will make clear good/bad points between
two approaches.

I tried to make two conceptual patches.

* pgsql-at-rework-prep.1.patch

It adds ATPermCmd(Relation rel, AlterTableCmd *cmd) that is called from the
head of ATPrepCmd(). This function enables to divide the logic of permission
checks depending on cmd->subtype from ATPrepCmd().
In most of subcommand (it does not check permission except for ownership of
the relation to be altered), it calls ATSimplePermissions() or similar.
Or, it does not anything at the stage for rest of exceptions.

Good: Here is only one entrypoint to call ATPermCmd().
 Bad: Although most of logics are consolidated into ATPremCmd(), we need to
      put individual checks on some of exception cases.

Was it matching with what you suggested? Or, am I missing something?

* pgsql-at-rework-exec.2.patch

It moves permission checks into the head (or just after all needed information
was gathered) of ATExec*() functions. The ATPrepCmd() checks only correctness
of relkind and ensure the relation is not system catalog.
This basis/concept can be applied to ALTER TABLE RENAME TO/SET SCHEMA cases 
also.

Good: Concept is clear, and less exceptions.
Good: We can apply this concept (just before execution) on other database
      objects which does not have explicit preparation stage.
 Bad: All the ATExec*() function has to call the permission checks.

My preference is the later approach. Indeed, it has larger number of entrypoints
compared to the ATPermCom() functions, but its concept is clear and we can also
apply same basis on the code path that does not go through ATPrepCmd().

P.S In the right way, this patch should also touch CheckRelationOwnership() or
    DefineIndex() logic, but I omitted them because of simplifies.

Thanks,
--
KaiGai Kohei <kai...@kaigai.gr.jp>
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 262,269 **** static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
! static void ATSimplePermissions(Relation rel, bool allowView);
! static void ATSimplePermissionsRelationOrIndex(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
  				  AlterTableCmd *cmd, bool recurse);
  static void ATOneLevelRecursion(List **wqueue, Relation rel,
--- 262,270 ----
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
! static void ATCheckRelkind(Relation rel, bool viewOK, bool indexOK);
! static void ATCheckOwnership(Relation rel);
! static void ATCheckNoCatalog(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
  				  AlterTableCmd *cmd, bool recurse);
  static void ATOneLevelRecursion(List **wqueue, Relation rel,
***************
*** 280,291 **** static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
  				 const char *colName);
  static void ATExecColumnDefault(Relation rel, const char *colName,
  					Node *newDefault);
- static void ATPrepSetStatistics(Relation rel, const char *colName,
- 					Node *newValue);
  static void ATExecSetStatistics(Relation rel, const char *colName,
  					Node *newValue);
- static void ATPrepSetDistinct(Relation rel, const char *colName,
- 					Node *newValue);
  static void ATExecSetDistinct(Relation rel, const char *colName,
  				 Node *newValue);
  static void ATExecSetStorage(Relation rel, const char *colName,
--- 281,288 ----
***************
*** 1929,1942 **** renameatt(Oid myrelid,
  	 *
  	 * normally, only the owner of a class can change its schema.
  	 */
! 	if (!pg_class_ownercheck(myrelid, GetUserId()))
! 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
! 					   RelationGetRelationName(targetrelation));
! 	if (!allowSystemTableMods && IsSystemRelation(targetrelation))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(targetrelation))));
  
  	/*
  	 * if the 'recurse' flag is set then we are supposed to rename this
--- 1926,1934 ----
  	 *
  	 * normally, only the owner of a class can change its schema.
  	 */
! 	ATCheckOwnership(targetrelation);
! 
! 	ATCheckNoCatalog(targetrelation);
  
  	/*
  	 * if the 'recurse' flag is set then we are supposed to rename this
***************
*** 2048,2053 **** RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
--- 2040,2046 ----
  	Relation	targetrelation;
  	Oid			namespaceId;
  	char		relkind;
+ 	AclResult	aclresult;
  
  	/*
  	 * Grab an exclusive lock on the target table, index, sequence or view,
***************
*** 2075,2080 **** RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
--- 2068,2086 ----
  						RelationGetRelationName(targetrelation))));
  
  	/*
+ 	 * Permission checks
+ 	 * XXX - CheckRelationOwnership() should be removed from alter.c
+ 	 */
+ 	ATCheckOwnership(targetrelation);
+ 
+ 	ATCheckNoCatalog(targetrelation);
+ 
+ 	aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), ACL_CREATE);
+ 	if (aclresult != ACLCHECK_OK)
+ 		aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+ 					   get_namespace_name(namespaceId));
+ 
+ 	/*
  	 * Don't allow ALTER TABLE on composite types. We want people to use ALTER
  	 * TYPE for that.
  	 */
***************
*** 2382,2395 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  	switch (cmd->subtype)
  	{
  		case AT_AddColumn:		/* ADD COLUMN */
! 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_AddColumnToView:		/* add column via CREATE OR REPLACE
  										 * VIEW */
! 			ATSimplePermissions(rel, true);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
--- 2388,2403 ----
  	switch (cmd->subtype)
  	{
  		case AT_AddColumn:		/* ADD COLUMN */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_AddColumnToView:		/* add column via CREATE OR REPLACE
  										 * VIEW */
! 			ATCheckRelkind(rel, true, false);
! 			ATCheckNoCatalog(rel);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
***************
*** 2402,2444 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			 * substitutes default values into INSERTs before it expands
  			 * rules.
  			 */
! 			ATSimplePermissions(rel, true);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
! 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
! 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN SET STATISTICS */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
! 			/* Performs own permission checks */
! 			ATPrepSetStatistics(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetDistinct:	/* ALTER COLUMN SET STATISTICS DISTINCT */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
! 			/* Performs own permission checks */
! 			ATPrepSetDistinct(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN SET STORAGE */
! 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_DropColumn:		/* DROP COLUMN */
! 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2410,2455 ----
  			 * substitutes default values into INSERTs before it expands
  			 * rules.
  			 */
! 			ATCheckRelkind(rel, true, false);
! 			ATCheckNoCatalog(rel);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN SET STATISTICS */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
! 			ATCheckRelkind(rel, false, true);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetDistinct:	/* ALTER COLUMN SET STATISTICS DISTINCT */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
! 			ATCheckRelkind(rel, false, true);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN SET STORAGE */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_DropColumn:		/* DROP COLUMN */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
***************
*** 2446,2458 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
! 			ATSimplePermissions(rel, false);
  			/* This command never recurses */
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_INDEX;
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
! 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2457,2471 ----
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* This command never recurses */
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_INDEX;
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
***************
*** 2460,2466 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
! 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2473,2480 ----
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
***************
*** 2468,2474 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AlterColumnType:		/* ALTER COLUMN TYPE */
! 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
  			pass = AT_PASS_ALTER_TYPE;
--- 2482,2489 ----
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AlterColumnType:		/* ALTER COLUMN TYPE */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Performs own recursion */
  			ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
  			pass = AT_PASS_ALTER_TYPE;
***************
*** 2480,2499 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			break;
  		case AT_ClusterOn:		/* CLUSTER ON */
  		case AT_DropCluster:	/* SET WITHOUT CLUSTER */
! 			ATSimplePermissions(rel, false);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
  			break;
  		case AT_AddOids:		/* SET WITH OIDS */
! 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			if (!rel->rd_rel->relhasoids || recursing)
  				ATPrepAddOids(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_DropOids:		/* SET WITHOUT OIDS */
! 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			if (rel->rd_rel->relhasoids)
  			{
--- 2495,2517 ----
  			break;
  		case AT_ClusterOn:		/* CLUSTER ON */
  		case AT_DropCluster:	/* SET WITHOUT CLUSTER */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
  			break;
  		case AT_AddOids:		/* SET WITH OIDS */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Performs own recursion */
  			if (!rel->rd_rel->relhasoids || recursing)
  				ATPrepAddOids(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_DropOids:		/* SET WITHOUT OIDS */
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* Performs own recursion */
  			if (rel->rd_rel->relhasoids)
  			{
***************
*** 2507,2520 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetTableSpace:	/* SET TABLESPACE */
! 			ATSimplePermissionsRelationOrIndex(rel);
  			/* This command never recurses */
  			ATPrepSetTableSpace(tab, rel, cmd->name);
  			pass = AT_PASS_MISC;	/* doesn't actually matter */
  			break;
  		case AT_SetRelOptions:	/* SET (...) */
  		case AT_ResetRelOptions:		/* RESET (...) */
! 			ATSimplePermissionsRelationOrIndex(rel);
  			/* This command never recurses */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
--- 2525,2540 ----
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetTableSpace:	/* SET TABLESPACE */
! 			ATCheckRelkind(rel, false, true);
! 			ATCheckNoCatalog(rel);
  			/* This command never recurses */
  			ATPrepSetTableSpace(tab, rel, cmd->name);
  			pass = AT_PASS_MISC;	/* doesn't actually matter */
  			break;
  		case AT_SetRelOptions:	/* SET (...) */
  		case AT_ResetRelOptions:		/* RESET (...) */
! 			ATCheckRelkind(rel, false, true);
! 			ATCheckNoCatalog(rel);
  			/* This command never recurses */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
***************
*** 2533,2539 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  		case AT_DisableRule:
  		case AT_AddInherit:		/* INHERIT / NO INHERIT */
  		case AT_DropInherit:
! 			ATSimplePermissions(rel, false);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
--- 2553,2560 ----
  		case AT_DisableRule:
  		case AT_AddInherit:		/* INHERIT / NO INHERIT */
  		case AT_DropInherit:
! 			ATCheckRelkind(rel, false, false);
! 			ATCheckNoCatalog(rel);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
***************
*** 2709,2715 **** ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
--- 2730,2746 ----
  			 */
  			break;
  		case AT_SetTableSpace:	/* SET TABLESPACE */
+ 			{
+ 				Oid			tablespaceId = tab->newTableSpace;
+ 				AclResult	aclresult;
  
+ 				ATCheckOwnership(rel);
+ 				aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
+ 												   ACL_CREATE);
+ 				if (aclresult != ACLCHECK_OK)
+ 					aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
+ 								   get_tablespace_name(tablespaceId));
+ 			}
  			/*
  			 * Nothing to do here; Phase 3 does the work
  			 */
***************
*** 3278,3343 **** ATGetQueueEntry(List **wqueue, Relation rel)
  }
  
  /*
!  * ATSimplePermissions
   *
!  * - Ensure that it is a relation (or possibly a view)
!  * - Ensure this user is the owner
!  * - Ensure that it is not a system table
   */
  static void
! ATSimplePermissions(Relation rel, bool allowView)
  {
! 	if (rel->rd_rel->relkind != RELKIND_RELATION)
! 	{
! 		if (allowView)
! 		{
! 			if (rel->rd_rel->relkind != RELKIND_VIEW)
! 				ereport(ERROR,
! 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 						 errmsg("\"%s\" is not a table or view",
! 								RelationGetRelationName(rel))));
! 		}
! 		else
! 			ereport(ERROR,
! 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 					 errmsg("\"%s\" is not a table",
! 							RelationGetRelationName(rel))));
! 	}
  
! 	/* Permissions checks */
! 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
! 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
! 					   RelationGetRelationName(rel));
! 
! 	if (!allowSystemTableMods && IsSystemRelation(rel))
  		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(rel))));
  }
  
- /*
-  * ATSimplePermissionsRelationOrIndex
-  *
-  * - Ensure that it is a relation or an index
-  * - Ensure this user is the owner
-  * - Ensure that it is not a system table
-  */
  static void
! ATSimplePermissionsRelationOrIndex(Relation rel)
  {
- 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
- 		rel->rd_rel->relkind != RELKIND_INDEX)
- 		ereport(ERROR,
- 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- 				 errmsg("\"%s\" is not a table or index",
- 						RelationGetRelationName(rel))));
- 
- 	/* Permissions checks */
  	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
  		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
  					   RelationGetRelationName(rel));
  
  	if (!allowSystemTableMods && IsSystemRelation(rel))
  		ereport(ERROR,
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
--- 3309,3347 ----
  }
  
  /*
!  * ATCheckRelkind
!  * ATCheckOwnership
!  * ATCheckOwnership
   *
!  * These are originally checked in ATSimplePermissions
   */
  static void
! ATCheckRelkind(Relation rel, bool viewOK, bool indexOK)
  {
! 	char	relkind = rel->rd_rel->relkind;
  
! 	if (relkind != RELKIND_RELATION &&
! 		(!viewOK || relkind != RELKIND_VIEW) &&
! 		(!indexOK || relkind != RELKIND_INDEX))
  		ereport(ERROR,
! 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 				 errmsg("\"%s\" is not a table%s%s",
! 						RelationGetRelationName(rel),
! 						!viewOK ? "" : " or view",
! 						!indexOK ? "" : " or index")));
  }
  
  static void
! ATCheckOwnership(Relation rel)
  {
  	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
  		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
  					   RelationGetRelationName(rel));
+ }
  
+ static void
+ ATCheckNoCatalog(Relation rel)
+ {
  	if (!allowSystemTableMods && IsSystemRelation(rel))
  		ereport(ERROR,
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
***************
*** 3584,3589 **** ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
--- 3588,3596 ----
  	Form_pg_type tform;
  	Expr	   *defval;
  
+ 	/* permission check to add a new column */
+ 	ATCheckOwnership(rel);
+ 
  	attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
  
  	/*
***************
*** 3884,3889 **** ATExecDropNotNull(Relation rel, const char *colName)
--- 3891,3899 ----
  	List	   *indexoidlist;
  	ListCell   *indexoidscan;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	/*
  	 * lookup the attribute
  	 */
***************
*** 3976,3981 **** ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
--- 3986,3994 ----
  	AttrNumber	attnum;
  	Relation	attr_rel;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	/*
  	 * lookup the attribute
  	 */
***************
*** 4026,4031 **** ATExecColumnDefault(Relation rel, const char *colName,
--- 4039,4047 ----
  {
  	AttrNumber	attnum;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	/*
  	 * get the number of the attribute
  	 */
***************
*** 4071,4098 **** ATExecColumnDefault(Relation rel, const char *colName,
   * ALTER TABLE ALTER COLUMN SET STATISTICS
   */
  static void
- ATPrepSetStatistics(Relation rel, const char *colName, Node *newValue)
- {
- 	/*
- 	 * We do our own permission checking because (a) we want to allow SET
- 	 * STATISTICS on indexes (for expressional index columns), and (b) we want
- 	 * to allow SET STATISTICS on system catalogs without requiring
- 	 * allowSystemTableMods to be turned on.
- 	 */
- 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
- 		rel->rd_rel->relkind != RELKIND_INDEX)
- 		ereport(ERROR,
- 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- 				 errmsg("\"%s\" is not a table or index",
- 						RelationGetRelationName(rel))));
- 
- 	/* Permissions checks */
- 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
- 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- 					   RelationGetRelationName(rel));
- }
- 
- static void
  ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
  {
  	int			newtarget;
--- 4087,4092 ----
***************
*** 4100,4105 **** ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
--- 4094,4102 ----
  	HeapTuple	tuple;
  	Form_pg_attribute attrtuple;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	Assert(IsA(newValue, Integer));
  	newtarget = intVal(newValue);
  
***************
*** 4155,4182 **** ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
   * ALTER TABLE ALTER COLUMN SET STATISTICS DISTINCT
   */
  static void
- ATPrepSetDistinct(Relation rel, const char *colName, Node *newValue)
- {
- 	/*
- 	 * We do our own permission checking because (a) we want to allow SET
- 	 * DISTINCT on indexes (for expressional index columns), and (b) we want
- 	 * to allow SET DISTINCT on system catalogs without requiring
- 	 * allowSystemTableMods to be turned on.
- 	 */
- 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
- 		rel->rd_rel->relkind != RELKIND_INDEX)
- 		ereport(ERROR,
- 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- 				 errmsg("\"%s\" is not a table or index",
- 						RelationGetRelationName(rel))));
- 
- 	/* Permissions checks */
- 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
- 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- 					   RelationGetRelationName(rel));
- }
- 
- static void
  ATExecSetDistinct(Relation rel, const char *colName, Node *newValue)
  {
  	float4		newdistinct;
--- 4152,4157 ----
***************
*** 4184,4189 **** ATExecSetDistinct(Relation rel, const char *colName, Node *newValue)
--- 4159,4167 ----
  	HeapTuple	tuple;
  	Form_pg_attribute attrtuple;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	switch (nodeTag(newValue))
  	{
  		case T_Integer:
***************
*** 4251,4256 **** ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
--- 4229,4237 ----
  	HeapTuple	tuple;
  	Form_pg_attribute attrtuple;
  
+ 	/* permission check */
+ 	ATCheckOwnership(rel);
+ 
  	Assert(IsA(newValue, String));
  	storagemode = strVal(newValue);
  
***************
*** 4332,4340 **** ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
  	List	   *children;
  	ObjectAddress object;
  
! 	/* At top level, permission check was done in ATPrepCmd, else do it */
  	if (recursing)
! 		ATSimplePermissions(rel, false);
  
  	/*
  	 * get the number of the attribute
--- 4313,4327 ----
  	List	   *children;
  	ObjectAddress object;
  
! 	/* At top level, sanity check was done in ATPrepCmd, else do it */
  	if (recursing)
! 	{
! 		ATCheckRelkind(rel, false, false);
! 		ATCheckNoCatalog(rel);
! 	}
! 
! 	/* permission check */
! 	ATCheckOwnership(rel);
  
  	/*
  	 * get the number of the attribute
***************
*** 4525,4530 **** ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
--- 4512,4528 ----
  
  	Assert(IsA(stmt, IndexStmt));
  
+ 	/*
+      * XXX - IMO, we should apply all the permission checks in DefineIndex()
+ 	 * when it is invoked with check_rights == true.
+ 	 * Here are only two code paths. The one is ATExecAddIndex() by AT_AddIndex,
+ 	 * the other is standard_ProcessUtility() by T_IndexStmt which also calls
+ 	 * CheckRelationOwnership() on the relation to be indexed just before
+ 	 * DefineIndex() invocation.
+ 	 */
+ 	ATCheckOwnership(rel);
+ 	/* XXX - ACL_CREATE on pg_namespace should be checked here? */
+ 
  	/* suppress schema rights check when rebuilding existing index */
  	check_rights = !is_rebuild;
  	/* skip index build if phase 3 will have to rewrite table anyway */
***************
*** 4634,4642 **** ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
  	List	   *children;
  	ListCell   *child;
  
! 	/* At top level, permission check was done in ATPrepCmd, else do it */
  	if (recursing)
! 		ATSimplePermissions(rel, false);
  
  	/*
  	 * Call AddRelationNewConstraints to do the work, making sure it works on
--- 4632,4645 ----
  	List	   *children;
  	ListCell   *child;
  
! 	/* At top level, sanity check was done in ATPrepCmd, else do it */
  	if (recursing)
! 	{
! 		ATCheckRelkind(rel, false, false);
! 		ATCheckNoCatalog(rel);
! 	}
! 	/* Permission check */
! 	ATCheckOwnership(rel);
  
  	/*
  	 * Call AddRelationNewConstraints to do the work, making sure it works on
***************
*** 4754,4770 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
  	 * Validity checks (permission checks wait till we have the column
  	 * numbers)
  	 */
! 	if (pkrel->rd_rel->relkind != RELKIND_RELATION)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 				 errmsg("referenced relation \"%s\" is not a table",
! 						RelationGetRelationName(pkrel))));
! 
! 	if (!allowSystemTableMods && IsSystemRelation(pkrel))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(pkrel))));
  
  	/*
  	 * Disallow reference from permanent table to temp table or vice versa.
--- 4757,4764 ----
  	 * Validity checks (permission checks wait till we have the column
  	 * numbers)
  	 */
! 	ATCheckRelkind(pkrel, false, false);
! 	ATCheckNoCatalog(pkrel);
  
  	/*
  	 * Disallow reference from permanent table to temp table or vice versa.
***************
*** 4832,4837 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
--- 4826,4832 ----
  	/*
  	 * Now we can check permissions.
  	 */
+ 	ATCheckOwnership(rel);
  	checkFkeyPermissions(pkrel, pkattnum, numpks);
  	checkFkeyPermissions(rel, fkattnum, numfks);
  
***************
*** 5582,5590 **** ATExecDropConstraint(Relation rel, const char *constrName,
  	bool		found = false;
  	bool		is_check_constraint = false;
  
! 	/* At top level, permission check was done in ATPrepCmd, else do it */
  	if (recursing)
! 		ATSimplePermissions(rel, false);
  
  	conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
  
--- 5577,5590 ----
  	bool		found = false;
  	bool		is_check_constraint = false;
  
! 	/* At top level, sanity check was done in ATPrepCmd, else do it */
  	if (recursing)
! 	{
! 		ATCheckRelkind(rel, false, false);
! 		ATCheckNoCatalog(rel);
! 	}
! 	/* Permission checks */
! 	ATCheckOwnership(rel);
  
  	conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
  
***************
*** 5915,5920 **** ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
--- 5915,5923 ----
  	SysScanDesc scan;
  	HeapTuple	depTup;
  
+ 	/* Permission checks */
+ 	ATCheckOwnership(rel);
+ 
  	attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
  
  	/* Look up the target column */
***************
*** 6695,6700 **** ATExecClusterOn(Relation rel, const char *indexName)
--- 6698,6706 ----
  {
  	Oid			indexOid;
  
+ 	/* Permission checks */
+ 	ATCheckOwnership(rel);
+ 
  	indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
  
  	if (!OidIsValid(indexOid))
***************
*** 6719,6724 **** ATExecClusterOn(Relation rel, const char *indexName)
--- 6725,6733 ----
  static void
  ATExecDropCluster(Relation rel)
  {
+ 	/* Permission checks */
+ 	ATCheckOwnership(rel);
+ 
  	mark_index_clustered(rel, InvalidOid);
  }
  
***************
*** 6729,6735 **** static void
  ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
  {
  	Oid			tablespaceId;
- 	AclResult	aclresult;
  
  	/* Check that the tablespace exists */
  	tablespaceId = get_tablespace_oid(tablespacename);
--- 6738,6743 ----
***************
*** 6738,6748 **** ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
  				(errcode(ERRCODE_UNDEFINED_OBJECT),
  				 errmsg("tablespace \"%s\" does not exist", tablespacename)));
  
- 	/* Check its permissions */
- 	aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
- 	if (aclresult != ACLCHECK_OK)
- 		aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
- 
  	/* Save info for Phase 3 to do the real work */
  	if (OidIsValid(tab->newTableSpace))
  		ereport(ERROR,
--- 6746,6751 ----
***************
*** 6769,6774 **** ATExecSetRelOptions(Relation rel, List *defList, bool isReset)
--- 6772,6780 ----
  	bool		repl_repl[Natts_pg_class];
  	static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
  
+ 	/* Permission checks */
+ 	ATCheckOwnership(rel);
+ 
  	if (defList == NIL)
  		return;					/* nothing to do */
  
***************
*** 7091,7096 **** static void
--- 7097,7105 ----
  ATExecEnableDisableTrigger(Relation rel, char *trigname,
  						   char fires_when, bool skip_system)
  {
+ 	/* Permission checks */
+ 	ATCheckOwnership(rel);
+ 
  	EnableDisableTrigger(rel, trigname, fires_when, skip_system);
  }
  
***************
*** 7103,7108 **** static void
--- 7112,7123 ----
  ATExecEnableDisableRule(Relation rel, char *trigname,
  						char fires_when)
  {
+ 	/*
+ 	 * Permission checks
+ 	 * XXX - pg_class_ownercheck() in EnableDisableRule() should be removed
+ 	 */
+ 	ATCheckOwnership(rel);
+ 
  	EnableDisableRule(rel, trigname, fires_when);
  }
  
***************
*** 7131,7140 **** ATExecAddInherit(Relation child_rel, RangeVar *parent)
  	parent_rel = heap_openrv(parent, AccessShareLock);
  
  	/*
! 	 * Must be owner of both parent and child -- child was checked by
! 	 * ATSimplePermissions call in ATPrepCmd
  	 */
! 	ATSimplePermissions(parent_rel, false);
  
  	/* Permanent rels cannot inherit from temporary ones */
  	if (parent_rel->rd_istemp && !child_rel->rd_istemp)
--- 7146,7161 ----
  	parent_rel = heap_openrv(parent, AccessShareLock);
  
  	/*
! 	 * Sanity check
  	 */
! 	ATCheckRelkind(parent_rel, false, false);
! 	ATCheckNoCatalog(parent_rel);
! 
! 	/*
! 	 * Must be owner of both parent and child
! 	 */
! 	ATCheckOwnership(child_rel);
! 	ATCheckOwnership(parent_rel);
  
  	/* Permanent rels cannot inherit from temporary ones */
  	if (parent_rel->rd_istemp && !child_rel->rd_istemp)
***************
*** 7782,7787 **** AlterTableNamespace(RangeVar *relation, const char *newschema,
--- 7803,7816 ----
  							RelationGetRelationName(rel))));
  	}
  
+ 	/*
+ 	 * Permission checks
+ 	 * XXX - CheckRelationOwnership() should be removed from alter.c
+ 	 */
+ 	ATCheckOwnership(rel);
+ 
+ 	ATCheckNoCatalog(rel);
+ 
  	/* get schema OID and check its permissions */
  	nspOid = LookupCreationNamespace(newschema);
  
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 254,259 **** static void validateForeignKeyConstraint(Constraint *fkconstraint,
--- 254,260 ----
  static void createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
  						 Oid constraintOid, Oid indexOid);
  static void ATController(Relation rel, List *cmds, bool recurse);
+ static void ATPermCmd(Relation rel, AlterTableCmd *cmd);
  static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  		  bool recurse, bool recursing);
  static void ATRewriteCatalogs(List **wqueue);
***************
*** 262,267 **** static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
--- 263,271 ----
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
+ static void ATCheckRelkind(Relation rel, bool viewOK, bool indexOK);
+ static void ATCheckOwnership(Relation rel);
+ static void ATCheckNoCatalog(Relation rel);
  static void ATSimplePermissions(Relation rel, bool allowView);
  static void ATSimplePermissionsRelationOrIndex(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
***************
*** 284,291 **** static void ATPrepSetStatistics(Relation rel, const char *colName,
  					Node *newValue);
  static void ATExecSetStatistics(Relation rel, const char *colName,
  					Node *newValue);
- static void ATPrepSetDistinct(Relation rel, const char *colName,
- 					Node *newValue);
  static void ATExecSetDistinct(Relation rel, const char *colName,
  				 Node *newValue);
  static void ATExecSetStorage(Relation rel, const char *colName,
--- 288,293 ----
***************
*** 1929,1942 **** renameatt(Oid myrelid,
  	 *
  	 * normally, only the owner of a class can change its schema.
  	 */
! 	if (!pg_class_ownercheck(myrelid, GetUserId()))
! 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
! 					   RelationGetRelationName(targetrelation));
! 	if (!allowSystemTableMods && IsSystemRelation(targetrelation))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(targetrelation))));
  
  	/*
  	 * if the 'recurse' flag is set then we are supposed to rename this
--- 1931,1938 ----
  	 *
  	 * normally, only the owner of a class can change its schema.
  	 */
! 	ATCheckOwnership(targetrelation);
! 	ATCheckNoCatalog(targetrelation);
  
  	/*
  	 * if the 'recurse' flag is set then we are supposed to rename this
***************
*** 2047,2052 **** RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
--- 2043,2049 ----
  {
  	Relation	targetrelation;
  	Oid			namespaceId;
+ 	AclResult	aclresult;
  	char		relkind;
  
  	/*
***************
*** 2075,2080 **** RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
--- 2072,2089 ----
  						RelationGetRelationName(targetrelation))));
  
  	/*
+ 	 * XXX - we assume CheckRelationOwnership() and pg_namespace_aclcheck()
+ 	 * should be moved to here from ExecRenameStmt().
+ 	 */
+ 	ATCheckOwnership(targetrelation);
+ 	aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), ACL_CREATE);
+ 	if (aclresult != ACLCHECK_OK)
+ 		aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+ 					   get_namespace_name(namespaceId));
+ 
+ 	ATCheckNoCatalog(targetrelation);
+ 
+ 	/*
  	 * Don't allow ALTER TABLE on composite types. We want people to use ALTER
  	 * TYPE for that.
  	 */
***************
*** 2349,2354 **** ATController(Relation rel, List *cmds, bool recurse)
--- 2358,2462 ----
  }
  
  /*
+  * ATPermCmd
+  *
+  * It checks permission on the given Relation to be altered, if possible.
+  * Some of ALTER TABLE option needs to be checked during execution phase.
+  */
+ static void
+ ATPermCmd(Relation rel, AlterTableCmd *cmd)
+ {
+ 	switch (cmd->subtype)
+ 	{
+ 		case AT_AddColumn:		/* ADD COLUMN */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_AddColumnToView:	/* add column via CREATE OR REPLACE VIEW */
+ 			ATSimplePermissions(rel, true);
+ 			break;
+ 		case AT_ColumnDefault:	/* ALTER COLUMN DEFAULT */
+ 			ATSimplePermissions(rel, true);
+ 			break;
+ 		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_SetStatistics:	/* ALTER COLUMN SET STATISTICS */
+ 		case AT_SetDistinct:	/* ALTER COLUMN SET STATISTICS DISTINCT */
+ 			ATCheckRelkind(rel, false, true);
+ 			ATCheckOwnership(rel);
+ 			break;
+ 		case AT_SetStorage:		/* ALTER COLUMN SET STORAGE */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_DropColumn:		/* DROP COLUMN */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_AddIndex:		/* ADD INDEX */
+ 			/* Permission checks are during execution phase */
+ 			break;
+ 		case AT_AddConstraint:	/* ADD CONSTRAINT */
+ 			/* Permission checks are during execution phase */
+ 			break;
+ 		case AT_DropConstraint:	/* DROP CONSTRAINT */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_AlterColumnType:		/* ALTER COLUMN TYPE */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_ChangeOwner:	/* ALTER OWNER */
+ 			/* Permission checks are during execution phase */
+ 			break;
+ 		case AT_ClusterOn:		/* CLUSTER ON */
+ 		case AT_DropCluster:	/* SET WITHOUT CLUSTER */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_AddOids:		/* SET WITH OIDS */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_DropOids:		/* SET WITHOUT OIDS */
+ 			ATSimplePermissions(rel, false);
+ 			/*
+ 			 * Note that ATPrepCmd() with AT_DropOids also calls ATPrepCmd() with
+ 			 * pseudo AT_DropColumn recursively, so it applies permission checks
+ 			 * twice.
+ 			 */
+ 			break;
+ 		case AT_SetTableSpace:	/* SET TABLESPACE */
+ 			/* Permission checks are during ATPrepSetTablespace() */
+ 			break;
+ 		case AT_SetRelOptions:	/* SET (...) */
+ 		case AT_ResetRelOptions:		/* RESET (...) */
+ 			ATSimplePermissionsRelationOrIndex(rel);
+ 			break;
+ 		case AT_EnableTrig:		/* ENABLE TRIGGER variants */
+ 		case AT_EnableAlwaysTrig:
+ 		case AT_EnableReplicaTrig:
+ 		case AT_EnableTrigAll:
+ 		case AT_EnableTrigUser:
+ 		case AT_DisableTrig:	/* DISABLE TRIGGER variants */
+ 		case AT_DisableTrigAll:
+ 		case AT_DisableTrigUser:
+ 		case AT_EnableRule:		/* ENABLE/DISABLE RULE variants */
+ 		case AT_EnableAlwaysRule:
+ 		case AT_EnableReplicaRule:
+ 		case AT_DisableRule:
+ 		case AT_DropInherit:	/* NO INHERIT */
+ 			ATSimplePermissions(rel, false);
+ 			break;
+ 		case AT_AddInherit:		/* INHERIT */
+ 			/* Permission checks are during preparation phase */
+ 			break;
+ 		default:				/* oops */
+ 			elog(ERROR, "unrecognized alter table type: %d",
+ 				 (int) cmd->subtype);
+ 			break;
+ 	}
+ }
+ 
+ /*
   * ATPrepCmd
   *
   * Traffic cop for ALTER TABLE Phase 1 operations, including simple
***************
*** 2382,2395 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  	switch (cmd->subtype)
  	{
  		case AT_AddColumn:		/* ADD COLUMN */
- 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_AddColumnToView:		/* add column via CREATE OR REPLACE
  										 * VIEW */
- 			ATSimplePermissions(rel, true);
  			/* Performs own recursion */
  			ATPrepAddColumn(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
--- 2490,2501 ----
***************
*** 2402,2444 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			 * substitutes default values into INSERTs before it expands
  			 * rules.
  			 */
- 			ATSimplePermissions(rel, true);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
- 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
- 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN SET STATISTICS */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
- 			/* Performs own permission checks */
- 			ATPrepSetStatistics(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetDistinct:	/* ALTER COLUMN SET STATISTICS DISTINCT */
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
- 			/* Performs own permission checks */
- 			ATPrepSetDistinct(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN SET STORAGE */
- 			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_DropColumn:		/* DROP COLUMN */
- 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2508,2541 ----
***************
*** 2446,2458 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
- 			ATSimplePermissions(rel, false);
  			/* This command never recurses */
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_INDEX;
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
- 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2543,2553 ----
***************
*** 2460,2466 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
- 			ATSimplePermissions(rel, false);
  			/* Recursion occurs during execution phase */
  			/* No command-specific prep needed except saving recurse flag */
  			if (recurse)
--- 2555,2560 ----
***************
*** 2468,2474 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_AlterColumnType:		/* ALTER COLUMN TYPE */
- 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
  			pass = AT_PASS_ALTER_TYPE;
--- 2562,2567 ----
***************
*** 2480,2499 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			break;
  		case AT_ClusterOn:		/* CLUSTER ON */
  		case AT_DropCluster:	/* SET WITHOUT CLUSTER */
- 			ATSimplePermissions(rel, false);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
  			break;
  		case AT_AddOids:		/* SET WITH OIDS */
- 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			if (!rel->rd_rel->relhasoids || recursing)
  				ATPrepAddOids(wqueue, rel, recurse, cmd);
  			pass = AT_PASS_ADD_COL;
  			break;
  		case AT_DropOids:		/* SET WITHOUT OIDS */
- 			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
  			if (rel->rd_rel->relhasoids)
  			{
--- 2573,2589 ----
***************
*** 2507,2524 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetTableSpace:	/* SET TABLESPACE */
- 			ATSimplePermissionsRelationOrIndex(rel);
  			/* This command never recurses */
  			ATPrepSetTableSpace(tab, rel, cmd->name);
  			pass = AT_PASS_MISC;	/* doesn't actually matter */
  			break;
  		case AT_SetRelOptions:	/* SET (...) */
  		case AT_ResetRelOptions:		/* RESET (...) */
- 			ATSimplePermissionsRelationOrIndex(rel);
- 			/* This command never recurses */
- 			/* No command-specific prep needed */
- 			pass = AT_PASS_MISC;
- 			break;
  		case AT_EnableTrig:		/* ENABLE TRIGGER variants */
  		case AT_EnableAlwaysTrig:
  		case AT_EnableReplicaTrig:
--- 2597,2608 ----
***************
*** 2533,2539 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
  		case AT_DisableRule:
  		case AT_AddInherit:		/* INHERIT / NO INHERIT */
  		case AT_DropInherit:
- 			ATSimplePermissions(rel, false);
  			/* These commands never recurse */
  			/* No command-specific prep needed */
  			pass = AT_PASS_MISC;
--- 2617,2622 ----
***************
*** 3278,3314 **** ATGetQueueEntry(List **wqueue, Relation rel)
  }
  
  /*
!  * ATSimplePermissions
   *
!  * - Ensure that it is a relation (or possibly a view)
!  * - Ensure this user is the owner
!  * - Ensure that it is not a system table
   */
  static void
! ATSimplePermissions(Relation rel, bool allowView)
  {
! 	if (rel->rd_rel->relkind != RELKIND_RELATION)
! 	{
! 		if (allowView)
! 		{
! 			if (rel->rd_rel->relkind != RELKIND_VIEW)
! 				ereport(ERROR,
! 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 						 errmsg("\"%s\" is not a table or view",
! 								RelationGetRelationName(rel))));
! 		}
! 		else
! 			ereport(ERROR,
! 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 					 errmsg("\"%s\" is not a table",
! 							RelationGetRelationName(rel))));
! 	}
  
! 	/* Permissions checks */
  	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
  		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
  					   RelationGetRelationName(rel));
  
  	if (!allowSystemTableMods && IsSystemRelation(rel))
  		ereport(ERROR,
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
--- 3361,3399 ----
  }
  
  /*
!  * ATCheckRelkind
!  * ATCheckOwnership
!  * ATCheckOwnership
   *
!  * These are originally checked in ATSimplePermissions
   */
  static void
! ATCheckRelkind(Relation rel, bool viewOK, bool indexOK)
  {
! 	char	relkind = rel->rd_rel->relkind;
  
! 	if (relkind != RELKIND_RELATION &&
! 		(!viewOK || relkind != RELKIND_RELATION) &&
! 		(!indexOK || relkind != RELKIND_INDEX))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 				 errmsg("\"%s\" is not a table%s%s",
! 						RelationGetRelationName(rel),
! 						!viewOK ? "" : " or view",
! 						!indexOK ? "" : " or index")));
! }
! 
! static void
! ATCheckOwnership(Relation rel)
! {
  	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
  		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
  					   RelationGetRelationName(rel));
+ }
  
+ static void
+ ATCheckNoCatalog(Relation rel)
+ {
  	if (!allowSystemTableMods && IsSystemRelation(rel))
  		ereport(ERROR,
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
***************
*** 3317,3322 **** ATSimplePermissions(Relation rel, bool allowView)
--- 3402,3422 ----
  }
  
  /*
+  * ATSimplePermissions
+  *
+  * - Ensure that it is a relation (or possibly a view)
+  * - Ensure this user is the owner
+  * - Ensure that it is not a system table
+  */
+ static void
+ ATSimplePermissions(Relation rel, bool allowView)
+ {
+ 	ATCheckRelkind(rel, allowView, false);
+ 	ATCheckOwnership(rel);
+ 	ATCheckNoCatalog(rel);
+ }
+ 
+ /*
   * ATSimplePermissionsRelationOrIndex
   *
   * - Ensure that it is a relation or an index
***************
*** 3326,3348 **** ATSimplePermissions(Relation rel, bool allowView)
  static void
  ATSimplePermissionsRelationOrIndex(Relation rel)
  {
! 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
! 		rel->rd_rel->relkind != RELKIND_INDEX)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 				 errmsg("\"%s\" is not a table or index",
! 						RelationGetRelationName(rel))));
! 
! 	/* Permissions checks */
! 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
! 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
! 					   RelationGetRelationName(rel));
! 
! 	if (!allowSystemTableMods && IsSystemRelation(rel))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(rel))));
  }
  
  /*
--- 3426,3434 ----
  static void
  ATSimplePermissionsRelationOrIndex(Relation rel)
  {
! 	ATCheckRelkind(rel, false, true);
! 	ATCheckOwnership(rel);
! 	ATCheckNoCatalog(rel);
  }
  
  /*
***************
*** 4071,4098 **** ATExecColumnDefault(Relation rel, const char *colName,
   * ALTER TABLE ALTER COLUMN SET STATISTICS
   */
  static void
- ATPrepSetStatistics(Relation rel, const char *colName, Node *newValue)
- {
- 	/*
- 	 * We do our own permission checking because (a) we want to allow SET
- 	 * STATISTICS on indexes (for expressional index columns), and (b) we want
- 	 * to allow SET STATISTICS on system catalogs without requiring
- 	 * allowSystemTableMods to be turned on.
- 	 */
- 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
- 		rel->rd_rel->relkind != RELKIND_INDEX)
- 		ereport(ERROR,
- 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- 				 errmsg("\"%s\" is not a table or index",
- 						RelationGetRelationName(rel))));
- 
- 	/* Permissions checks */
- 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
- 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- 					   RelationGetRelationName(rel));
- }
- 
- static void
  ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
  {
  	int			newtarget;
--- 4157,4162 ----
***************
*** 4155,4182 **** ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
   * ALTER TABLE ALTER COLUMN SET STATISTICS DISTINCT
   */
  static void
- ATPrepSetDistinct(Relation rel, const char *colName, Node *newValue)
- {
- 	/*
- 	 * We do our own permission checking because (a) we want to allow SET
- 	 * DISTINCT on indexes (for expressional index columns), and (b) we want
- 	 * to allow SET DISTINCT on system catalogs without requiring
- 	 * allowSystemTableMods to be turned on.
- 	 */
- 	if (rel->rd_rel->relkind != RELKIND_RELATION &&
- 		rel->rd_rel->relkind != RELKIND_INDEX)
- 		ereport(ERROR,
- 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- 				 errmsg("\"%s\" is not a table or index",
- 						RelationGetRelationName(rel))));
- 
- 	/* Permissions checks */
- 	if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
- 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- 					   RelationGetRelationName(rel));
- }
- 
- static void
  ATExecSetDistinct(Relation rel, const char *colName, Node *newValue)
  {
  	float4		newdistinct;
--- 4219,4224 ----
***************
*** 4525,4530 **** ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
--- 4567,4582 ----
  
  	Assert(IsA(stmt, IndexStmt));
  
+ 	/*
+ 	 * XXX - IMO, we should apply all the permission checks in DefineIndex()
+ 	 * when it is invoked with check_rights == true.
+ 	 * Here are only two code paths. The one is ATExecAddIndex() by AT_AddIndex,
+ 	 * the other is standard_ProcessUtility() by T_IndexStmt which also calls
+ 	 * CheckRelationOwnership() on the relation to be indexed just before
+ 	 * DefineIndex() invocation.
+ 	 */
+ 	ATSimplePermissions(rel, false);
+ 
  	/* suppress schema rights check when rebuilding existing index */
  	check_rights = !is_rebuild;
  	/* skip index build if phase 3 will have to rewrite table anyway */
***************
*** 4634,4642 **** ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
  	List	   *children;
  	ListCell   *child;
  
! 	/* At top level, permission check was done in ATPrepCmd, else do it */
! 	if (recursing)
! 		ATSimplePermissions(rel, false);
  
  	/*
  	 * Call AddRelationNewConstraints to do the work, making sure it works on
--- 4686,4693 ----
  	List	   *children;
  	ListCell   *child;
  
! 	/* ATPermCmd does not check permission */
! 	ATSimplePermissions(rel, false);
  
  	/*
  	 * Call AddRelationNewConstraints to do the work, making sure it works on
***************
*** 4742,4747 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
--- 4793,4804 ----
  	Oid			constrOid;
  
  	/*
+ 	 * Sanity checks can be early
+ 	 */
+ 	ATCheckRelkind(rel, false, false);
+ 	ATCheckNoCatalog(rel);
+ 
+ 	/*
  	 * Grab an exclusive lock on the pk table, so that someone doesn't delete
  	 * rows out from under us. (Although a lesser lock would do for that
  	 * purpose, we'll need exclusive lock anyway to add triggers to the pk
***************
*** 4754,4770 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
  	 * Validity checks (permission checks wait till we have the column
  	 * numbers)
  	 */
! 	if (pkrel->rd_rel->relkind != RELKIND_RELATION)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 				 errmsg("referenced relation \"%s\" is not a table",
! 						RelationGetRelationName(pkrel))));
! 
! 	if (!allowSystemTableMods && IsSystemRelation(pkrel))
! 		ereport(ERROR,
! 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
! 				 errmsg("permission denied: \"%s\" is a system catalog",
! 						RelationGetRelationName(pkrel))));
  
  	/*
  	 * Disallow reference from permanent table to temp table or vice versa.
--- 4811,4818 ----
  	 * Validity checks (permission checks wait till we have the column
  	 * numbers)
  	 */
! 	ATCheckRelkind(pkrel, false, false);
! 	ATCheckNoCatalog(pkrel);
  
  	/*
  	 * Disallow reference from permanent table to temp table or vice versa.
***************
*** 4832,4837 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
--- 4880,4886 ----
  	/*
  	 * Now we can check permissions.
  	 */
+ 	ATCheckOwnership(rel);
  	checkFkeyPermissions(pkrel, pkattnum, numpks);
  	checkFkeyPermissions(rel, fkattnum, numfks);
  
***************
*** 6739,6748 **** ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
--- 6788,6802 ----
  				 errmsg("tablespace \"%s\" does not exist", tablespacename)));
  
  	/* Check its permissions */
+ 	ATCheckRelkind(rel, false, true);
+ 	ATCheckOwnership(rel);
+ 
  	aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), ACL_CREATE);
  	if (aclresult != ACLCHECK_OK)
  		aclcheck_error(aclresult, ACL_KIND_TABLESPACE, tablespacename);
  
+ 	ATCheckNoCatalog(rel);
+ 
  	/* Save info for Phase 3 to do the real work */
  	if (OidIsValid(tab->newTableSpace))
  		ereport(ERROR,
***************
*** 7131,7139 **** ATExecAddInherit(Relation child_rel, RangeVar *parent)
  	parent_rel = heap_openrv(parent, AccessShareLock);
  
  	/*
! 	 * Must be owner of both parent and child -- child was checked by
! 	 * ATSimplePermissions call in ATPrepCmd
  	 */
  	ATSimplePermissions(parent_rel, false);
  
  	/* Permanent rels cannot inherit from temporary ones */
--- 7185,7193 ----
  	parent_rel = heap_openrv(parent, AccessShareLock);
  
  	/*
! 	 * Must be owner of both parent and child
  	 */
+ 	ATSimplePermissions(child_rel, false);
  	ATSimplePermissions(parent_rel, false);
  
  	/* Permanent rels cannot inherit from temporary ones */
***************
*** 7782,7787 **** AlterTableNamespace(RangeVar *relation, const char *newschema,
--- 7836,7847 ----
  							RelationGetRelationName(rel))));
  	}
  
+ 	/*
+ 	 * XXX - No need to check relkind here
+ 	 */
+ 	ATCheckOwnership(rel);
+ 	ATCheckNoCatalog(rel);
+ 
  	/* get schema OID and check its permissions */
  	nspOid = LookupCreationNamespace(newschema);
  
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to