Hi, Attached is the updated patch.
On Mon, 16 Oct 2017 10:07:48 +0900 (JST) Tatsuo Ishii <is...@sraoss.co.jp> wrote: > >> > It would be nice if the message would be something like: > >> > > >> > DETAIL: Views that return aggregate functions are not lockable > You could add a flag to view_query_is_auto_updatable() to switch the > message between > > DETAIL: Views that return aggregate functions are not automatically > updatable. > > and > > DETAIL: Views that return aggregate functions are not lockable I didn't want to change the interface of view_query_is_auto_updatable() because this might be called from other third-patry software, so I renamed this function to view_query_is_auto_updatable_or_lockable() and added the flag to this. I created view_query_is_auto_updatable() as a wrapper of this function. I also made view_query_is_lockable() that returns a other message than view_query_is_auto_updatable(). > On Tue, 17 Oct 2017 11:59:05 +0900 (JST) > Tatsuo Ishii <is...@sraoss.co.jp> wrote: > > 1) Leave as it is (ignore tables appearing in a subquery) > > > > 2) Lock all tables including in a subquery > > > > 3) Check subquery in the view > > So it seem #1 is the most reasonable way to deal with the problem > > assuming that it's user's responsibility to take appropriate locks on > > the tables in the subquery. I adopted #1 and I didn't change anything about this. Regards, -- Yugo Nagata <nag...@sraoss.co.jp>
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c index 9fe9e02..80e00f7 100644 --- a/src/backend/commands/lockcmds.c +++ b/src/backend/commands/lockcmds.c @@ -19,15 +19,20 @@ #include "commands/lockcmds.h" #include "miscadmin.h" #include "parser/parse_clause.h" +#include "parser/parsetree.h" #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "rewrite/rewriteHandler.h" +#include "access/heapam.h" -static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait); -static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode); +static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid); +static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode, Oid userid); static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid, void *arg); +static void LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait); +static void LockViewCheck(Relation view, Query *viewquery); /* * LOCK TABLE @@ -62,8 +67,10 @@ LockTableCommand(LockStmt *lockstmt) RangeVarCallbackForLockTable, (void *) &lockstmt->mode); - if (recurse) - LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait); + if (get_rel_relkind(reloid) == RELKIND_VIEW) + LockViewRecurse(reloid, reloid, lockstmt->mode, lockstmt->nowait); + else if (recurse) + LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait, GetUserId()); } } @@ -86,15 +93,17 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid, return; /* woops, concurrently dropped; no permissions * check */ - /* Currently, we only allow plain tables to be locked */ - if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) + + /* Currently, we only allow plain tables or auto-updatable views to be locked */ + if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && + relkind != RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", + errmsg("\"%s\" is not a table or view", rv->relname))); /* Check permissions. */ - aclresult = LockTableAclCheck(relid, lockmode); + aclresult = LockTableAclCheck(relid, lockmode, GetUserId()); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_CLASS, rv->relname); } @@ -107,7 +116,7 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid, * multiply-inheriting children more than once, but that's no problem. */ static void -LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait) +LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid) { List *children; ListCell *lc; @@ -120,7 +129,7 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait) AclResult aclresult; /* Check permissions before acquiring the lock. */ - aclresult = LockTableAclCheck(childreloid, lockmode); + aclresult = LockTableAclCheck(childreloid, lockmode, userid); if (aclresult != ACLCHECK_OK) { char *relname = get_rel_name(childreloid); @@ -157,15 +166,82 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait) continue; } - LockTableRecurse(childreloid, lockmode, nowait); + LockTableRecurse(childreloid, lockmode, nowait, userid); } } /* + * Apply LOCK TABLE recursively over a view + */ +static void +LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait) +{ + Relation view; + Query *viewquery; + RangeTblRef *rtr; + RangeTblEntry *base_rte; + Oid baseoid; + AclResult aclresult; + char *relname; + char relkind; + + view = heap_open(reloid, NoLock); + viewquery = get_view_query(view); + + /* Check whether the view is lockable */ + LockViewCheck(view, viewquery); + + heap_close(view, NoLock); + + Assert(list_length(viewquery->jointree->fromlist) == 1); + rtr = linitial_node(RangeTblRef, viewquery->jointree->fromlist); + base_rte = rt_fetch(rtr->rtindex, viewquery->rtable); + Assert(base_rte->rtekind == RTE_RELATION); + + baseoid = base_rte->relid; + relname = get_rel_name(baseoid); + relkind = get_rel_relkind(baseoid); + + /* Currently, we only allow plain tables or auto-updatable views to be locked */ + if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && + relkind != RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table or view", + relname))); + + /* Check infinite recursion in the view definition */ + if (baseoid == root_reloid) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("infinite recursion detected in rules for relation \"%s\"", + get_rel_name(root_reloid)))); + + /* Check permissions with the view owner's priviledge. */ + aclresult = LockTableAclCheck(baseoid, lockmode, view->rd_rel->relowner); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_CLASS, relname); + + /* We have enough rights to lock the relation; do so. */ + if (!nowait) + LockRelationOid(baseoid, lockmode); + else if (!ConditionalLockRelationOid(baseoid, lockmode)) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s\"", + relname))); + + if (relkind == RELKIND_VIEW) + LockViewRecurse(baseoid, root_reloid, lockmode, nowait); + else if (base_rte->inh) + LockTableRecurse(baseoid, lockmode, nowait, view->rd_rel->relowner); +} + +/* * Check whether the current user is permitted to lock this relation. */ static AclResult -LockTableAclCheck(Oid reloid, LOCKMODE lockmode) +LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid) { AclResult aclresult; AclMode aclmask; @@ -178,7 +254,50 @@ LockTableAclCheck(Oid reloid, LOCKMODE lockmode) else aclmask = ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE; - aclresult = pg_class_aclcheck(reloid, GetUserId(), aclmask); + aclresult = pg_class_aclcheck(reloid, userid, aclmask); return aclresult; } + +/* + * Check whether the view is lockable. + * + * Currently, only auto-updatable views can be locked, that is, + * views whose definition are simple and that doesn't have + * instead of rules or triggers are lockable. + */ +static void +LockViewCheck(Relation view, Query *viewquery) +{ + const char *relname = RelationGetRelationName(view); + const char *lockable_detail; + int i; + + lockable_detail = view_query_is_lockable(viewquery); + if (lockable_detail) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot lock view \"%s\"", relname), + errdetail_internal("%s", _(lockable_detail)))); + + + /* Confirm the view doesn't have instead of rules */ + for (i=0; i<view->rd_rules->numLocks; i++) + { + if (view->rd_rules->rules[i]->isInstead && view->rd_rules->rules[i]->event != CMD_SELECT) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot lock view \"%s\"", relname), + errdetail_internal("views that have an INSTEAD OF rule are not lockable"))); + } + + /* Confirm the view doesn't have instead of triggers */ + if (view->trigdesc && + (view->trigdesc->trig_insert_instead_row || + view->trigdesc->trig_update_instead_row || + view->trigdesc->trig_delete_instead_row)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot lock view \"%s\"", relname), + errdetail_internal("views that have an INSTEAD OF trigger are not lockable"))); +} diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index e93552a..da7fce0 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -2236,20 +2236,23 @@ view_col_is_auto_updatable(RangeTblRef *rtr, TargetEntry *tle) /* - * view_query_is_auto_updatable - test whether the specified view definition - * represents an auto-updatable view. Returns NULL (if the view can be updated) - * or a message string giving the reason that it cannot be. + * view_query_is_auto_updatable_or_lockable - test whether the specified view + * definition represents an auto-updatable view. Returns NULL (if the view can + * be updated) or a message string giving the reason that it cannot be. * * If check_cols is true, the view is required to have at least one updatable * column (necessary for INSERT/UPDATE). Otherwise the view's columns are not * checked for updatability. See also view_cols_are_auto_updatable. * + * If for_lock is true, the returned message describes the reason that it + * cannot be lockable rather than auto-updatable. + * * Note that the checks performed here are only based on the view definition. * We do not check whether any base relations referred to by the view are * updatable. */ -const char * -view_query_is_auto_updatable(Query *viewquery, bool check_cols) +static const char * +view_query_is_auto_updatable_or_lockable(Query *viewquery, bool check_cols, bool for_lock) { RangeTblRef *rtr; RangeTblEntry *base_rte; @@ -2287,22 +2290,34 @@ view_query_is_auto_updatable(Query *viewquery, bool check_cols) *---------- */ if (viewquery->distinctClause != NIL) - return gettext_noop("Views containing DISTINCT are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views containing DISTINCT are not automatically updatable.") : + gettext_noop("Views containing DISTINCT are not lockable.")); if (viewquery->groupClause != NIL || viewquery->groupingSets) - return gettext_noop("Views containing GROUP BY are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views containing GROUP BY are not automatically updatable.") : + gettext_noop("Views containing GROUP BY are not lockable.")); if (viewquery->havingQual != NULL) - return gettext_noop("Views containing HAVING are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views containing HAVING are not automatically updatable.") : + gettext_noop("Views containing HAVING are not lockable.")); if (viewquery->setOperations != NULL) - return gettext_noop("Views containing UNION, INTERSECT, or EXCEPT are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views containing UNION, INTERSECT, or EXCEPT are not automatically updatable.") : + gettext_noop("Views containing UNION, INTERSECT, or EXCEPT are not lockable.")); if (viewquery->cteList != NIL) - return gettext_noop("Views containing WITH are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views containing WITH are not automatically updatable.") : + gettext_noop("Views containing WITH are not lockable.")); if (viewquery->limitOffset != NULL || viewquery->limitCount != NULL) - return gettext_noop("Views containing LIMIT or OFFSET are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views containing LIMIT or OFFSET are not automatically updatable.") : + gettext_noop("Views containing LIMIT or OFFSET are not lockable.")); /* * We must not allow window functions or set returning functions in the @@ -2314,24 +2329,34 @@ view_query_is_auto_updatable(Query *viewquery, bool check_cols) * unique row in the underlying base relation. */ if (viewquery->hasAggs) - return gettext_noop("Views that return aggregate functions are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views that return aggregate functions are not automatically updatable.") : + gettext_noop("Views that return aggregate functions are not lockable.")); if (viewquery->hasWindowFuncs) - return gettext_noop("Views that return window functions are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views that return window functions are not automatically updatable.") : + gettext_noop("Views that return window functions are not lockable.")); if (viewquery->hasTargetSRFs) - return gettext_noop("Views that return set-returning functions are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views that return set-returning functions are not automatically updatable.") : + gettext_noop("Views that return set-returning functions are not lockable.")); /* * The view query should select from a single base relation, which must be * a table or another view. */ if (list_length(viewquery->jointree->fromlist) != 1) - return gettext_noop("Views that do not select from a single table or view are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views that do not select from a single table or view are not automatically updatable.") : + gettext_noop("Views that do not select from a single table or view are not lockable.")); rtr = (RangeTblRef *) linitial(viewquery->jointree->fromlist); if (!IsA(rtr, RangeTblRef)) - return gettext_noop("Views that do not select from a single table or view are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views that do not select from a single table or view are not automatically updatable.") : + gettext_noop("Views that do not select from a single table or view are not lockable.")); base_rte = rt_fetch(rtr->rtindex, viewquery->rtable); if (base_rte->rtekind != RTE_RELATION || @@ -2339,10 +2364,14 @@ view_query_is_auto_updatable(Query *viewquery, bool check_cols) base_rte->relkind != RELKIND_FOREIGN_TABLE && base_rte->relkind != RELKIND_VIEW && base_rte->relkind != RELKIND_PARTITIONED_TABLE)) - return gettext_noop("Views that do not select from a single table or view are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views that do not select from a single table or view are not automatically updatable.") : + gettext_noop("Views that do not select from a single table or view are not lockable.")); if (base_rte->tablesample) - return gettext_noop("Views containing TABLESAMPLE are not automatically updatable."); + return (!for_lock ? + gettext_noop("Views containing TABLESAMPLE are not automatically updatable.") : + gettext_noop("Views containing TABLESAMPLE are not lockable.")); /* * Check that the view has at least one updatable column. This is required @@ -2374,6 +2403,29 @@ view_query_is_auto_updatable(Query *viewquery, bool check_cols) /* + * view_query_is_auto_updatable - a simple wrapper function of + * view_query_is_auto_updatable_or_lockable + */ +const char * +view_query_is_auto_updatable(Query *viewquery, bool check_cols) +{ + return view_query_is_auto_updatable_or_lockable(viewquery, check_cols, false); +} + +/* + * view_query_is_lockable - a wrapper function of + * view_query_is_auto_updatable_or_lockable to check whether a given view is + * lockable. Returns NULL (if the view can be locked) or a message string + * giving the reason that it cannot be. + */ +const char * +view_query_is_lockable(Query *viewquery) +{ + return view_query_is_auto_updatable_or_lockable(viewquery, false, true); +} + + +/* * view_cols_are_auto_updatable - test whether all of the required columns of * an auto-updatable view are actually updatable. Returns NULL (if all the * required columns can be updated) or a message string giving the reason that diff --git a/src/include/rewrite/rewriteHandler.h b/src/include/rewrite/rewriteHandler.h index 86ae571..edd6693 100644 --- a/src/include/rewrite/rewriteHandler.h +++ b/src/include/rewrite/rewriteHandler.h @@ -29,6 +29,7 @@ extern void rewriteTargetListUD(Query *parsetree, RangeTblEntry *target_rte, extern Query *get_view_query(Relation view); extern const char *view_query_is_auto_updatable(Query *viewquery, bool check_cols); +extern const char *view_query_is_lockable(Query *viewquery); extern int relation_is_updatable(Oid reloid, bool include_triggers, Bitmapset *include_cols); diff --git a/src/test/regress/expected/lock.out b/src/test/regress/expected/lock.out index fd27344..8c1f587 100644 --- a/src/test/regress/expected/lock.out +++ b/src/test/regress/expected/lock.out @@ -5,7 +5,9 @@ CREATE SCHEMA lock_schema1; SET search_path = lock_schema1; CREATE TABLE lock_tbl1 (a BIGINT); -CREATE VIEW lock_view1 AS SELECT 1; +CREATE VIEW lock_view1 AS SELECT * FROM lock_tbl1; +CREATE VIEW lock_view2 AS SELECT count(*) FROM lock_tbl1; +CREATE VIEW lock_view3 AS SELECT 1; CREATE ROLE regress_rol_lock1; ALTER ROLE regress_rol_lock1 SET search_path = lock_schema1; GRANT USAGE ON SCHEMA lock_schema1 TO regress_rol_lock1; @@ -30,8 +32,17 @@ LOCK TABLE lock_tbl1 IN SHARE MODE NOWAIT; LOCK TABLE lock_tbl1 IN SHARE ROW EXCLUSIVE MODE NOWAIT; LOCK TABLE lock_tbl1 IN EXCLUSIVE MODE NOWAIT; LOCK TABLE lock_tbl1 IN ACCESS EXCLUSIVE MODE NOWAIT; -LOCK TABLE lock_view1 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-table -ERROR: "lock_view1" is not a table +LOCK TABLE lock_view1 IN EXCLUSIVE MODE; -- OK; can lock an autoupdatable view +ROLLBACK; +BEGIN TRANSACTION; +LOCK TABLE lock_view2 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-autoupdatable view +ERROR: cannot lock view "lock_view2" +DETAIL: Views that return aggregate functions are not lockable. +ROLLBACK; +BEGIN TRANSACTION; +LOCK TABLE lock_view3 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-autoupdatable view +ERROR: cannot lock view "lock_view3" +DETAIL: Views that do not select from a single table or view are not lockable. ROLLBACK; -- Verify that we can lock a table with inheritance children. CREATE TABLE lock_tbl2 (b BIGINT) INHERITS (lock_tbl1); @@ -55,6 +66,8 @@ RESET ROLE; -- Clean up -- DROP VIEW lock_view1; +DROP VIEW lock_view2; +DROP VIEW lock_view3; DROP TABLE lock_tbl3; DROP TABLE lock_tbl2; DROP TABLE lock_tbl1; diff --git a/src/test/regress/sql/lock.sql b/src/test/regress/sql/lock.sql index 567e8bc..d8e97d7 100644 --- a/src/test/regress/sql/lock.sql +++ b/src/test/regress/sql/lock.sql @@ -6,7 +6,9 @@ CREATE SCHEMA lock_schema1; SET search_path = lock_schema1; CREATE TABLE lock_tbl1 (a BIGINT); -CREATE VIEW lock_view1 AS SELECT 1; +CREATE VIEW lock_view1 AS SELECT * FROM lock_tbl1; +CREATE VIEW lock_view2 AS SELECT count(*) FROM lock_tbl1; +CREATE VIEW lock_view3 AS SELECT 1; CREATE ROLE regress_rol_lock1; ALTER ROLE regress_rol_lock1 SET search_path = lock_schema1; GRANT USAGE ON SCHEMA lock_schema1 TO regress_rol_lock1; @@ -33,7 +35,13 @@ LOCK TABLE lock_tbl1 IN SHARE MODE NOWAIT; LOCK TABLE lock_tbl1 IN SHARE ROW EXCLUSIVE MODE NOWAIT; LOCK TABLE lock_tbl1 IN EXCLUSIVE MODE NOWAIT; LOCK TABLE lock_tbl1 IN ACCESS EXCLUSIVE MODE NOWAIT; -LOCK TABLE lock_view1 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-table +LOCK TABLE lock_view1 IN EXCLUSIVE MODE; -- OK; can lock an autoupdatable view +ROLLBACK; +BEGIN TRANSACTION; +LOCK TABLE lock_view2 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-autoupdatable view +ROLLBACK; +BEGIN TRANSACTION; +LOCK TABLE lock_view3 IN EXCLUSIVE MODE; -- Will fail; can't lock a non-autoupdatable view ROLLBACK; -- Verify that we can lock a table with inheritance children. @@ -59,6 +67,8 @@ RESET ROLE; -- Clean up -- DROP VIEW lock_view1; +DROP VIEW lock_view2; +DROP VIEW lock_view3; DROP TABLE lock_tbl3; DROP TABLE lock_tbl2; DROP TABLE lock_tbl1;