On 2024-Apr-13, jian he wrote:

> I wonder is there any incompatibility issue, or do we need to say something
> about the new behavior when dropping a key column?

Umm, yeah, maybe we should document it in ALTER TABLE DROP PRIMARY KEY
and in the release notes to note the different behavior.

> only minor cosmetic issue:
> + if (unconstrained_cols)
> i would like change it to
> + if (unconstrained_cols != NIL)
> 
> + foreach(lc, unconstrained_cols)
> we can change to
> + foreach_int(attnum, unconstrained_cols)
> per commit
> https://git.postgresql.org/cgit/postgresql.git/commit/?id=14dd0f27d7cd56ffae9ecdbe324965073d01a9ff

Ah, yeah.  I did that, rewrote some comments and refined the tests a
little bit to ensure the pg_upgrade behavior is sane.  I intend to get
this pushed tomorrow, if nothing ugly comes up.

CI run: https://cirrus-ci.com/build/5471117953990656

-- 
Álvaro Herrera         PostgreSQL Developer  —  https://www.EnterpriseDB.com/
"La gente vulgar sólo piensa en pasar el tiempo;
el que tiene talento, en aprovecharlo"
From 28393502b3d11116fcf430264c10f931e948bedc Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 11 Apr 2024 12:29:34 +0200
Subject: [PATCH v2] Better handle indirect constraint drops

In certain cases it is possible to remove not-null constraints without
leaving attnotnull in its correct state; for example, if you drop a
column that's part of the primary key, and the other columns of the PK
don't have not-null constraints, then we should reset the attnotnull
flags for those other columns; up to this commit, we didn't.  Handle
those cases better by doing the attnotnull reset in
RemoveConstraintById() instead of in dropconstraint_internal().

However, there are some cases where we must not do so.  For example, if
those other columns are in replica identity indexes or are generated
identity columns, we must keep attnotnull set, even though this leaves
the catalog inconsistent: attnotnull is set, but no not-null constraint
exists to support it.

Because the attnotnull reset now happens in more places than before, for
instance when a column of the primary key changes type, we need an
additional trick to reinstate it as necessary.  Introduce a new
alter-table pass that does this; it simply needs to reschedule some
AT_SetAttNotNull subcommands that were already being generated and
ignored.

Because of the exceptions noted above, in which attnotnull is not reset,
we also include a pg_dump hack to emit a not-null constraint when the
attnotnull flag is set even if no pg_constraint row exists.  This part
is undesirable but necessary, because failing to handle the case can
result in unrestorable dumps.

Reported-by: Tender Wang <tndrw...@gmail.com>
Co-authored-by: Tender Wang <tndrw...@gmail.com>
Reviewed-by: jian he <jian.universal...@gmail.com>
Discussion: https://postgr.es/m/CAHewXN=hMbNa3d43NOR=ocgdgptt18s-1fmuecoegesyek4...@mail.gmail.com
---
 src/backend/catalog/pg_constraint.c       | 116 ++++++++++++++++++-
 src/backend/commands/tablecmds.c          | 135 +++++++++++-----------
 src/bin/pg_dump/pg_dump.c                 |  30 +++--
 src/test/regress/expected/constraints.out |  78 +++++++++++++
 src/test/regress/sql/constraints.sql      |  39 +++++++
 5 files changed, 322 insertions(+), 76 deletions(-)

diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c
index 778b7c381d..56c083fe21 100644
--- a/src/backend/catalog/pg_constraint.c
+++ b/src/backend/catalog/pg_constraint.c
@@ -19,6 +19,7 @@
 #include "access/htup_details.h"
 #include "access/sysattr.h"
 #include "access/table.h"
+#include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
 #include "catalog/heap.h"
@@ -933,6 +934,8 @@ RemoveConstraintById(Oid conId)
 	Relation	conDesc;
 	HeapTuple	tup;
 	Form_pg_constraint con;
+	bool		dropping_pk = false;
+	List	   *unconstrained_cols = NIL;
 
 	conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
 
