From 2c55b70d82d0769d3be2c446ae270e9c78c8759d Mon Sep 17 00:00:00 2001
From: Aleksander Alekseev <aleksander@timescale.com>
Date: Wed, 15 Jun 2022 16:34:50 +0300
Subject: [PATCH v3 1/2] CREATE TABLE ( .. STORAGE .. )

TODO FIXME description

Author:	Teodor Sigaev <teodor@sigaev.ru>
Reviewed-by: TODO FIXME
Discussion: https://postgr.es/m/de83407a-ae3d-a8e1-a788-920eb334f25b@sigaev.ru
---
 doc/src/sgml/ref/create_table.sgml        | 24 +++++++-
 src/backend/commands/tablecmds.c          | 75 +++++++++++++++++------
 src/backend/nodes/copyfuncs.c             |  1 +
 src/backend/nodes/equalfuncs.c            |  1 +
 src/backend/parser/gram.y                 | 18 ++++--
 src/include/nodes/parsenodes.h            |  1 +
 src/test/regress/expected/alter_table.out |  6 +-
 src/test/regress/sql/alter_table.sql      |  5 +-
 8 files changed, 103 insertions(+), 28 deletions(-)

diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index 6c9918b0a1..f1f00cf834 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] <replaceable class="parameter">table_name</replaceable> ( [
-  { <replaceable class="parameter">column_name</replaceable> <replaceable class="parameter">data_type</replaceable> [ COMPRESSION <replaceable>compression_method</replaceable> ] [ COLLATE <replaceable>collation</replaceable> ] [ <replaceable class="parameter">column_constraint</replaceable> [ ... ] ]
+  { <replaceable class="parameter">column_name</replaceable> <replaceable class="parameter">data_type</replaceable> [ STORAGE { PLAIN | EXTERNAL | EXTENDED | MAIN } ] [ COMPRESSION <replaceable>compression_method</replaceable> ] [ COLLATE <replaceable>collation</replaceable> ] [ <replaceable class="parameter">column_constraint</replaceable> [ ... ] ]
     | <replaceable>table_constraint</replaceable>
     | LIKE <replaceable>source_table</replaceable> [ <replaceable>like_option</replaceable> ... ] }
     [, ... ]
@@ -297,6 +297,28 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term> <literal>SET STORAGE { PLAIN | EXTERNAL | EXTENDED | MAIN }</literal></term>
+    <listitem>
+     <para>
+      This form sets the storage mode for a column. This controls whether this
+      column is held inline or in a secondary <acronym>TOAST</acronym> table, and
+      whether the data
+      should be compressed or not. <literal>PLAIN</literal> must be used
+      for fixed-length values such as <type>integer</type> and is
+      inline, uncompressed. <literal>MAIN</literal> is for inline,
+      compressible data. <literal>EXTERNAL</literal> is for external,
+      uncompressed data, and <literal>EXTENDED</literal> is for external,
+      compressed data.  <literal>EXTENDED</literal> is the default for most
+      data types that support non-<literal>PLAIN</literal> storage.
+      Use of <literal>EXTERNAL</literal> will make substring operations on
+      very large <type>text</type> and <type>bytea</type> values run faster,
+      at the penalty of increased storage space. 
+      See <xref linkend="storage-toast"/> for more information.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>COMPRESSION <replaceable class="parameter">compression_method</replaceable></literal></term>
     <listitem>
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 2de0ebacec..708d7354b5 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -633,6 +633,7 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
 static List *GetParentedForeignKeyRefs(Relation partition);
 static void ATDetachCheckNoForeignKeyRefs(Relation partition);
 static char GetAttributeCompression(Oid atttypid, char *compression);
+static char	GetAttributeStorage(const char *storagemode);
 
 
 /* ----------------------------------------------------------------
@@ -931,6 +932,22 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 		if (colDef->compression)
 			attr->attcompression = GetAttributeCompression(attr->atttypid,
 														   colDef->compression);
+
+		if (colDef->storage_name)
+		{
+			attr->attstorage = GetAttributeStorage(colDef->storage_name);
+			/*
+			 * safety check: do not allow toasted storage modes unless column datatype
+			 * is TOAST-aware.
+			 */
+			if (!(attr->attstorage == TYPSTORAGE_PLAIN ||
+				  TypeIsToastable(attr->atttypid)))
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("column data type %s can only have storage PLAIN",
+								format_type_be(attr->atttypid))));
+		}
+
 	}
 
 	/*
@@ -6819,7 +6836,23 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	attribute.atttypmod = typmod;
 	attribute.attbyval = tform->typbyval;
 	attribute.attalign = tform->typalign;
-	attribute.attstorage = tform->typstorage;
+	if (colDef->storage_name)
+	{
+		attribute.attstorage = GetAttributeStorage(colDef->storage_name);
+		/*
+		 * safety check: do not allow toasted storage modes unless column datatype
+		 * is TOAST-aware.
+		 */
+		if (!(attribute.attstorage == TYPSTORAGE_PLAIN ||
+			  TypeIsToastable(attribute.atttypid)))
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("column data type %s can only have storage PLAIN",
+							format_type_be(attribute.atttypid))));
+	}
+	else
+		attribute.attstorage = tform->typstorage;
+
 	attribute.attcompression = GetAttributeCompression(typeOid,
 													   colDef->compression);
 	attribute.attnotnull = colDef->is_not_null;
