A cfbot failure showed I had missed ORDER BY in some test queries.

On Sun, Dec 27, 2020 at 08:31:48PM -0800, Noah Misch wrote:
> I ended up blocking DDL that creates role memberships involving the new role;
> see reasons in user.c comments.  Lifting those restrictions looked feasible,
> but it was inessential to the mission, and avoiding unintended consequences
> would have been tricky.

Later, I pondered the case of pg_database_owner owning a shared object
(database or tablespace).  The behavior is certainly odd.  For a tablespace,
any database owner can act as the tablespace owner (but only when connected to
a database that the role owns).  For a database, likewise.  When connected to
a database having datdba=pg_database_owner, no particular role acts as the
owner, just superusers and SECURITY DEFINER functions owned by
pg_database_owner.  I don't have high hopes for that being useful, but I
couldn't quite convince myself to ban it.  Attached v2 does expand an
AddRoleMems() comment to discuss this, though.
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Merge similar algorithms into roles_is_member_of().
    
    The next commit would have complicated two or three algorithms, so take
    this opportunity to consolidate.  No functional changes.
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/FIXME

diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c
index fe6c444..1adacb9 100644
--- a/src/backend/utils/adt/acl.c
+++ b/src/backend/utils/adt/acl.c
@@ -50,32 +50,24 @@ typedef struct
 /*
  * We frequently need to test whether a given role is a member of some other
  * role.  In most of these tests the "given role" is the same, namely the
- * active current user.  So we can optimize it by keeping a cached list of
- * all the roles the "given role" is a member of, directly or indirectly.
- *
- * There are actually two caches, one computed under "has_privs" rules
- * (do not recurse where rolinherit isn't true) and one computed under
- * "is_member" rules (recurse regardless of rolinherit).
+ * active current user.  So we can optimize it by keeping cached lists of all
+ * the roles the "given role" is a member of, directly or indirectly.
  *
  * Possibly this mechanism should be generalized to allow caching membership
  * info for multiple roles?
  *
- * The has_privs cache is:
- * cached_privs_role is the role OID the cache is for.
- * cached_privs_roles is an OID list of roles that cached_privs_role
- *             has the privileges of (always including itself).
- * The cache is valid if cached_privs_role is not InvalidOid.
- *
- * The is_member cache is similarly:
- * cached_member_role is the role OID the cache is for.
- * cached_membership_roles is an OID list of roles that cached_member_role
- *             is a member of (always including itself).
- * The cache is valid if cached_member_role is not InvalidOid.
+ * Each element of cached_roles is an OID list of constituent roles for the
+ * corresponding element of cached_role (always including the cached_role
+ * itself).  One cache has ROLERECURSE_PRIVS semantics, and the other has
+ * ROLERECURSE_MEMBERS semantics.
  */
-static Oid     cached_privs_role = InvalidOid;
-static List *cached_privs_roles = NIL;
-static Oid     cached_member_role = InvalidOid;
-static List *cached_membership_roles = NIL;
+enum RoleRecurseType
+{
+       ROLERECURSE_PRIVS = 0,          /* recurse if rolinherit */
+       ROLERECURSE_MEMBERS = 1         /* recurse unconditionally */
+};
+static Oid     cached_role[] = {InvalidOid, InvalidOid};
+static List *cached_roles[] = {NIL, NIL};
 
 
 static const char *getid(const char *s, char *n);