@@ -957,7 +960,9 @@ RemoveConstraintById(Oid conId)
 		/*
 		 * We need to update the relchecks count if it is a check constraint
 		 * being dropped.  This update will force backends to rebuild relcache
-		 * entries when we commit.
+		 * entries when we commit.  For not-null and primary key constraints,
+		 * obtain the list of columns affected, so that we can reset their
+		 * attnotnull flags below.
 		 */
 		if (con->contype == CONSTRAINT_CHECK)
 		{
@@ -984,6 +989,36 @@ RemoveConstraintById(Oid conId)
 
 			table_close(pgrel, RowExclusiveLock);
 		}
+		else if (con->contype == CONSTRAINT_NOTNULL)
+		{
+			unconstrained_cols = list_make1_int(extractNotNullColumn(tup));
+		}
+		else if (con->contype == CONSTRAINT_PRIMARY)
+		{
+			Datum		adatum;
+			ArrayType  *arr;
+			int			numkeys;
+			bool		isNull;
+			int16	   *attnums;
+
+			dropping_pk = true;
+
+			adatum = heap_getattr(tup, Anum_pg_constraint_conkey,
+								  RelationGetDescr(conDesc), &isNull);
+			if (isNull)
+				elog(ERROR, "null conkey for constraint %u", con->oid);
+			arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
+			numkeys = ARR_DIMS(arr)[0];
+			if (ARR_NDIM(arr) != 1 ||
+				numkeys < 0 ||
+				ARR_HASNULL(arr) ||
+				ARR_ELEMTYPE(arr) != INT2OID)
+				elog(ERROR, "conkey is not a 1-D smallint array");
+			attnums = (int16 *) ARR_DATA_PTR(arr);
+
+			for (int i = 0; i < numkeys; i++)
+				unconstrained_cols = lappend_int(unconstrained_cols, attnums[i]);
+		}
 
 		/* Keep lock on constraint's rel until end of xact */
 		table_close(rel, NoLock);
@@ -1003,6 +1038,85 @@ RemoveConstraintById(Oid conId)
 	/* Fry the constraint itself */
 	CatalogTupleDelete(conDesc, &tup->t_self);
 
+	/*
+	 * If this was a NOT NULL or the primary key, the constrained columns must
+	 * have had pg_attribute.attnotnull set.  See if we need to reset it, and
+	 * do so.
+	 */
+	if (unconstrained_cols != NIL)
+	{
+		Relation	tablerel;
+		Relation	attrel;
+		Bitmapset  *pkcols;
+		ListCell   *lc;
+
+		/* Make the above deletion visible */
+		CommandCounterIncrement();
+
+		tablerel = table_open(con->conrelid, NoLock);	/* already have lock */
+		attrel = table_open(AttributeRelationId, RowExclusiveLock);
+
+		/*
+		 * We want to test columns for their presence in the primary key, but
+		 * only if we're not dropping it.
+		 */
+		pkcols = dropping_pk ? NULL :
+			RelationGetIndexAttrBitmap(tablerel,
+									   INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
+		foreach(lc, unconstrained_cols)
+		{
+			AttrNumber	attnum = lfirst_int(lc);
+			HeapTuple	atttup;
+			HeapTuple	contup;
+			Bitmapset  *ircols;
+			Form_pg_attribute attForm;
+
+			/*
+			 * Obtain pg_attribute tuple and verify conditions on it.  We use
+			 * a copy we can scribble on.
+			 */
+			atttup = SearchSysCacheCopyAttNum(con->conrelid, attnum);
+			if (!HeapTupleIsValid(atttup))
+				elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+					 attnum, con->conrelid);
+			attForm = (Form_pg_attribute) GETSTRUCT(atttup);
+
+			/*
+			 * Since the above deletion has been made visible, we can now
+			 * search for any remaining constraints on this column (or these
+			 * columns, in the case we're dropping a multicol primary key.)
+			 * Then, verify whether any further NOT NULL or primary key
+			 * exists, and reset attnotnull if none.
+			 */
+			contup = findNotNullConstraintAttnum(con->conrelid, attnum);
+			if (contup ||
+				bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
+							  pkcols))
+				continue;
+
+			/*
+			 * ... but don't do it if the column is in the replica identity or
+			 * it's a generated column
+			 */
+			if (attForm->attidentity != '\0')
+				continue;
+			ircols = RelationGetIndexAttrBitmap(tablerel, INDEX_ATTR_BITMAP_IDENTITY_KEY);	/* XXX bms_free */
+			if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, ircols))
+				continue;
+
+			/* Reset attnotnull */
+			if (attForm->attnotnull)
+			{
+				attForm->attnotnull = false;
+				CatalogTupleUpdate(attrel, &atttup->t_self, atttup);
+			}
+		}
+
+		table_close(attrel, RowExclusiveLock);
+		table_close(tablerel, NoLock);
+	}
+
 	/* Clean up */
 	ReleaseSysCache(tup);
 	table_close(conDesc, RowExclusiveLock);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index f72b2dcadf..7b01c85f6b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -149,6 +149,7 @@ typedef enum AlterTablePass
 	AT_PASS_ALTER_TYPE,			/* ALTER COLUMN TYPE */
 	AT_PASS_ADD_COL,			/* ADD COLUMN */
 	AT_PASS_SET_EXPRESSION,		/* ALTER SET EXPRESSION */
