On Thu, 29 Mar 2018 17:26:36 -0700 Andres Freund <and...@anarazel.de> wrote:
Thank you for your comments. I attach a patch to fix issues you've pointed out. > Hi, > > On 2018-03-28 20:26:48 +0900, Yugo Nagata wrote: > > diff --git a/doc/src/sgml/ref/lock.sgml b/doc/src/sgml/ref/lock.sgml > > index b2c7203..96d477c 100644 > > --- a/doc/src/sgml/ref/lock.sgml > > +++ b/doc/src/sgml/ref/lock.sgml > > @@ -46,6 +46,11 @@ LOCK [ TABLE ] [ ONLY ] <replaceable > > class="parameter">name</replaceable> [ * ] > > </para> > > > > <para> > > + When a view is specified to be locked, all relations appearing in the > > view > > + definition query are also locked recursively with the same lock mode. > > + </para> > > Trailing space added. I'd remove "specified to be" from the sentence. Fixed. > > I think mentioning how this interacts with permissions would be a good > idea too. Given how operations use the view's owner to recurse, that's > not obvious. Should also explain what permissions are required to do the > operation on the view. Added a description about permissions into the documentation. > > > > @@ -86,15 +92,17 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid > > relid, Oid oldrelid, > > return; /* woops, concurrently > > dropped; no permissions > > * check */ > > > > - /* Currently, we only allow plain tables to be locked */ > > - if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) > > + > > This newline looks spurious to me. Removed. > > > > /* > > + * Apply LOCK TABLE recursively over a view > > + * > > + * All tables and views appearing in the view definition query are locked > > + * recursively with the same lock mode. > > + */ > > + > > +typedef struct > > +{ > > + Oid root_reloid; > > + LOCKMODE lockmode; > > + bool nowait; > > + Oid viewowner; > > + Oid viewoid; > > +} LockViewRecurse_context; > > Probably wouldn't hurt to pgindent the larger changes in the patch. 
> > > > +static bool > > +LockViewRecurse_walker(Node *node, LockViewRecurse_context *context) > > +{ > > + if (node == NULL) > > + return false; > > + > > + if (IsA(node, Query)) > > + { > > + Query *query = (Query *) node; > > + ListCell *rtable; > > + > > + foreach(rtable, query->rtable) > > + { > > + RangeTblEntry *rte = lfirst(rtable); > > + AclResult aclresult; > > + > > + Oid relid = rte->relid; > > + char relkind = rte->relkind; > > + char *relname = get_rel_name(relid); > > + > > + /* The OLD and NEW placeholder entries in the view's > > rtable are skipped. */ > > + if (relid == context->viewoid && > > + (!strcmp(rte->eref->aliasname, "old") || > > !strcmp(rte->eref->aliasname, "new"))) > > + continue; > > + > > + /* Currently, we only allow plain tables or views to be > > locked. */ > > + if (relkind != RELKIND_RELATION && relkind != > > RELKIND_PARTITIONED_TABLE && > > + relkind != RELKIND_VIEW) > > + continue; > > + > > + /* Check infinite recursion in the view definition. */ > > + if (relid == context->root_reloid) > > + ereport(ERROR, > > + > > (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), > > + errmsg("infinite recursion > > detected in rules for relation \"%s\"", > > + > > get_rel_name(context->root_reloid)))); > > Hm, how can that happen? And if it can happen, why can it only happen > with the root relation? For example, the following queries cause infinite recursion among views. This is detected and an error is raised. create table t (i int); create view v1 as select 1; create view v2 as select * from v1; create or replace view v1 as select * from v2; begin; lock v1; abort; However, I found that the previous patch could not handle the following situation in which the root relation itself doesn't have infinite recursion. create view v3 as select * from v1; begin; lock v3; abort; This is fixed in the attached patch. Regards, > > Greetings, > > Andres Freund -- Yugo Nagata <nag...@sraoss.co.jp>
diff --git a/doc/src/sgml/ref/lock.sgml b/doc/src/sgml/ref/lock.sgml index 96d477c..12303fe 100644 --- a/doc/src/sgml/ref/lock.sgml +++ b/doc/src/sgml/ref/lock.sgml @@ -46,8 +46,8 @@ LOCK [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [ * ] </para> <para> - When a view is specified to be locked, all relations appearing in the view - definition query are also locked recursively with the same lock mode. + When a view is locked, all relations appearing in the view definition + query are also locked recursively with the same lock mode. </para> <para> @@ -174,6 +174,13 @@ LOCK [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [ * ] </para> <para> + The user performing the lock on the view must have the corresponding privilege + on the view. In addition, the view's owner must have the relevant privileges on + the underlying base relations, but the user performing the lock does + not need any permissions on the underlying base relations. + </para> + + <para> <command>LOCK TABLE</command> is useless outside a transaction block: the lock would remain held only to the completion of the statement. 
Therefore <productname>PostgreSQL</productname> reports an error if <command>LOCK</command> diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c index b247c0f..c46dc36 100644 --- a/src/backend/commands/lockcmds.c +++ b/src/backend/commands/lockcmds.c @@ -31,7 +31,7 @@ static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid use static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode, Oid userid); static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid, void *arg); -static void LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait); +static void LockViewRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, List *ancestor_views); /* * LOCK TABLE @@ -67,7 +67,7 @@ LockTableCommand(LockStmt *lockstmt) (void *) &lockstmt->mode); if (get_rel_relkind(reloid) == RELKIND_VIEW) - LockViewRecurse(reloid, reloid, lockstmt->mode, lockstmt->nowait); + LockViewRecurse(reloid, lockstmt->mode, lockstmt->nowait, NIL); else if (recurse) LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait, GetUserId()); } @@ -92,7 +92,6 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid, return; /* woops, concurrently dropped; no permissions * check */ - /* Currently, we only allow plain tables or views to be locked */ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && relkind != RELKIND_VIEW) @@ -178,11 +177,11 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid) typedef struct { - Oid root_reloid; LOCKMODE lockmode; bool nowait; Oid viewowner; Oid viewoid; + List *ancestor_views; } LockViewRecurse_context; static bool @@ -216,11 +215,11 @@ LockViewRecurse_walker(Node *node, LockViewRecurse_context *context) continue; /* Check infinite recursion in the view definition. 
*/ - if (relid == context->root_reloid) + if (list_member_oid(context->ancestor_views, relid)) ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), errmsg("infinite recursion detected in rules for relation \"%s\"", - get_rel_name(context->root_reloid)))); + get_rel_name(relid)))); /* Check permissions with the view owner's privilege. */ aclresult = LockTableAclCheck(relid, context->lockmode, context->viewowner); @@ -237,7 +236,7 @@ LockViewRecurse_walker(Node *node, LockViewRecurse_context *context) relname))); if (relkind == RELKIND_VIEW) - LockViewRecurse(relid, context->root_reloid, context->lockmode, context->nowait); + LockViewRecurse(relid, context->lockmode, context->nowait, context->ancestor_views); else if (rte->inh) LockTableRecurse(relid, context->lockmode, context->nowait, context->viewowner); } @@ -254,7 +253,7 @@ LockViewRecurse_walker(Node *node, LockViewRecurse_context *context) } static void -LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait) +LockViewRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, List *ancestor_views) { LockViewRecurse_context context; @@ -264,14 +263,17 @@ LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait) view = heap_open(reloid, NoLock); viewquery = get_view_query(view); - context.root_reloid = root_reloid; context.lockmode = lockmode; context.nowait = nowait; context.viewowner = view->rd_rel->relowner; context.viewoid = reloid; + context.ancestor_views = lcons_oid(reloid, ancestor_views); LockViewRecurse_walker((Node *) viewquery, &context); + ancestor_views = list_delete_oid(ancestor_views, reloid); + + heap_close(view, NoLock); }