@@ -4675,8 +4667,8 @@ initialize_acl(void)
        {
                /*
                 * In normal mode, set a callback on any syscache invalidation 
of rows
-                * of pg_auth_members (for each AUTHMEM search in this file) or
-                * pg_authid (for has_rolinherit())
+                * of pg_auth_members (for roles_is_member_of()) or pg_authid 
(for
+                * has_rolinherit())
                 */
                CacheRegisterSyscacheCallback(AUTHMEMROLEMEM,
                                                                          
RoleMembershipCacheCallback,
@@ -4695,8 +4687,8 @@ static void
 RoleMembershipCacheCallback(Datum arg, int cacheid, uint32 hashvalue)
 {
        /* Force membership caches to be recomputed on next use */
-       cached_privs_role = InvalidOid;
-       cached_member_role = InvalidOid;
+       cached_role[ROLERECURSE_PRIVS] = InvalidOid;
+       cached_role[ROLERECURSE_MEMBERS] = InvalidOid;
 }
 
 
@@ -4718,113 +4710,33 @@ has_rolinherit(Oid roleid)
 
 
 /*
- * Get a list of roles that the specified roleid has the privileges of
- *
- * This is defined not to recurse through roles that don't have rolinherit
- * set; for such roles, membership implies the ability to do SET ROLE, but
- * the privileges are not available until you've done so.
- *
- * Since indirect membership testing is relatively expensive, we cache
- * a list of memberships.  Hence, the result is only guaranteed good until
- * the next call of roles_has_privs_of()!
- *
- * For the benefit of select_best_grantor, the result is defined to be
- * in breadth-first order, ie, closer relationships earlier.
- */
-static List *
-roles_has_privs_of(Oid roleid)
-{
-       List       *roles_list;
-       ListCell   *l;
-       List       *new_cached_privs_roles;
-       MemoryContext oldctx;
-
-       /* If cache is already valid, just return the list */
-       if (OidIsValid(cached_privs_role) && cached_privs_role == roleid)
-               return cached_privs_roles;
-
-       /*
-        * Find all the roles that roleid is a member of, including multi-level
-        * recursion.  The role itself will always be the first element of the
-        * resulting list.
-        *
-        * Each element of the list is scanned to see if it adds any indirect
-        * memberships.  We can use a single list as both the record of
-        * already-found memberships and the agenda of roles yet to be scanned.
-        * This is a bit tricky but works because the foreach() macro doesn't
-        * fetch the next list element until the bottom of the loop.
-        */
-       roles_list = list_make1_oid(roleid);
-
-       foreach(l, roles_list)
-       {
-               Oid                     memberid = lfirst_oid(l);
-               CatCList   *memlist;
-               int                     i;
-
-               /* Ignore non-inheriting roles */
-               if (!has_rolinherit(memberid))
-                       continue;
-
-               /* Find roles that memberid is directly a member of */
-               memlist = SearchSysCacheList1(AUTHMEMMEMROLE,
-                                                                         
ObjectIdGetDatum(memberid));
-               for (i = 0; i < memlist->n_members; i++)
-               {
-                       HeapTuple       tup = &memlist->members[i]->tuple;
-                       Oid                     otherid = 
((Form_pg_auth_members) GETSTRUCT(tup))->roleid;
-
-                       /*
-                        * Even though there shouldn't be any loops in the 
membership
-                        * graph, we must test for having already seen this 
role. It is
-                        * legal for instance to have both A->B and A->C->B.
-                        */
-                       roles_list = list_append_unique_oid(roles_list, 
otherid);
-               }
-               ReleaseSysCacheList(memlist);
-       }
-
-       /*
-        * Copy the completed list into TopMemoryContext so it will persist.
-        */
-       oldctx = MemoryContextSwitchTo(TopMemoryContext);
-       new_cached_privs_roles = list_copy(roles_list);
-       MemoryContextSwitchTo(oldctx);
-       list_free(roles_list);
-
-       /*
-        * Now safe to assign to state variable
-        */
-       cached_privs_role = InvalidOid; /* just paranoia */
-       list_free(cached_privs_roles);
-       cached_privs_roles = new_cached_privs_roles;
-       cached_privs_role = roleid;
-
-       /* And now we can return the answer */
-       return cached_privs_roles;
-}
-
-
-/*
  * Get a list of roles that the specified roleid is a member of
  *
- * This is defined to recurse through roles regardless of rolinherit.
+ * Type ROLERECURSE_PRIVS recurses only through roles that have rolinherit
+ * set, while ROLERECURSE_MEMBERS recurses through all roles.  This sets
+ * *is_admin==true if and only if role "roleid" has an ADMIN OPTION membership
+ * in role "admin_of".
  *
  * Since indirect membership testing is relatively expensive, we cache
  * a list of memberships.  Hence, the result is only guaranteed good until
  * the next call of roles_is_member_of()!
+ *
+ * For the benefit of select_best_grantor, the result is defined to be
+ * in breadth-first order, ie, closer relationships earlier.
  */
 static List *
-roles_is_member_of(Oid roleid)
+roles_is_member_of(Oid roleid, enum RoleRecurseType type,
+                                  Oid admin_of, bool *is_admin)
 {
        List       *roles_list;
        ListCell   *l;
-       List       *new_cached_membership_roles;
+       List       *new_cached_roles;
        MemoryContext oldctx;
 
-       /* If cache is already valid, just return the list */
-       if (OidIsValid(cached_member_role) && cached_member_role == roleid)
-               return cached_membership_roles;
+       /* If cache is valid and ADMIN OPTION not sought, just return the list 
*/
+       if (cached_role[type] == roleid && !OidIsValid(admin_of) &&
+               OidIsValid(cached_role[type]))
+               return cached_roles[type];
 
        /*
         * Find all the roles that roleid is a member of, including multi-level
@@ -4845,6 +4757,9 @@ roles_is_member_of(Oid roleid)
                CatCList   *memlist;
                int                     i;
 
+               if (type == ROLERECURSE_PRIVS && !has_rolinherit(memberid))
+                       continue;                       /* ignore 
non-inheriting roles */
+
                /* Find roles that memberid is directly a member of */
                memlist = SearchSysCacheList1(AUTHMEMMEMROLE,
                                                                          
ObjectIdGetDatum(memberid));
@@ -4854,6 +4769,15 @@ roles_is_member_of(Oid roleid)
                        Oid                     otherid = 
((Form_pg_auth_members) GETSTRUCT(tup))->roleid;
 
                        /*
+                        * While otherid==InvalidOid shouldn't appear in the catalog, the
+                        * OidIsValid() avoids a NULL is_admin dereference if that arises.
+                        */
+                       if (otherid == admin_of &&
+                               ((Form_pg_auth_members) 
GETSTRUCT(tup))->admin_option &&
+                               OidIsValid(admin_of))
+                               *is_admin = true;
+
+                       /*
                         * Even though there shouldn't be any loops in the 
membership
                         * graph, we must test for having already seen this 
role. It is
                         * legal for instance to have both A->B and A->C->B.
@@ -4867,20 +4791,20 @@ roles_is_member_of(Oid roleid)
         * Copy the completed list into TopMemoryContext so it will persist.
         */
        oldctx = MemoryContextSwitchTo(TopMemoryContext);
-       new_cached_membership_roles = list_copy(roles_list);
+       new_cached_roles = list_copy(roles_list);
        MemoryContextSwitchTo(oldctx);
        list_free(roles_list);
 
        /*
         * Now safe to assign to state variable
         */
-       cached_member_role = InvalidOid;        /* just paranoia */
-       list_free(cached_membership_roles);
-       cached_membership_roles = new_cached_membership_roles;
-       cached_member_role = roleid;
+       cached_role[type] = InvalidOid; /* just paranoia */
+       list_free(cached_roles[type]);
+       cached_roles[type] = new_cached_roles;
+       cached_role[type] = roleid;
 
        /* And now we can return the answer */
-       return cached_membership_roles;
+       return cached_roles[type];
 }
 
 
@@ -4906,7 +4830,9 @@ has_privs_of_role(Oid member, Oid role)
         * Find all the roles that member has the privileges of, including
         * multi-level recursion, then see if target role is any one of them.
         */
-       return list_member_oid(roles_has_privs_of(member), role);
+       return list_member_oid(roles_is_member_of(member, ROLERECURSE_PRIVS,
+                                                                               
          InvalidOid, NULL),
+                                                  role);
 }
 
 
@@ -4930,7 +4856,9 @@ is_member_of_role(Oid member, Oid role)
         * Find all the roles that member is a member of, including multi-level
         * recursion, then see if target role is any one of them.
         */
-       return list_member_oid(roles_is_member_of(member), role);
+       return list_member_oid(roles_is_member_of(member, ROLERECURSE_MEMBERS,
+                                                                               
          InvalidOid, NULL),
+                                                  role);
 }
 
 /*
@@ -4964,7 +4892,9 @@ is_member_of_role_nosuper(Oid member, Oid role)
         * Find all the roles that member is a member of, including multi-level
         * recursion, then see if target role is any one of them.
         */
