Hi

While working on a big table recently, I noticed that ALTER TABLE does not 
check for column existence in operations like SET NOT NULL before starting 
working on the table, for instance adding a primary key.
It is thus possible, if a typo has been made, to generate a long lock and a 
lot of WAL that will serve no purpose since the whole transaction will be 
discarded.

For example :

toto=# alter table test add primary key(i), alter column typo set not null;
ERROR:  column "typo" of relation "test" does not exist
Time: 10.794 s


The attached patch fixes this behaviour by adding a small check in the first 
pass of alter table to make sure that a column referenced by an alter command 
exists first. It also checks if the column is added by another alter sub-
command. It does not handle every scenario (dropping a column and then 
altering it for instance), these are left to the exec code to exclude.
The patch has been checked with make check, and I see no documentation change 
to do since this does not alter any existing documented behaviour.


Regards

 Pierre
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 74e020bffc..48efb6ef5b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -352,6 +352,7 @@ static void ATRewriteTables(AlterTableStmt *parsetree,
 				List **wqueue, LOCKMODE lockmode);
 static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode);
 static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
+static void ATColumnExists(Relation rel, const char *colName, AlteredTableInfo *tab);
 static void ATSimplePermissions(Relation rel, int allowed_targets);
 static void ATWrongRelkindError(Relation rel, int allowed_targets);
 static void ATSimpleRecursion(List **wqueue, Relation rel,
@@ -3592,6 +3593,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 			 */
 			ATSimplePermissions(rel, ATT_TABLE | ATT_VIEW | ATT_FOREIGN_TABLE);
 			ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
+			ATColumnExists(rel, cmd->name, tab);
 			/* No command-specific prep needed */
 			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
 			break;
@@ -3611,6 +3613,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 			ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
 			ATPrepDropNotNull(rel, recurse, recursing);
 			ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
+			ATColumnExists(rel, cmd->name, tab);
 			/* No command-specific prep needed */
 			pass = AT_PASS_DROP;
 			break;
@@ -3618,6 +3621,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 			ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
 			ATPrepSetNotNull(rel, recurse, recursing);
 			ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
+			ATColumnExists(rel, cmd->name, tab);
 			/* No command-specific prep needed */
 			pass = AT_PASS_ADD_CONSTR;
 			break;
@@ -3630,12 +3634,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 		case AT_SetOptions:		/* ALTER COLUMN SET ( options ) */
 		case AT_ResetOptions:	/* ALTER COLUMN RESET ( options ) */
 			ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW | ATT_INDEX | ATT_FOREIGN_TABLE);
+			ATColumnExists(rel, cmd->name, tab);
 			/* This command never recurses */
 			pass = AT_PASS_MISC;
 			break;
 		case AT_SetStorage:		/* ALTER COLUMN SET STORAGE */
 			ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW | ATT_FOREIGN_TABLE);
 			ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
+			ATColumnExists(rel, cmd->name, tab);
 			/* No command-specific prep needed */
 			pass = AT_PASS_MISC;
 			break;
@@ -3643,6 +3649,10 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 			ATSimplePermissions(rel,
 								ATT_TABLE | ATT_COMPOSITE_TYPE | ATT_FOREIGN_TABLE);
 			ATPrepDropColumn(wqueue, rel, recurse, recursing, cmd, lockmode);
+			if (!cmd->missing_ok)
+			{
+				ATColumnExists(rel, cmd->name, tab);
+			}
 			/* Recursion occurs during execution phase */
 			pass = AT_PASS_DROP;
 			break;
@@ -4850,6 +4860,54 @@ ATSimplePermissions(Relation rel, int allowed_targets)
 						RelationGetRelationName(rel))));
 }
 
+/*
+ * ATColumnExists
+ *
+ * - Ensure that the targeted column exists
+ */
+static void
+ATColumnExists(Relation rel, const char *colName, AlteredTableInfo *tab)
+{
+	HeapTuple	tuple;
+	List		*subcmds;
+	bool		column_found = false;
+	tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
+
+	if (HeapTupleIsValid(tuple))
+	{
+		column_found = true;
+	}
+	else
+	{
+		/* Check that the column is not added in a previous operation */
+		subcmds = tab->subcmds[AT_PASS_ADD_COL];
+		if (subcmds != NULL)
+		{
+			ListCell   *lcmd;
+			ColumnDef  *coldef;
+
+			foreach(lcmd, subcmds)
+			{
+				AlterTableCmd *cmd = castNode(AlterTableCmd, lfirst(lcmd));
+				coldef = (ColumnDef *) cmd->def;
+				if (strcmp(coldef->colname, colName) == 0)
+				{
+					column_found = true;
+					break;
+				}
+			}
+		}
+	}
+
+	if (!column_found)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_COLUMN),
+				 errmsg("column \"%s\" of relation \"%s\" does not exist",
+						colName, RelationGetRelationName(rel))));
+	}
+}
+
 /*
  * ATWrongRelkindError
  *

Reply via email to