+	AT_PASS_OLD_COL_ATTRS,		/* re-install attnotnull */
 	AT_PASS_OLD_INDEX,			/* re-add existing indexes */
 	AT_PASS_OLD_CONSTR,			/* re-add existing constraints */
 	/* We could support a RENAME COLUMN pass here, but not currently used */
@@ -7662,17 +7663,23 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
 	}
 
 	/*
-	 * Find the constraint that makes this column NOT NULL.
+	 * Find the constraint that makes this column NOT NULL, and drop it if we
+	 * see one.  dropconstraint_internal() will do necessary consistency
+	 * checking.  If there isn't one, there are two possibilities: either the
+	 * column is marked attnotnull because it's part of the primary key, and
+	 * then we just throw an appropriate error; or it's a leftover marking
+	 * that we can remove.  However, before doing the latter, to avoid
+	 * breaking consistency any further, prevent this if the column is part of
+	 * the replica identity.
 	 */
 	conTup = findNotNullConstraint(RelationGetRelid(rel), colName);
 	if (conTup == NULL)
 	{
 		Bitmapset  *pkcols;
+		Bitmapset  *ircols;
 
 		/*
-		 * There's no not-null constraint, so throw an error.  If the column
-		 * is in a primary key, we can throw a specific error.  Otherwise,
-		 * this is unexpected.
+		 * If the column is in a primary key, throw a specific error message.
 		 */
 		pkcols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_PRIMARY_KEY);
 		if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
@@ -7681,16 +7688,27 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
 					errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 					errmsg("column \"%s\" is in a primary key", colName));
 
-		/* this shouldn't happen */
-		elog(ERROR, "could not find not-null constraint on column \"%s\", relation \"%s\"",
-			 colName, RelationGetRelationName(rel));
+		/* Also throw an error if the column is in the replica identity */
+		ircols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY);
+		if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, ircols))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+					errmsg("column \"%s\" is in index used as replica identity",
+						   get_attname(RelationGetRelid(rel), attnum, false)));
+
+		/* Otherwise, just remove the attnotnull marking and do nothing else. */
+		attTup->attnotnull = false;
+		CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
 	}
+	else
+	{
+		/* The normal case: we have a pg_constraint row, remove it */
+		readyRels = NIL;
+		dropconstraint_internal(rel, conTup, DROP_RESTRICT, recurse, false,
+								false, &readyRels, lockmode);
 
-	readyRels = NIL;
-	dropconstraint_internal(rel, conTup, DROP_RESTRICT, recurse, false,
-							false, &readyRels, lockmode);
-
-	heap_freetuple(conTup);
+		heap_freetuple(conTup);
+	}
 
 	InvokeObjectPostAlterHook(RelationRelationId,
 							  RelationGetRelid(rel), attnum);
@@ -12927,12 +12945,11 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
 	Form_pg_constraint con;
 	ObjectAddress conobj;
 	List	   *children;
-	ListCell   *child;
 	bool		is_no_inherit_constraint = false;
-	bool		dropping_pk = false;
 	char	   *constrName;
 	List	   *unconstrained_cols = NIL;
-	char	   *colname;
+	char	   *colname = NULL;
+	bool		dropping_pk = false;
 
 	if (list_member_oid(*readyRels, RelationGetRelid(rel)))
 		return InvalidObjectAddress;
