Argh! I forgot the attachments, of course.

On 6/4/21 1:48 PM, Tomas Vondra wrote:
> Hi,
> 
> Here are two WIP patches that fix the regression for me. The first part
> is from [1], to make large batches work; 0002 just creates a copy of the
> tupledesc to avoid causing issues in the resource owner; 0003 ensures we
> only initialize the slots once (not per batch).
> 
> With the patches applied, the timings look like this:
> 
>  batch          timing
>  ----------------------
>      1    64194.942 ms
>     10     7233.785 ms
>    100     2244.255 ms
>    32k     1372.175 ms
> 
> which seems fine. I still need to get this properly tested etc. and make
> sure nothing is left over.
> 
> regards
> 
> 
> [1]
> https://postgr.es/m/OS0PR01MB571603973C0AC2874AD6BF2594299%40OS0PR01MB5716.jpnprd01.prod.outlook.com
> 

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
>From f3bd790af94c89d5b61a9d9c23a4b05a309d73bf Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Fri, 4 Jun 2021 12:34:23 +0200
Subject: [PATCH 1/3] Add PQ_QUERY_PARAM_MAX_LIMIT

---
 contrib/postgres_fdw/postgres_fdw.c | 11 +++++++++--
 doc/src/sgml/postgres-fdw.sgml      | 11 +++++++++++
 src/interfaces/libpq/fe-exec.c      | 21 ++++++++++++---------
 src/interfaces/libpq/libpq-fe.h     |  2 ++
 4 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index c48a421e88..ac86b06b8f 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1979,7 +1979,7 @@ postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
 	Assert(fmstate == NULL || fmstate->aux_fmstate == NULL);
 
 	/*
-	 * In EXPLAIN without ANALYZE, ri_fdwstate is NULL, so we have to lookup
+	 * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup
 	 * the option directly in server/table options. Otherwise just use the
 	 * value we determined earlier.
 	 */
@@ -1994,7 +1994,14 @@ postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
 		 resultRelInfo->ri_TrigDesc->trig_insert_after_row))
 		return 1;
 
-	/* Otherwise use the batch size specified for server/table. */
+	/*
+	 * Otherwise use the batch size specified for server/table. The number of
+	 * parameters in a batch is limited to 64k (uint16), so make sure we don't
+	 * exceed this limit by using the maximum batch_size possible.
+	 */
+	if (fmstate && fmstate->p_nums > 0)
+		batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums);
+
 	return batch_size;
 }
 
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index fb87372bde..564651dfaa 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -372,6 +372,17 @@ OPTIONS (ADD password_required 'false');
        overrides an option specified for the server.
        The default is <literal>1</literal>.
       </para>
+
+      <para>
+       Note the actual number of rows <filename>postgres_fdw</filename> inserts at
+       once depends on the number of columns and the provided
+       <literal>batch_size</literal> value. The batch is executed as a single
+       query, and the libpq protocol (which <filename>postgres_fdw</filename>
+       uses to connect to a remote server) limits the number of parameters in a
+       single query to 64k. When the number of columns * <literal>batch_size</literal>
+       exceeds the limit, the <literal>batch_size</literal> will be adjusted to
+       avoid an error.
+      </para>
      </listitem>
     </varlistentry>
 
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 03592bdce9..832d61c544 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1403,10 +1403,11 @@ PQsendQueryParams(PGconn *conn,
 							 libpq_gettext("command string is a null pointer\n"));
 		return 0;
 	}
-	if (nParams < 0 || nParams > 65535)
+	if (nParams < 0 || nParams > PQ_QUERY_PARAM_MAX_LIMIT)
 	{
-		appendPQExpBufferStr(&conn->errorMessage,
-							 libpq_gettext("number of parameters must be between 0 and 65535\n"));
+		appendPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("number of parameters must be between 0 and %d\n"),
+						  PQ_QUERY_PARAM_MAX_LIMIT);
 		return 0;
 	}
 
@@ -1451,10 +1452,11 @@ PQsendPrepare(PGconn *conn,
 							 libpq_gettext("command string is a null pointer\n"));
 		return 0;
 	}
-	if (nParams < 0 || nParams > 65535)
+	if (nParams < 0 || nParams > PQ_QUERY_PARAM_MAX_LIMIT)
 	{
-		appendPQExpBufferStr(&conn->errorMessage,
-							 libpq_gettext("number of parameters must be between 0 and 65535\n"));
+		appendPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("number of parameters must be between 0 and %d\n"),
+						  PQ_QUERY_PARAM_MAX_LIMIT);
 		return 0;
 	}
 
@@ -1548,10 +1550,11 @@ PQsendQueryPrepared(PGconn *conn,
 							 libpq_gettext("statement name is a null pointer\n"));
 		return 0;
 	}
-	if (nParams < 0 || nParams > 65535)
+	if (nParams < 0 || nParams > PQ_QUERY_PARAM_MAX_LIMIT)
 	{
-		appendPQExpBufferStr(&conn->errorMessage,
-							 libpq_gettext("number of parameters must be between 0 and 65535\n"));
+		appendPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("number of parameters must be between 0 and %d\n"),
+						  PQ_QUERY_PARAM_MAX_LIMIT);
 		return 0;
 	}
 
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 227adde5a5..113ab52ada 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -429,6 +429,8 @@ extern PGresult *PQexecPrepared(PGconn *conn,
 								int resultFormat);
 
 /* Interface for multiple-result or asynchronous queries */