@@ -8262,7 +8295,6 @@ SetIndexStorageProperties(Relation rel, Relation attrelation,
 static ObjectAddress
 ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE lockmode)
 {
-	char	   *storagemode;
 	char		newstorage;
 	Relation	attrelation;
 	HeapTuple	tuple;
@@ -8271,24 +8303,8 @@ ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE loc
 	ObjectAddress address;
 
 	Assert(IsA(newValue, String));
-	storagemode = strVal(newValue);
 
-	if (pg_strcasecmp(storagemode, "plain") == 0)
-		newstorage = TYPSTORAGE_PLAIN;
-	else if (pg_strcasecmp(storagemode, "external") == 0)
-		newstorage = TYPSTORAGE_EXTERNAL;
-	else if (pg_strcasecmp(storagemode, "extended") == 0)
-		newstorage = TYPSTORAGE_EXTENDED;
-	else if (pg_strcasecmp(storagemode, "main") == 0)
-		newstorage = TYPSTORAGE_MAIN;
-	else
-	{
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("invalid storage type \"%s\"",
-						storagemode)));
-		newstorage = 0;			/* keep compiler quiet */
-	}
+	newstorage = GetAttributeStorage(strVal(newValue));
 
 	attrelation = table_open(AttributeRelationId, RowExclusiveLock);
 
@@ -19288,3 +19304,24 @@ GetAttributeCompression(Oid atttypid, char *compression)
 
 	return cmethod;
 }
+
+static char
+GetAttributeStorage(const char *storagemode)
+{
+	if (pg_strcasecmp(storagemode, "plain") == 0)
+		return TYPSTORAGE_PLAIN;
+	else if (pg_strcasecmp(storagemode, "external") == 0)
+		return TYPSTORAGE_EXTERNAL;
+	else if (pg_strcasecmp(storagemode, "extended") == 0)
+		return TYPSTORAGE_EXTENDED;
+	else if (pg_strcasecmp(storagemode, "main") == 0)
+		return TYPSTORAGE_MAIN;
+	else
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("invalid storage type \"%s\"",
+						storagemode)));
+		return 0;			/* keep compiler quiet */
+	}
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 51d630fa89..636654b291 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3560,6 +3560,7 @@ _copyColumnDef(const ColumnDef *from)
 	COPY_SCALAR_FIELD(is_not_null);
 	COPY_SCALAR_FIELD(is_from_type);
 	COPY_SCALAR_FIELD(storage);
+	COPY_STRING_FIELD(storage_name);
 	COPY_NODE_FIELD(raw_default);
 	COPY_NODE_FIELD(cooked_default);
 	COPY_SCALAR_FIELD(identity);
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index e747e1667d..064943f52d 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -3050,6 +3050,7 @@ _equalColumnDef(const ColumnDef *a, const ColumnDef *b)
 	COMPARE_SCALAR_FIELD(is_not_null);
 	COMPARE_SCALAR_FIELD(is_from_type);
 	COMPARE_SCALAR_FIELD(storage);
+	COMPARE_STRING_FIELD(storage_name);
 	COMPARE_NODE_FIELD(raw_default);
 	COMPARE_NODE_FIELD(cooked_default);
 	COMPARE_SCALAR_FIELD(identity);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 969c9c158f..c88d6d406d 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -595,7 +595,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 %type <node>	TableConstraint TableLikeClause
 %type <ival>	TableLikeOptionList TableLikeOption
-%type <str>		column_compression opt_column_compression
+%type <str>		column_compression opt_column_compression opt_column_storage
 %type <list>	ColQualList
 %type <node>	ColConstraint ColConstraintElem ConstraintAttr
 %type <ival>	key_match
@@ -2537,7 +2537,7 @@ alter_table_cmd:
 					$$ = (Node *) n;
 				}
 			/* ALTER TABLE <name> ALTER [COLUMN] <colname> SET STORAGE <storagemode> */