@@ -12989,10 +13006,12 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
 	 */
 	if (con->contype == CONSTRAINT_NOTNULL)
 	{
-		AttrNumber	colnum = extractNotNullColumn(constraintTup);
+		AttrNumber	colnum;
 
-		if (colnum != InvalidAttrNumber)
-			unconstrained_cols = list_make1_int(colnum);
+		colnum = extractNotNullColumn(constraintTup);
+		unconstrained_cols = list_make1_int(colnum);
+		colname = NameStr(TupleDescAttr(RelationGetDescr(rel),
+										colnum - 1)->attname);
 	}
 	else if (con->contype == CONSTRAINT_PRIMARY)
 	{
@@ -13048,18 +13067,16 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
 	performDeletion(&conobj, behavior, 0);
 
 	/*
-	 * If this was a NOT NULL or the primary key, the constrained columns must
-	 * have had pg_attribute.attnotnull set.  See if we need to reset it, and
-	 * do so.
+	 * If this was a NOT NULL or the primary key, verify that we still have
+	 * constraints to support GENERATED AS IDENTITY or the replica identity.
 	 */
-	if (unconstrained_cols)
+	if (unconstrained_cols != NIL)
 	{
 		Relation	attrel;
 		Bitmapset  *pkcols;
 		Bitmapset  *ircols;
-		ListCell   *lc;
 
-		/* Make the above deletion visible */
+		/* Make implicit attnotnull changes visible */
 		CommandCounterIncrement();
 
 		attrel = table_open(AttributeRelationId, RowExclusiveLock);
@@ -13073,33 +13090,31 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
 									   INDEX_ATTR_BITMAP_PRIMARY_KEY);
 		ircols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY);
 
-		foreach(lc, unconstrained_cols)
+		foreach_int(attnum, unconstrained_cols)
 		{
-			AttrNumber	attnum = lfirst_int(lc);
 			HeapTuple	atttup;
 			HeapTuple	contup;
 			Form_pg_attribute attForm;
+			char		attidentity;
 
 			/*
-			 * Obtain pg_attribute tuple and verify conditions on it.  We use
-			 * a copy we can scribble on.
+			 * Obtain pg_attribute tuple and verify conditions on it.
 			 */
-			atttup = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum);
+			atttup = SearchSysCacheAttNum(RelationGetRelid(rel), attnum);
 			if (!HeapTupleIsValid(atttup))
 				elog(ERROR, "cache lookup failed for attribute %d of relation %u",
 					 attnum, RelationGetRelid(rel));
 			attForm = (Form_pg_attribute) GETSTRUCT(atttup);
+			attidentity = attForm->attidentity;
+			ReleaseSysCache(atttup);
 
 			/*
 			 * Since the above deletion has been made visible, we can now
 			 * search for any remaining constraints on this column (or these
 			 * columns, in the case we're dropping a multicol primary key.)
 			 * Then, verify whether any further NOT NULL or primary key
-			 * exists, and reset attnotnull if none.
-			 *
-			 * However, if this is a generated identity column, abort the
-			 * whole thing with a specific error message, because the
-			 * constraint is required in that case.
+			 * exists: if none and this is a generated identity column or the
+			 * replica identity, abort the whole thing.
 			 */
 			contup = findNotNullConstraintAttnum(RelationGetRelid(rel), attnum);
 			if (contup ||
@@ -13111,7 +13126,7 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
 			 * It's not valid to drop the not-null constraint for a GENERATED
 			 * AS IDENTITY column.
 			 */
-			if (attForm->attidentity)
+			if (attidentity != '\0')
 				ereport(ERROR,
 						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 						errmsg("column \"%s\" of relation \"%s\" is an identity column",
@@ -13123,18 +13138,11 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
 			 * It's not valid to drop the not-null constraint for a column in
 			 * the replica identity index, either. (FULL is not affected.)
 			 */
-			if (bms_is_member(lfirst_int(lc) - FirstLowInvalidHeapAttributeNumber, ircols))
+			if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, ircols))
 				ereport(ERROR,
 						errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 						errmsg("column \"%s\" is in index used as replica identity",
-							   get_attname(RelationGetRelid(rel), lfirst_int(lc), false)));
-
-			/* Reset attnotnull */
-			if (attForm->attnotnull)
-			{
-				attForm->attnotnull = false;
-				CatalogTupleUpdate(attrel, &atttup->t_self, atttup);
-			}
+							   get_attname(RelationGetRelid(rel), attnum, false)));
 		}
 		table_close(attrel, RowExclusiveLock);
 	}