-       return list_member_oid(roles_is_member_of(member), role);
+       return list_member_oid(roles_is_member_of(member, ROLERECURSE_MEMBERS,
+                                                                               
          InvalidOid, NULL),
+                                                  role);
 }
 
 
@@ -4977,8 +4907,6 @@ bool
 is_admin_of_role(Oid member, Oid role)
 {
        bool            result = false;
-       List       *roles_list;
-       ListCell   *l;
 
        if (superuser_arg(member))
                return true;
@@ -5016,44 +4944,7 @@ is_admin_of_role(Oid member, Oid role)
                return member == GetSessionUserId() &&
                        !InLocalUserIdChange() && 
!InSecurityRestrictedOperation();
 
-       /*
-        * Find all the roles that member is a member of, including multi-level
-        * recursion.  We build a list in the same way that is_member_of_role 
does
-        * to track visited and unvisited roles.
-        */
-       roles_list = list_make1_oid(member);
-
-       foreach(l, roles_list)
-       {
-               Oid                     memberid = lfirst_oid(l);
-               CatCList   *memlist;
-               int                     i;
-
-               /* Find roles that memberid is directly a member of */
-               memlist = SearchSysCacheList1(AUTHMEMMEMROLE,
-                                                                         
ObjectIdGetDatum(memberid));
-               for (i = 0; i < memlist->n_members; i++)
-               {
-                       HeapTuple       tup = &memlist->members[i]->tuple;
-                       Oid                     otherid = 
((Form_pg_auth_members) GETSTRUCT(tup))->roleid;
-
-                       if (otherid == role &&
-                               ((Form_pg_auth_members) 
GETSTRUCT(tup))->admin_option)
-                       {
-                               /* Found what we came for, so can stop 
searching */
-                               result = true;
-                               break;
-                       }
-
-                       roles_list = list_append_unique_oid(roles_list, 
otherid);
-               }
-               ReleaseSysCacheList(memlist);
-               if (result)
-                       break;
-       }
-
-       list_free(roles_list);
-
+       (void) roles_is_member_of(member, ROLERECURSE_MEMBERS, role, &result);
        return result;
 }
 
