Hi hackers,

I ran into an issue today while trying to insert a value of a complex type, one of whose attributes is itself an array of complex types.

As an example:

-- Complex (composite) type used as the array element type below.
CREATE TYPE inventory_item AS
(
    name        text,
    supplier_id integer,
    price       numeric
);
-- Complex type with an attribute that is an array of complex values.
CREATE TYPE item_2d AS (id int, items inventory_item[][]);

-- Table with a column of the outer complex type.
CREATE TABLE item_2d_table (id int, item item_2d);

-- Reproduction: a record literal whose second field is a 2x2 array of
-- inventory_item records. Reported to fail without the attached patch.
INSERT INTO item_2d_table VALUES(1, '(1,{{("inv a",42,1.99),("inv b",42,1.99)},{("inv c",42,1.99),("inv d",42,2)}})');

The INSERT statement fails because of how complex types are parsed. I have included a patch in this email to support this scenario.

This fails because record_in does not recognize an array literal when one of the attributes is of an array type. It ends a column value at the first unquoted ',' or ')', so it stops processing the column prematurely and produces a truncated value for that column. As a result, array_in receives a malformed string and raises an error.

To fix this, record_in can detect columns that are of an array type and, in such cases, leave the input intact. array_in will then extract the elements one by one. The logic for unquoted elements needs to change slightly: if the element is a record, quotes must be allowed inside it, e.g. {{("test field")}}

Some adjustments could still be made to the patch. For example:
We can detect the number of dimensions of the array in record_in. Do we want to raise an error if the string has more dimensions than MAXDIM in array.h (to prevent integer overflow/underflow), if the number of dimensions differs from the number the attribute is declared to have, or both?

Regards,
Arjan Marku
From 4524a944e47c81fb6e738103441ddc202aca5a59 Mon Sep 17 00:00:00 2001
From: binoverfl0w <arjanmark...@gmail.com>
Date: Sat, 13 Jul 2024 19:01:25 +0200
Subject: [PATCH v1] Fix parsing of a complex type that has an array
 of complex types

record_in does not recognize an array literal when one of the attributes is of an array type.
Because of this, it stops processing the column value prematurely, which produces a truncated value for that column.
As a result, array_in receives a malformed string and raises an error.

To fix this, record_in can detect columns that are of an array type and, in such cases, leave the input intact.
array_in will then extract the elements one by one. The logic for unquoted elements needs to change slightly: if the element is a record, quotes must be allowed inside it, e.g. {{("test field")}}
---
 src/backend/utils/adt/arrayfuncs.c | 26 +++++++++++++++++++++---
 src/backend/utils/adt/rowtypes.c   | 32 ++++++++++++++++++++++++++++--
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/src/backend/utils/adt/arrayfuncs.c b/src/backend/utils/adt/arrayfuncs.c
index d6641b570d..6925b6a4ff 100644
--- a/src/backend/utils/adt/arrayfuncs.c
+++ b/src/backend/utils/adt/arrayfuncs.c
@@ -101,7 +101,7 @@ static bool ReadArrayStr(char **srcptr,
 						 int *nitems_p,
 						 Datum **values_p, bool **nulls_p,
 						 const char *origStr, Node *escontext);