-			| ALTER opt_column ColId SET STORAGE ColId
+			| ALTER opt_column ColId SET STORAGE name
 				{
 					AlterTableCmd *n = makeNode(AlterTableCmd);
 
@@ -3778,13 +3778,14 @@ TypedTableElement:
 			| TableConstraint					{ $$ = $1; }
 		;
 
-columnDef:	ColId Typename opt_column_compression create_generic_options ColQualList
+columnDef:	ColId Typename opt_column_storage opt_column_compression create_generic_options ColQualList
 				{
 					ColumnDef *n = makeNode(ColumnDef);
 
 					n->colname = $1;
 					n->typeName = $2;
-					n->compression = $3;
+					n->storage_name = $3;
+					n->compression = $4;
 					n->inhcount = 0;
 					n->is_local = true;
 					n->is_not_null = false;
@@ -3793,8 +3794,8 @@ columnDef:	ColId Typename opt_column_compression create_generic_options ColQualL
 					n->raw_default = NULL;
 					n->cooked_default = NULL;
 					n->collOid = InvalidOid;
-					n->fdwoptions = $4;
-					SplitColQualList($5, &n->constraints, &n->collClause,
+					n->fdwoptions = $5;
+					SplitColQualList($6, &n->constraints, &n->collClause,
 									 yyscanner);
 					n->location = @1;
 					$$ = (Node *) n;
@@ -3851,6 +3852,11 @@ opt_column_compression:
 			| /*EMPTY*/								{ $$ = NULL; }
 		;
 
+opt_column_storage:
+			STORAGE	name							{ $$ = $2; }
+			| /*EMPTY*/								{ $$ = NULL; }
+		;
+
 ColQualList:
 			ColQualList ColConstraint				{ $$ = lappend($1, $2); }
 			| /*EMPTY*/								{ $$ = NIL; }
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 73f635b455..8247edd5a8 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -683,6 +683,7 @@ typedef struct ColumnDef
 	bool		is_not_null;	/* NOT NULL constraint specified? */
 	bool		is_from_type;	/* column definition came from table type */
 	char		storage;		/* attstorage setting, or 0 for default */
+	char		*storage_name;	/* attstorage setting name or NULL for default*/
 	Node	   *raw_default;	/* default value (untransformed parse tree) */
 	Node	   *cooked_default; /* default value (transformed expr tree) */
 	char		identity;		/* attidentity setting */
diff --git a/src/test/regress/expected/alter_table.out b/src/test/regress/expected/alter_table.out
index 5ede56d9b5..fa1ee9e9b6 100644
--- a/src/test/regress/expected/alter_table.out
+++ b/src/test/regress/expected/alter_table.out
@@ -2244,7 +2244,7 @@ alter table recur1 add column f2 int;
 alter table recur1 alter column f2 type recur2; -- fails
 ERROR:  composite type recur1 cannot be made a member of itself
 -- SET STORAGE may need to add a TOAST table
-create table test_storage (a text);
+create table test_storage (a text, c text storage plain);
 alter table test_storage alter a set storage plain;
 alter table test_storage add b int default 0; -- rewrite table to remove its TOAST table
 alter table test_storage alter a set storage extended; -- re-add TOAST table
@@ -2256,6 +2256,9 @@ where oid = 'test_storage'::regclass;
  t
 (1 row)
 
+--check STORAGE correctness
+create table test_storage_failed (a text, b int storage extended);
+ERROR:  column data type integer can only have storage PLAIN
 -- test that SET STORAGE propagates to index correctly
 create index test_storage_idx on test_storage (b, a);
 alter table test_storage alter column a set storage external;
@@ -2264,6 +2267,7 @@ alter table test_storage alter column a set storage external;
  Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
 --------+---------+-----------+----------+---------+----------+--------------+-------------
  a      | text    |           |          |         | external |              | 
+ c      | text    |           |          |         | plain    |              | 
  b      | integer |           |          | 0       | plain    |              | 
 Indexes:
     "test_storage_idx" btree (b, a)
diff --git a/src/test/regress/sql/alter_table.sql b/src/test/regress/sql/alter_table.sql
index 52001e3135..534166501c 100644
--- a/src/test/regress/sql/alter_table.sql
+++ b/src/test/regress/sql/alter_table.sql
@@ -1527,7 +1527,7 @@ alter table recur1 add column f2 int;
 alter table recur1 alter column f2 type recur2; -- fails
 
 -- SET STORAGE may need to add a TOAST table
-create table test_storage (a text);
+create table test_storage (a text, c text storage plain);
 alter table test_storage alter a set storage plain;
 alter table test_storage add b int default 0; -- rewrite table to remove its TOAST table
 alter table test_storage alter a set storage extended; -- re-add TOAST table
@@ -1536,6 +1536,9 @@ select reltoastrelid <> 0 as has_toast_table
 from pg_class
 where oid = 'test_storage'::regclass;
 
+--check STORAGE correctness
+create table test_storage_failed (a text, b int storage extended);
+
 -- test that SET STORAGE propagates to index correctly
 create index test_storage_idx on test_storage (b, a);
 alter table test_storage alter column a set storage external;
-- 
2.36.1