@@ -13173,16 +13181,8 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
 				 errmsg("cannot remove constraint from only the partitioned table when partitions exist"),
 				 errhint("Do not specify the ONLY keyword.")));
 
-	/* For not-null constraints we recurse by column name */
-	if (con->contype == CONSTRAINT_NOTNULL)
-		colname = NameStr(TupleDescAttr(RelationGetDescr(rel),
-										linitial_int(unconstrained_cols) - 1)->attname);
-	else
-		colname = NULL;			/* keep compiler quiet */
-
-	foreach(child, children)
+	foreach_oid(childrelid, children)
 	{
-		Oid			childrelid = lfirst_oid(child);
 		Relation	childrel;
 		HeapTuple	tuple;
 		Form_pg_constraint childcon;
@@ -13195,8 +13195,8 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
 		CheckTableNotInUse(childrel, "ALTER TABLE");
 
 		/*
-		 * We search for not-null constraint by column number, and other
-		 * constraints by name.
+		 * We search for not-null constraints by column name, and others by
+		 * constraint name.
 		 */
 		if (con->contype == CONSTRAINT_NOTNULL)
 		{
@@ -13304,7 +13304,6 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
 		rel->rd_rel->relhassubclass)
 	{
 		List	   *colnames = NIL;
-		ListCell   *lc;
 		List	   *pkready = NIL;
 
 		/*
@@ -13317,15 +13316,14 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
 		 * Find out the list of column names to process.  Fortunately, we
 		 * already have the list of column numbers.
 		 */
-		foreach(lc, unconstrained_cols)
+		foreach_int(attnum, unconstrained_cols)
 		{
 			colnames = lappend(colnames, get_attname(RelationGetRelid(rel),
-													 lfirst_int(lc), false));
+													 attnum, false));
 		}
 