-static ArrayToken ReadArrayToken(char **srcptr, StringInfo elembuf, char typdelim,
+static ArrayToken ReadArrayToken(char **srcptr, StringInfo elembuf, char typdelim, bool is_record,
 								 const char *origStr, Node *escontext);
 static void ReadArrayBinary(StringInfo buf, int nitems,
 							FmgrInfo *receiveproc, Oid typioparam, int32 typmod,
@@ -603,6 +603,7 @@ ReadArrayStr(char **srcptr,
 	bool		ndim_frozen;
 	bool		expect_delim;
 	int			nelems[MAXDIM];
+	bool		is_record;
 
 	/* Allocate some starting output workspace; we'll enlarge as needed */
 	maxitems = 16;
@@ -615,6 +616,9 @@ ReadArrayStr(char **srcptr,
 	/* Loop below assumes first token is ATOK_LEVEL_START */
 	Assert(**srcptr == '{');
 
+	/* Whether '(' should be treated as a special character that denotes the start of a record */
+	is_record = (inputproc->fn_oid == F_RECORD_IN);
+
 	/* Parse tokens until we reach the matching right brace */
 	nest_level = 0;
 	nitems = 0;
@@ -624,7 +628,7 @@ ReadArrayStr(char **srcptr,
 	{
 		ArrayToken	tok;
 
-		tok = ReadArrayToken(srcptr, &elembuf, typdelim, origStr, escontext);
+		tok = ReadArrayToken(srcptr, &elembuf, typdelim, is_record, origStr, escontext);
 
 		switch (tok)
 		{
@@ -793,7 +797,7 @@ dimension_error:
  * If the token is ATOK_ELEM, the de-escaped string is returned in elembuf.
  */
 static ArrayToken
-ReadArrayToken(char **srcptr, StringInfo elembuf, char typdelim,
+ReadArrayToken(char **srcptr, StringInfo elembuf, char typdelim, bool is_record,
 			   const char *origStr, Node *escontext)
 {
 	char	   *p = *srcptr;
@@ -912,6 +916,22 @@ unquoted_element:
 				dstlen = elembuf->len;	/* treat it as non-whitespace */
 				has_escapes = true;
 				break;
+			case '(':
+				if (is_record) {
+					bool in_quote = false;
+					while (*p && (in_quote || *p != ')')) {
+						if (*p == '"') {
+							in_quote = !in_quote;
+						}
+						appendStringInfoChar(elembuf, *p++);
+					}
+					if (*p == '\0')
+						goto ending_error;
+					appendStringInfoChar(elembuf, *p++);
+					dstlen = elembuf->len;
+					break;
+				}
+				// fall through
 			default:
 				/* End of elem? */
 				if (*p == typdelim || *p == '}')
diff --git a/src/backend/utils/adt/rowtypes.c b/src/backend/utils/adt/rowtypes.c
index 0214c23a1d..8aea524144 100644
--- a/src/backend/utils/adt/rowtypes.c
+++ b/src/backend/utils/adt/rowtypes.c
@@ -204,8 +204,15 @@ record_in(PG_FUNCTION_ARGS)
 			/* Extract string for this column */
 			bool		inquote = false;
 
+			/* Keep the string as it is since this is an array and its elements will be extracted later */
+			bool		is_array = att->attndims != 0;
+			int16	    nested_level = 0;
+
+			/* If '{' / '}' are encountered outside a record, they denote the start / end of an array dimension  */
+			bool		in_record = false;
+
 			resetStringInfo(&buf);
-			while (inquote || !(*ptr == ',' || *ptr == ')'))
+			while (is_array || inquote || !(*ptr == ',' || *ptr == ')'))
 			{
 				char		ch = *ptr++;
 
@@ -229,6 +236,9 @@ record_in(PG_FUNCTION_ARGS)
 								 errdetail("Unexpected end of input.")));
 						goto fail;
 					}
+					if (is_array) {
+						appendStringInfoChar(&buf, ch);
+					}
 					appendStringInfoChar(&buf, *ptr++);
 				}
 				else if (ch == '"')
@@ -242,9 +252,27 @@ record_in(PG_FUNCTION_ARGS)
 					}
 					else
 						inquote = false;
+
+					if (is_array) {
+						appendStringInfoChar(&buf, ch);
+					}
+				} else if (!in_record && !inquote && (ch == '{' || ch == '}')) {
+					if (ch == '{') {
+						nested_level++;
+					} else {
+						nested_level--;
+						if (nested_level == 0) {
+							is_array = false;
+						}
+					}
+					appendStringInfoChar(&buf, ch);
 				}
-				else
+				else {
+					if (is_array && !inquote && (ch == '(' || ch == ')')) {
+						in_record = (ch == '(');
+					}
 					appendStringInfoChar(&buf, ch);
+				}
 			}
 
 			column_data = buf.data;
-- 
2.45.2

Reply via email to