+#define PQ_QUERY_PARAM_MAX_LIMIT 65535
+
 extern int	PQsendQuery(PGconn *conn, const char *query);
 extern int	PQsendQueryParams(PGconn *conn,
 							  const char *command,
-- 
2.31.1

>From e1f083732a7f02b1093fc2435c40596d959549d5 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Fri, 4 Jun 2021 12:45:18 +0200
Subject: [PATCH 2/3] create copy of a descriptor for batching

---
 src/backend/executor/nodeModifyTable.c | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 379b056310..c287a371a1 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -678,6 +678,8 @@ ExecInsert(ModifyTableState *mtstate,
 		 */
 		if (resultRelInfo->ri_BatchSize > 1)
 		{
+			TupleDesc tdesc;
+
 			/*
 			 * If a certain number of tuples have already been accumulated, or
 			 * a tuple has come for a different relation than that for the
@@ -703,13 +705,16 @@ ExecInsert(ModifyTableState *mtstate,
 													 resultRelInfo->ri_BatchSize);
 			}
 
+			tdesc = CreateTupleDescCopy(slot->tts_tupleDescriptor);
+
 			resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
-				MakeSingleTupleTableSlot(slot->tts_tupleDescriptor,
+				MakeSingleTupleTableSlot(tdesc,
 										 slot->tts_ops);
 			ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
 						 slot);
+
 			resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
-				MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor,
+				MakeSingleTupleTableSlot(tdesc,
 										 planSlot->tts_ops);
 			ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
 						 planSlot);
-- 
2.31.1

>From 605a29661db9f89dfb21692cc6c918a439d14252 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Fri, 4 Jun 2021 12:59:47 +0200
Subject: [PATCH 3/3] initialize slots only once for batching

---
 src/backend/executor/nodeModifyTable.c | 42 ++++++++++++++------------
 src/include/nodes/execnodes.h          |  1 +
 2 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index c287a371a1..f16c17e8d4 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -678,8 +678,6 @@ ExecInsert(ModifyTableState *mtstate,
 		 */
 		if (resultRelInfo->ri_BatchSize > 1)
 		{
-			TupleDesc tdesc;
-
 			/*
 			 * If a certain number of tuples have already been accumulated, or
 			 * a tuple has come for a different relation than that for the
@@ -705,19 +703,25 @@ ExecInsert(ModifyTableState *mtstate,
 													 resultRelInfo->ri_BatchSize);
 			}
 
-			tdesc = CreateTupleDescCopy(slot->tts_tupleDescriptor);
+			/* initialize the slot, if not already done */
+			if (resultRelInfo->ri_NumSlots >= resultRelInfo->ri_BatchInitialized)
+			{
+				TupleDesc tdesc = CreateTupleDescCopy(slot->tts_tupleDescriptor);
 
-			resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
-				MakeSingleTupleTableSlot(tdesc,
-										 slot->tts_ops);
-			ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
-						 slot);
+				resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
+					MakeSingleTupleTableSlot(tdesc,
+											 slot->tts_ops);
+				ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
+							 slot);
 
-			resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
-				MakeSingleTupleTableSlot(tdesc,
-										 planSlot->tts_ops);
-			ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
-						 planSlot);
+				resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
+					MakeSingleTupleTableSlot(tdesc,
+											 planSlot->tts_ops);
+				ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
+							 planSlot);
+
+				resultRelInfo->ri_BatchInitialized++;
+			}
 
 			resultRelInfo->ri_NumSlots++;
 
@@ -1039,12 +1043,6 @@ ExecBatchInsert(ModifyTableState *mtstate,
 
 	if (canSetTag && numInserted > 0)
 		estate->es_processed += numInserted;
-
-	for (i = 0; i < numSlots; i++)
-	{
-		ExecDropSingleTupleTableSlot(slots[i]);
-		ExecDropSingleTupleTableSlot(planSlots[i]);
-	}
 }
 
 /* ----------------------------------------------------------------
@@ -3174,6 +3172,12 @@ ExecEndModifyTable(ModifyTableState *node)
 			resultRelInfo->ri_FdwRoutine->EndForeignModify != NULL)
 			resultRelInfo->ri_FdwRoutine->EndForeignModify(node->ps.state,
 														   resultRelInfo);
+
+		for (i = 0; i < resultRelInfo->ri_NumSlots; i++)
+		{
+			ExecDropSingleTupleTableSlot(resultRelInfo->ri_Slots[i]);
+			ExecDropSingleTupleTableSlot(resultRelInfo->ri_PlanSlots[i]);
+		}
 	}
 
 	/*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 7795a69490..1062a44ee1 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -463,6 +463,7 @@ typedef struct ResultRelInfo
 	/* batch insert stuff */
 	int			ri_NumSlots;	/* number of slots in the array */
 	int			ri_BatchSize;	/* max slots inserted in a single batch */
+	int			ri_BatchInitialized;	/* number of slots initialized */
 	TupleTableSlot **ri_Slots;	/* input tuples for batch insert */
 	TupleTableSlot **ri_PlanSlots;
 
-- 
2.31.1

Reply via email to