-		foreach(child, children)
+		foreach_oid(childrelid, children)
 		{
-			Oid			childrelid = lfirst_oid(child);
 			Relation	childrel;
 
 			if (list_member_oid(pkready, childrelid))
@@ -13335,10 +13333,9 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
 			childrel = table_open(childrelid, NoLock);
 			CheckTableNotInUse(childrel, "ALTER TABLE");
 
-			foreach(lc, colnames)
+			foreach_ptr(char, colName, colnames)
 			{
 				HeapTuple	contup;
-				char	   *colName = lfirst(lc);
 
 				contup = findNotNullConstraint(childrelid, colName);
 				if (contup == NULL)
@@ -14677,12 +14674,16 @@ ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd,
 				else if (cmd->subtype == AT_SetAttNotNull)
 				{
 					/*
-					 * The parser will create AT_AttSetNotNull subcommands for
-					 * columns of PRIMARY KEY indexes/constraints, but we need
-					 * not do anything with them here, because the columns'
-					 * NOT NULL marks will already have been propagated into
-					 * the new table definition.
+					 * We see this subtype when a primary key is created
+					 * internally, for example when it is replaced with a new
+					 * constraint (say because one of the columns changes
+					 * type); in this case we need to reinstate attnotnull,
+					 * because it was removed because of the drop of the old
+					 * PK.  Schedule this subcommand to an upcoming AT pass.
 					 */
+					cmd->subtype = AT_SetAttNotNull;
+					tab->subcmds[AT_PASS_OLD_COL_ATTRS] =
+						lappend(tab->subcmds[AT_PASS_OLD_COL_ATTRS], cmd);
 				}
 				else
 					elog(ERROR, "unexpected statement subtype: %d",
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 6d2f3fdef3..9edda90469 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -8788,12 +8788,15 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 						 "), E',\n    ') AS attfdwoptions,\n");
 
 	/*
-	 * Find out any NOT NULL markings for each column.  In 17 and up we have
-	 * to read pg_constraint, and keep track whether it's NO INHERIT; in older
-	 * versions we rely on pg_attribute.attnotnull.
+	 * Find out any NOT NULL markings for each column.  In 17 and up we read
+	 * pg_constraint to obtain the constraint name.  notnull_noinherit is set
+	 * according to the NO INHERIT property.  For versions prior to 17, we
+	 * store an empty string as the name when a constraint is marked as
+	 * attnotnull (this cues dumpTableSchema to print the NOT NULL clause
+	 * without a name); also, such cases are never NO INHERIT.
 	 *
-	 * We also track whether the constraint was defined directly in this table
-	 * or via an ancestor, for binary upgrade.
+	 * We track in notnull_inh whether the constraint was defined directly in
+	 * this table or via an ancestor, for binary upgrade.
 	 *
 	 * Lastly, we need to know if the PK for the table involves each column;
 	 * for columns that are there we need a NOT NULL marking even if there's
@@ -8801,13 +8804,24 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 	 * NULLs after the data is loaded when the PK is created, later in the
 	 * dump; for this case we add throwaway constraints that are dropped once
 	 * the PK is created.
+	 *
+	 * Another complication arises from columns that have attnotnull set, but
+	 * for which no corresponding not-null nor PK constraint exists.  This can
+	 * happen if, for example, a primary key is dropped indirectly -- say,
+	 * because one of its columns is dropped.  This is an irregular condition,
+	 * so we don't work hard to preserve it, and instead act as though an
+	 * unnamed not-null constraint exists.
 	 */
 	if (fout->remoteVersion >= 170000)
 		appendPQExpBufferStr(q,
-							 "co.conname AS notnull_name,\n"
-							 "co.connoinherit AS notnull_noinherit,\n"
+							 "CASE WHEN co.conname IS NOT NULL THEN co.conname "
+							 "  WHEN a.attnotnull AND copk.conname IS NULL THEN '' ELSE NULL END AS notnull_name,\n"
+							 "CASE WHEN co.conname IS NOT NULL THEN co.connoinherit "
+							 "  WHEN a.attnotnull THEN false ELSE NULL END AS notnull_noinherit,\n"
 							 "copk.conname IS NOT NULL as notnull_is_pk,\n"
-							 "coalesce(NOT co.conislocal, true) AS notnull_inh,\n");
+							 "CASE WHEN co.conname IS NOT NULL THEN "
+							 "  coalesce(NOT co.conislocal, true) "
+							 "ELSE false END as notnull_inh,\n");
 	else
 		appendPQExpBufferStr(q,
 							 "CASE WHEN a.attnotnull THEN '' ELSE NULL END AS notnull_name,\n"
diff --git a/src/test/regress/expected/constraints.out b/src/test/regress/expected/constraints.out
index d50dd1f61a..2fc0be7925 100644
--- a/src/test/regress/expected/constraints.out
+++ b/src/test/regress/expected/constraints.out
@@ -926,9 +926,87 @@ DROP TABLE notnull_tbl1;
 -- nope
 CREATE TABLE notnull_tbl2 (a INTEGER CONSTRAINT blah NOT NULL, b INTEGER CONSTRAINT blah NOT NULL);
 ERROR:  constraint "blah" for relation "notnull_tbl2" already exists
+-- can't drop not-null in primary key
 CREATE TABLE notnull_tbl2 (a INTEGER PRIMARY KEY);
 ALTER TABLE notnull_tbl2 ALTER a DROP NOT NULL;
 ERROR:  column "a" is in a primary key
+DROP TABLE notnull_tbl2;
+-- make sure attnotnull is reset correctly when a PK is dropped indirectly,
+-- or kept if there's a reason for that
+CREATE TABLE notnull_tbl1 (c0 int, c1 int, PRIMARY KEY (c0, c1));
+ALTER TABLE  notnull_tbl1 DROP c1;
+\d+ notnull_tbl1
+                               Table "public.notnull_tbl1"
+ Column |  Type   | Collation | Nullable | Default | Storage | Stats target | Description 
+--------+---------+-----------+----------+---------+---------+--------------+-------------
+ c0     | integer |           |          |         | plain   |              | 
+
+DROP TABLE notnull_tbl1;
+-- same, via dropping a domain
+CREATE DOMAIN notnull_dom1 AS INTEGER;
+CREATE TABLE notnull_tbl1 (c0 notnull_dom1, c1 int, PRIMARY KEY (c0, c1));
+DROP DOMAIN notnull_dom1 CASCADE;
+NOTICE:  drop cascades to column c0 of table notnull_tbl1
+\d+ notnull_tbl1
+                               Table "public.notnull_tbl1"
+ Column |  Type   | Collation | Nullable | Default | Storage | Stats target | Description 
+--------+---------+-----------+----------+---------+---------+--------------+-------------
+ c1     | integer |           |          |         | plain   |              | 
+
+DROP TABLE notnull_tbl1;
+-- with a REPLICA IDENTITY column.  Here the not-nulls must be kept
+CREATE DOMAIN notnull_dom1 AS INTEGER;
+CREATE TABLE notnull_tbl1 (c0 notnull_dom1, c1 int UNIQUE, c2 int generated by default as identity, PRIMARY KEY (c0, c1, c2));
+ALTER TABLE notnull_tbl1 DROP CONSTRAINT notnull_tbl1_c2_not_null;
+ALTER TABLE notnull_tbl1 REPLICA IDENTITY USING INDEX notnull_tbl1_c1_key;
+DROP DOMAIN notnull_dom1 CASCADE;
+NOTICE:  drop cascades to column c0 of table notnull_tbl1
+ALTER TABLE  notnull_tbl1 ALTER c1 DROP NOT NULL;	-- can't be dropped
+ERROR:  column "c1" is in index used as replica identity
+ALTER TABLE  notnull_tbl1 ALTER c1 SET NOT NULL;	-- can be set right
+\d+ notnull_tbl1
+                                            Table "public.notnull_tbl1"
+ Column |  Type   | Collation | Nullable |             Default              | Storage | Stats target | Description 
+--------+---------+-----------+----------+----------------------------------+---------+--------------+-------------
+ c1     | integer |           | not null |                                  | plain   |              | 
+ c2     | integer |           | not null | generated by default as identity | plain   |              | 
+Indexes:
+    "notnull_tbl1_c1_key" UNIQUE CONSTRAINT, btree (c1) REPLICA IDENTITY
+Not-null constraints:
+    "notnull_tbl1_c1_not_null" NOT NULL "c1"
+
+DROP TABLE notnull_tbl1;
+CREATE DOMAIN notnull_dom2 AS INTEGER;
+CREATE TABLE notnull_tbl2 (c0 notnull_dom2, c1 int UNIQUE, c2 int generated by default as identity, PRIMARY KEY (c0, c1, c2));
+ALTER TABLE notnull_tbl2 DROP CONSTRAINT notnull_tbl2_c2_not_null;
+ALTER TABLE notnull_tbl2 REPLICA IDENTITY USING INDEX notnull_tbl2_c1_key;
+DROP DOMAIN notnull_dom2 CASCADE;
+NOTICE:  drop cascades to column c0 of table notnull_tbl2
+\d+ notnull_tbl2
+                                            Table "public.notnull_tbl2"
+ Column |  Type   | Collation | Nullable |             Default              | Storage | Stats target | Description 
+--------+---------+-----------+----------+----------------------------------+---------+--------------+-------------
+ c1     | integer |           | not null |                                  | plain   |              | 
+ c2     | integer |           | not null | generated by default as identity | plain   |              | 
+Indexes:
+    "notnull_tbl2_c1_key" UNIQUE CONSTRAINT, btree (c1) REPLICA IDENTITY
+
+BEGIN;
+/* make sure the table can be put right, but roll that back */
+ALTER TABLE notnull_tbl2 REPLICA IDENTITY FULL, ALTER c2 DROP IDENTITY;
+ALTER TABLE notnull_tbl2 ALTER c1 DROP NOT NULL, ALTER c2 DROP NOT NULL;
+\d+ notnull_tbl2
+                               Table "public.notnull_tbl2"
+ Column |  Type   | Collation | Nullable | Default | Storage | Stats target | Description 
+--------+---------+-----------+----------+---------+---------+--------------+-------------
+ c1     | integer |           |          |         | plain   |              | 
+ c2     | integer |           |          |         | plain   |              | 
+Indexes:
+    "notnull_tbl2_c1_key" UNIQUE CONSTRAINT, btree (c1)
+Replica Identity: FULL
+
+ROLLBACK;
+-- Leave this table around for pg_upgrade testing
 CREATE TABLE notnull_tbl3 (a INTEGER NOT NULL, CHECK (a IS NOT NULL));
 ALTER TABLE notnull_tbl3 ALTER A DROP NOT NULL;
 ALTER TABLE notnull_tbl3 ADD b int, ADD CONSTRAINT pk PRIMARY KEY (a, b);
diff --git a/src/test/regress/sql/constraints.sql b/src/test/regress/sql/constraints.sql
index 7a39b504a3..8f85e72050 100644
--- a/src/test/regress/sql/constraints.sql
+++ b/src/test/regress/sql/constraints.sql
@@ -624,8 +624,47 @@ DROP TABLE notnull_tbl1;
 -- nope
 CREATE TABLE notnull_tbl2 (a INTEGER CONSTRAINT blah NOT NULL, b INTEGER CONSTRAINT blah NOT NULL);
 
+-- can't drop not-null in primary key
 CREATE TABLE notnull_tbl2 (a INTEGER PRIMARY KEY);
 ALTER TABLE notnull_tbl2 ALTER a DROP NOT NULL;
+DROP TABLE notnull_tbl2;
+
+-- make sure attnotnull is reset correctly when a PK is dropped indirectly,
+-- or kept if there's a reason for that
+CREATE TABLE notnull_tbl1 (c0 int, c1 int, PRIMARY KEY (c0, c1));
+ALTER TABLE  notnull_tbl1 DROP c1;
+\d+ notnull_tbl1
+DROP TABLE notnull_tbl1;
+-- same, via dropping a domain
+CREATE DOMAIN notnull_dom1 AS INTEGER;
+CREATE TABLE notnull_tbl1 (c0 notnull_dom1, c1 int, PRIMARY KEY (c0, c1));
+DROP DOMAIN notnull_dom1 CASCADE;
+\d+ notnull_tbl1
+DROP TABLE notnull_tbl1;
+-- with a REPLICA IDENTITY column.  Here the not-nulls must be kept
+CREATE DOMAIN notnull_dom1 AS INTEGER;
+CREATE TABLE notnull_tbl1 (c0 notnull_dom1, c1 int UNIQUE, c2 int generated by default as identity, PRIMARY KEY (c0, c1, c2));
+ALTER TABLE notnull_tbl1 DROP CONSTRAINT notnull_tbl1_c2_not_null;
+ALTER TABLE notnull_tbl1 REPLICA IDENTITY USING INDEX notnull_tbl1_c1_key;
+DROP DOMAIN notnull_dom1 CASCADE;
+ALTER TABLE  notnull_tbl1 ALTER c1 DROP NOT NULL;	-- can't be dropped
+ALTER TABLE  notnull_tbl1 ALTER c1 SET NOT NULL;	-- can be set right
+\d+ notnull_tbl1
+DROP TABLE notnull_tbl1;
+
+CREATE DOMAIN notnull_dom2 AS INTEGER;
+CREATE TABLE notnull_tbl2 (c0 notnull_dom2, c1 int UNIQUE, c2 int generated by default as identity, PRIMARY KEY (c0, c1, c2));
+ALTER TABLE notnull_tbl2 DROP CONSTRAINT notnull_tbl2_c2_not_null;
+ALTER TABLE notnull_tbl2 REPLICA IDENTITY USING INDEX notnull_tbl2_c1_key;
+DROP DOMAIN notnull_dom2 CASCADE;
+\d+ notnull_tbl2
+BEGIN;
+/* make sure the table can be put right, but roll that back */
+ALTER TABLE notnull_tbl2 REPLICA IDENTITY FULL, ALTER c2 DROP IDENTITY;
+ALTER TABLE notnull_tbl2 ALTER c1 DROP NOT NULL, ALTER c2 DROP NOT NULL;
+\d+ notnull_tbl2
+ROLLBACK;
+-- Leave this table around for pg_upgrade testing
 
 CREATE TABLE notnull_tbl3 (a INTEGER NOT NULL, CHECK (a IS NOT NULL));
 ALTER TABLE notnull_tbl3 ALTER A DROP NOT NULL;
-- 
2.39.2

Reply via email to