@@ -5128,7 +5019,8 @@ select_best_grantor(Oid roleId, AclMode privileges,
-        * roles_has_privs_of() throughout this loop, because aclmask_direct()
+        * roles_is_member_of() throughout this loop, because aclmask_direct()
         * doesn't query any role memberships.
         */
-       roles_list = roles_has_privs_of(roleId);
+       roles_list = roles_is_member_of(roleId, ROLERECURSE_PRIVS,
+                                                                       
InvalidOid, NULL);
 
        /* initialize candidate result as default */
        *grantorId = roleId;
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Add "pg_database_owner" default role.
    
    Membership consists, implicitly, of the current database owner.  Expect
    use in template databases.  Once pg_database_owner has rights within a
    template, each owner of a database instantiated from that template will
    exercise those rights.
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/20201228043148.ga1053...@rfd.leadboat.com

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 3a22665..b49ceda 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -10079,6 +10079,9 @@ SCRAM-SHA-256$<replaceable>&lt;iteration 
count&gt;</replaceable>:<replaceable>&l
    <primary>pg_group</primary>
   </indexterm>
 
+  <!-- Unlike information_schema.applicable_roles, this shows no members for
+       pg_database_owner.  The v8.1 catalog would have shown no members if
+       that role had existed at the time. -->
   <para>
    The view <structname>pg_group</structname> exists for backwards
    compatibility: it emulates a catalog that existed in
diff --git a/doc/src/sgml/user-manag.sgml b/doc/src/sgml/user-manag.sgml
index cc08252..758493c 100644
--- a/doc/src/sgml/user-manag.sgml
+++ b/doc/src/sgml/user-manag.sgml
@@ -540,6 +540,10 @@ DROP ROLE doomed_role;
        <literal>pg_stat_scan_tables</literal>.</entry>
       </row>
       <row>
+       <entry>pg_database_owner</entry>
+       <entry>None.  Membership consists, implicitly, of the current database 
owner.</entry>
+      </row>
+      <row>
        <entry>pg_signal_backend</entry>
        <entry>Signal another backend to cancel a query or terminate its 
session.</entry>
       </row>
@@ -572,6 +576,17 @@ DROP ROLE doomed_role;
   </para>
 
   <para>
+  The <literal>pg_database_owner</literal> role has one implicit,
+  situation-dependent member, namely the owner of the current database.  The
+  role conveys no rights at first.  Like any role, it can own objects or
+  receive grants of access privileges.  Consequently, once
+  <literal>pg_database_owner</literal> has rights within a template database,
+  each owner of a database instantiated from that template will exercise those
+  rights.  <literal>pg_database_owner</literal> cannot be a member of any
+  role, and it cannot have non-implicit members.
+  </para>
+
+  <para>
   The <literal>pg_signal_backend</literal> role is intended to allow
   administrators to enable trusted, but non-superuser, roles to send signals
   to other backends. Currently this role enables sending of signals for
diff --git a/src/backend/catalog/information_schema.sql 
b/src/backend/catalog/information_schema.sql
index 5ab47e7..6db4950 100644
--- a/src/backend/catalog/information_schema.sql
+++ b/src/backend/catalog/information_schema.sql
@@ -255,7 +255,14 @@ CREATE VIEW applicable_roles AS
     SELECT CAST(a.rolname AS sql_identifier) AS grantee,
            CAST(b.rolname AS sql_identifier) AS role_name,
            CAST(CASE WHEN m.admin_option THEN 'YES' ELSE 'NO' END AS 
yes_or_no) AS is_grantable
-    FROM pg_auth_members m
+    FROM (SELECT member, roleid, admin_option FROM pg_auth_members
+          -- This UNION could be UNION ALL, but UNION works even if we start
+          -- to allow explicit pg_database_owner membership.
+          UNION
+          SELECT datdba, pg_authid.oid, false
+          FROM pg_database, pg_authid
+          WHERE datname = current_database() AND rolname = 'pg_database_owner'
+         )  m
          JOIN pg_authid a ON (m.member = a.oid)
          JOIN pg_authid b ON (m.roleid = b.oid)
     WHERE pg_has_role(a.oid, 'USAGE');
diff --git a/src/backend/commands/user.c b/src/backend/commands/user.c
index 0e6800b..438f3ef 100644
--- a/src/backend/commands/user.c
+++ b/src/backend/commands/user.c
@@ -1497,6 +1497,18 @@ AddRoleMems(const char *rolename, Oid roleid,
        }
 
        /*
+        * The charter of pg_database_owner is to have exactly one, implicit,
+        * situation-dependent member.  There's no technical need for this
+        * restriction.  (One could lift it and take the further step of making
+        * pg_database_ownercheck() equivalent to has_privs_of_role(roleid,
+        * DEFAULT_ROLE_DATABASE_OWNER), in which case explicit,
+        * situation-independent members could act as the owner of any 
database.)
+        */
+       if (roleid == DEFAULT_ROLE_DATABASE_OWNER)
+               ereport(ERROR,
+                               errmsg("role \"%s\" cannot have explicit 
members", rolename));
+
+       /*
         * The role membership grantor of record has little significance at
         * present.  Nonetheless, inasmuch as users might look to it for a crude
         * audit trail, let only superusers impute the grant to a third party.
@@ -1525,6 +1537,30 @@ AddRoleMems(const char *rolename, Oid roleid,
                bool            new_record_repl[Natts_pg_auth_members];
 
                /*
+                * pg_database_owner is never a role member.  Lifting this 
restriction
+                * would require a policy decision about membership loops.  One 
could
+                * prevent loops, which would include making "ALTER DATABASE x 
OWNER
+                * TO proposed_datdba" fail if 
is_member_of_role(pg_database_owner,
+                * proposed_datdba).  Hence, gaining a membership could reduce 
what a
+                * role could do.  Alternately, one could allow these 
memberships to
+                * complete loops.  A role could then have actual WITH ADMIN 
OPTION on
+                * itself, prompting a decision about is_admin_of_role() 
treatment of
+                * the case.
+                *
+                * Lifting this restriction also has policy implications for 
ownership
+                * of shared objects (databases and tablespaces).  We allow such
+                * ownership, but we might find cause to ban it in the future.
+                * Designing such a ban would more troublesome if the design 
had to
+                * address pg_database_owner being a member of role FOO that 
owns a
+                * shared object.  (The effect of such ownership is that any 
owner of
+                * another database can act as the owner of affected shared 
objects.)
+                */
+               if (memberid == DEFAULT_ROLE_DATABASE_OWNER)
+                       ereport(ERROR,
+                                       errmsg("role \"%s\" cannot be a member 
of any role",
+                                                  
get_rolespec_name(memberRole)));
+
+               /*
                 * Refuse creation of membership loops, including the trivial 
case
                 * where a role is made a member of itself.  We do this by 
checking to
                 * see if the target role is already a member of the proposed 
member
diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c
index 1adacb9..94d61b7 100644
--- a/src/backend/utils/adt/acl.c
+++ b/src/backend/utils/adt/acl.c
@@ -22,6 +22,7 @@
 #include "catalog/pg_auth_members.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_class.h"
+#include "catalog/pg_database.h"
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
 #include "commands/proclang.h"
@@ -68,6 +69,7 @@ enum RoleRecurseType
 };
 static Oid     cached_role[] = {InvalidOid, InvalidOid};
 static List *cached_roles[] = {NIL, NIL};
+static uint32 cached_db_hash;
 
 
 static const char *getid(const char *s, char *n);
@@ -4665,10 +4667,14 @@ initialize_acl(void)
 {
        if (!IsBootstrapProcessingMode())
        {
+               cached_db_hash =
+                       GetSysCacheHashValue1(DATABASEOID,
+                                                                 
ObjectIdGetDatum(MyDatabaseId));
+
                /*
                 * In normal mode, set a callback on any syscache invalidation 
of rows
-                * of pg_auth_members (for roles_is_member_of()) or pg_authid 
(for
-                * has_rolinherit())
+                * of pg_auth_members (for roles_is_member_of()), pg_authid (for
+                * has_rolinherit()), or pg_database (for roles_is_member_of())
                 */
                CacheRegisterSyscacheCallback(AUTHMEMROLEMEM,
                                                                          
RoleMembershipCacheCallback,
@@ -4676,6 +4682,9 @@ initialize_acl(void)
                CacheRegisterSyscacheCallback(AUTHOID,
                                                                          
RoleMembershipCacheCallback,
                                                                          
(Datum) 0);
+               CacheRegisterSyscacheCallback(DATABASEOID,
+                                                                         
RoleMembershipCacheCallback,
+                                                                         
(Datum) 0);
        }
 }
 
@@ -4686,6 +4695,13 @@ initialize_acl(void)
 static void
 RoleMembershipCacheCallback(Datum arg, int cacheid, uint32 hashvalue)
 {
+       if (cacheid == DATABASEOID &&
+               hashvalue != cached_db_hash &&
+               hashvalue != 0)
+       {
+               return;                                 /* ignore pg_database 
changes for other DBs */
+       }
+
        /* Force membership caches to be recomputed on next use */
        cached_role[ROLERECURSE_PRIVS] = InvalidOid;
        cached_role[ROLERECURSE_MEMBERS] = InvalidOid;
@@ -4728,6 +4744,7 @@ static List *
 roles_is_member_of(Oid roleid, enum RoleRecurseType type,
                                   Oid admin_of, bool *is_admin)
 {
+       Oid                     dba;
        List       *roles_list;
        ListCell   *l;
        List       *new_cached_roles;
@@ -4739,6 +4756,24 @@ roles_is_member_of(Oid roleid, enum RoleRecurseType type,
                return cached_roles[type];
 
        /*
+        * Role expansion happens in a non-database backend when guc.c checks
+        * DEFAULT_ROLE_READ_ALL_SETTINGS for a physical walsender SHOW command.
+        * In that case, no role gets pg_database_owner.
+        */
+       if (!OidIsValid(MyDatabaseId))
+               dba = InvalidOid;
+       else
+       {
+               HeapTuple       dbtup;
+
+               dbtup = SearchSysCache1(DATABASEOID, 
ObjectIdGetDatum(MyDatabaseId));
+               if (!HeapTupleIsValid(dbtup))
+                       elog(ERROR, "cache lookup failed for database %u", 
MyDatabaseId);
+               dba = ((Form_pg_database) GETSTRUCT(dbtup))->datdba;
+               ReleaseSysCache(dbtup);
+       }
+
+       /*
         * Find all the roles that roleid is a member of, including multi-level
         * recursion.  The role itself will always be the first element of the
         * resulting list.
@@ -4785,6 +4820,11 @@ roles_is_member_of(Oid roleid, enum RoleRecurseType type,
                        roles_list = list_append_unique_oid(roles_list, 
otherid);
                }
                ReleaseSysCacheList(memlist);
+
+               /* implement pg_database_owner implicit membership */
+               if (memberid == dba && OidIsValid(dba))
+                       roles_list = list_append_unique_oid(roles_list,
+                                                                               
                DEFAULT_ROLE_DATABASE_OWNER);
        }
 
        /*
diff --git a/src/backend/utils/cache/catcache.c 
b/src/backend/utils/cache/catcache.c
index 3613ae5..76ef00b 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -1076,8 +1076,9 @@ InitCatCachePhase2(CatCache *cache, bool touch_index)
  *             criticalRelcachesBuilt), we don't have to worry anymore.
  *
  *             Similarly, during backend startup we have to be able to use the
- *             pg_authid and pg_auth_members syscaches for authentication even 
if
- *             we don't yet have relcache entries for those catalogs' indexes.
+ *             pg_authid, pg_auth_members and pg_database syscaches for
+ *             authentication even if we don't yet have relcache entries for 
those
+ *             catalogs' indexes.
  */
 static bool
 IndexScanOK(CatCache *cache, ScanKey cur_skey)
@@ -1110,6 +1111,7 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
                case AUTHNAME:
                case AUTHOID:
                case AUTHMEMMEMROLE:
+               case DATABASEOID:
 
                        /*
                         * Protect authentication lookups occurring before 
relcache has
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 14150d0..972cbdc 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -3508,6 +3508,7 @@ describeRoles(const char *pattern, bool verbose, bool 
showSystem)
 
        printTableAddHeader(&cont, gettext_noop("Role name"), true, align);
        printTableAddHeader(&cont, gettext_noop("Attributes"), true, align);
+       /* ignores implicit memberships from superuser & pg_database_owner */
        printTableAddHeader(&cont, gettext_noop("Member of"), true, align);
 
        if (verbose && pset.sversion >= 80200)
diff --git a/src/include/catalog/pg_authid.dat 
b/src/include/catalog/pg_authid.dat
index 7c08851..9b64291 100644
--- a/src/include/catalog/pg_authid.dat
+++ b/src/include/catalog/pg_authid.dat
@@ -20,6 +20,11 @@
   rolcreaterole => 't', rolcreatedb => 't', rolcanlogin => 't',
   rolreplication => 't', rolbypassrls => 't', rolconnlimit => '-1',
   rolpassword => '_null_', rolvaliduntil => '_null_' },
+{ oid => '8778', oid_symbol => 'DEFAULT_ROLE_DATABASE_OWNER',
+  rolname => 'pg_database_owner', rolsuper => 'f', rolinherit => 't',
+  rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
+  rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1',
+  rolpassword => '_null_', rolvaliduntil => '_null_' },
 { oid => '3373', oid_symbol => 'DEFAULT_ROLE_MONITOR',
   rolname => 'pg_monitor', rolsuper => 'f', rolinherit => 't',
   rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
diff --git a/src/test/regress/expected/privileges.out 
b/src/test/regress/expected/privileges.out
index 7754c20..fcd4262 100644
--- a/src/test/regress/expected/privileges.out
+++ b/src/test/regress/expected/privileges.out
@@ -1632,6 +1632,67 @@ SELECT * FROM pg_largeobject LIMIT 0;
 SET SESSION AUTHORIZATION regress_priv_user1;
 SELECT * FROM pg_largeobject LIMIT 0;                  -- to be denied
 ERROR:  permission denied for table pg_largeobject
+-- test pg_database_owner
+RESET SESSION AUTHORIZATION;
+GRANT pg_database_owner TO regress_priv_user1;
+ERROR:  role "pg_database_owner" cannot have explicit members
+GRANT regress_priv_user1 TO pg_database_owner;
+ERROR:  role "pg_database_owner" cannot be a member of any role
+CREATE TABLE datdba_only ();
+ALTER TABLE datdba_only OWNER TO pg_database_owner;
+REVOKE DELETE ON datdba_only FROM pg_database_owner;
+SELECT
+       pg_has_role('regress_priv_user1', 'pg_database_owner', 'USAGE') as priv,
+       pg_has_role('regress_priv_user1', 'pg_database_owner', 'MEMBER') as mem,
+       pg_has_role('regress_priv_user1', 'pg_database_owner',
+                               'MEMBER WITH ADMIN OPTION') as admin;
+ priv | mem | admin 
+------+-----+-------
+ f    | f   | f
+(1 row)
+
+BEGIN;
+DO $$BEGIN EXECUTE format(
+       'ALTER DATABASE %I OWNER TO regress_priv_group2', current_catalog); 
END$$;
+SELECT
+       pg_has_role('regress_priv_user1', 'pg_database_owner', 'USAGE') as priv,
+       pg_has_role('regress_priv_user1', 'pg_database_owner', 'MEMBER') as mem,
+       pg_has_role('regress_priv_user1', 'pg_database_owner',
+                               'MEMBER WITH ADMIN OPTION') as admin;
+ priv | mem | admin 
+------+-----+-------
+ t    | t   | f
+(1 row)
+
+SET SESSION AUTHORIZATION regress_priv_user1;
+TABLE information_schema.enabled_roles ORDER BY role_name COLLATE "C";
+      role_name      
+---------------------
+ pg_database_owner
+ regress_priv_group2
+ regress_priv_user1
+(3 rows)
+
+TABLE information_schema.applicable_roles ORDER BY role_name COLLATE "C";
+       grantee       |      role_name      | is_grantable 
+---------------------+---------------------+--------------
+ regress_priv_group2 | pg_database_owner   | NO
+ regress_priv_user1  | regress_priv_group2 | NO
+(2 rows)
+
+INSERT INTO datdba_only DEFAULT VALUES;
+SAVEPOINT q; DELETE FROM datdba_only; ROLLBACK TO q;
+ERROR:  permission denied for table datdba_only
+SET SESSION AUTHORIZATION regress_priv_user2;
+TABLE information_schema.enabled_roles;
+     role_name      
+--------------------
+ regress_priv_user2
+(1 row)
+
+INSERT INTO datdba_only DEFAULT VALUES;
+ERROR:  permission denied for table datdba_only
+ROLLBACK;
 -- test default ACLs
 \c -
 CREATE SCHEMA testns;
diff --git a/src/test/regress/sql/privileges.sql 
b/src/test/regress/sql/privileges.sql
index 4911ad4..212e9ef 100644
--- a/src/test/regress/sql/privileges.sql
+++ b/src/test/regress/sql/privileges.sql
@@ -975,6 +975,37 @@ SELECT * FROM pg_largeobject LIMIT 0;
 SET SESSION AUTHORIZATION regress_priv_user1;
 SELECT * FROM pg_largeobject LIMIT 0;                  -- to be denied
 
+-- test pg_database_owner
+RESET SESSION AUTHORIZATION;
+GRANT pg_database_owner TO regress_priv_user1;
+GRANT regress_priv_user1 TO pg_database_owner;
+CREATE TABLE datdba_only ();
+ALTER TABLE datdba_only OWNER TO pg_database_owner;
+REVOKE DELETE ON datdba_only FROM pg_database_owner;
+SELECT
+       pg_has_role('regress_priv_user1', 'pg_database_owner', 'USAGE') as priv,
+       pg_has_role('regress_priv_user1', 'pg_database_owner', 'MEMBER') as mem,
+       pg_has_role('regress_priv_user1', 'pg_database_owner',
+                               'MEMBER WITH ADMIN OPTION') as admin;
+
+BEGIN;
+DO $$BEGIN EXECUTE format(
+       'ALTER DATABASE %I OWNER TO regress_priv_group2', current_catalog); 
END$$;
+SELECT
+       pg_has_role('regress_priv_user1', 'pg_database_owner', 'USAGE') as priv,
+       pg_has_role('regress_priv_user1', 'pg_database_owner', 'MEMBER') as mem,
+       pg_has_role('regress_priv_user1', 'pg_database_owner',
+                               'MEMBER WITH ADMIN OPTION') as admin;
+SET SESSION AUTHORIZATION regress_priv_user1;
+TABLE information_schema.enabled_roles ORDER BY role_name COLLATE "C";
+TABLE information_schema.applicable_roles ORDER BY role_name COLLATE "C";
+INSERT INTO datdba_only DEFAULT VALUES;
+SAVEPOINT q; DELETE FROM datdba_only; ROLLBACK TO q;
+SET SESSION AUTHORIZATION regress_priv_user2;
+TABLE information_schema.enabled_roles;
+INSERT INTO datdba_only DEFAULT VALUES;
+ROLLBACK;
+
 -- test default ACLs
 \c -
 

Reply via email to