On 22.09.2020 16:40, Konstantin Knizhnik wrote:


On 22.09.2020 15:52, Konstantin Knizhnik wrote:


On 20.08.2020 10:36, Kyotaro Horiguchi wrote:
At Wed, 19 Aug 2020 23:25:36 -0500, Justin Pryzby <pry...@telsasoft.com> wrote in
On Thu, Jul 02, 2020 at 11:14:48AM +0900, Kyotaro Horiguchi wrote:
As the result of a discussion with Fujita-san off-list, I'm going to
hold off development until he decides whether mine or Thomas' is
better.
The latest patch doesn't apply so I set as WoA.
https://commitfest.postgresql.org/29/2491/
Thanks. This is rebased version.

At Fri, 14 Aug 2020 13:29:16 +1200, Thomas Munro <thomas.mu...@gmail.com> wrote in
Either way, we definitely need patch 0001.  One comment:

-CreateWaitEventSet(MemoryContext context, int nevents)
+CreateWaitEventSet(MemoryContext context, ResourceOwner res, int nevents)

I wonder if it's better to have it receive ResourceOwner like that, or
to have it capture CurrentResourceOwner.  I think the latter is more
common in existing code.
There's no existing WaitEventSets belonging to a resowner. So
unconditionally capturing CurrentResourceOwner doesn't work well. I
could pass a bool instead but that make things more complex.

Come to think of "complex", ExecAsync stuff in this patch might be
too-much for a short-term solution until executor overhaul, if it
comes shortly. (the patch of mine here as a whole is like that,
though..). The queueing stuff in postgres_fdw is, too.

regards.



Hi,
Looks like current implementation of asynchronous append incorrectly handle LIMIT clause:

psql:append.sql:10: ERROR:  another command is already in progress
CONTEXT:  remote SQL command: CLOSE c1



Just FYI: the following patch fixes the problem:

--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1667,6 +1667,11 @@ remove_async_node(ForeignScanState *node)

         if (cur == node)
         {
+            PGconn *conn = curstate->s.conn;
+
+            while(PQisBusy(conn))
+                PQclear(PQgetResult(conn));
+
             prev_state->waiter = curstate->waiter;

             /* relink to the previous node if the last node was removed */


Sorry, but it is not the only problem.
If you execute the query above and then in the same backend try to insert more records, then backend is crashed:

Program terminated with signal SIGSEGV, Segmentation fault.
#0  0x00007f5dfc59a231 in fetch_received_data (node=0x230c130) at postgres_fdw.c:3736
3736        Assert(fsstate->s.commonstate->leader == node);
(gdb) p sstate->s.commonstate
No symbol "sstate" in current context.
(gdb) p fsstate->s.commonstate
Cannot access memory at address 0x7f7f7f7f7f7f7f87

Also my patch doesn't solve the problem for small number of records (100) in the table.
I attach yet another patch which fix both problems.
Please notice that I did not go deep inside code of async append, so I am not sure that my patch is complete and correct.


--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 1482436..ff15642 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1623,7 +1623,19 @@ remove_async_node(ForeignScanState *node)
 	PgFdwScanState		*prev_state;
 	ForeignScanState	*cur;
 
-	/* no need to remove me */
+	if (fsstate->s.commonstate->busy)
+	{
+		/*
+		 * this node is waiting for result, absorb the result first so
+		 * that the following commands can be sent on the connection.
+		 */
+		PGconn *conn = fsstate->s.conn;
+
+		while(PQisBusy(conn))
+			PQclear(PQgetResult(conn));
+	}
+
+    /* no need to remove me */
 	if (!leader || !fsstate->queued)
 		return;
 
@@ -1631,23 +1643,7 @@ remove_async_node(ForeignScanState *node)
 
 	if (leader == node)
 	{
-		if (leader_state->s.commonstate->busy)
-		{
-			/*
-			 * this node is waiting for result, absorb the result first so
-			 * that the following commands can be sent on the connection.
-			 */
-			PgFdwScanState *leader_state = GetPgFdwScanState(leader);
-			PGconn *conn = leader_state->s.conn;
-
-			while(PQisBusy(conn))
-				PQclear(PQgetResult(conn));
-
-			leader_state->s.commonstate->busy = false;
-		}
-
 		move_to_next_waiter(node);
-
 		return;
 	}
 
@@ -1858,7 +1854,7 @@ postgresEndForeignScan(ForeignScanState *node)
 	/* Release remote connection */
 	ReleaseConnection(fsstate->s.conn);
 	fsstate->s.conn = NULL;
-
+	fsstate->s.commonstate->leader = NULL;
 	/* MemoryContexts will be deleted automatically. */
 }
 

Reply via email to