On 10/5/20 11:35 AM, Etsuro Fujita wrote:
Hi,
I found a small problem: if a plan contains a mix of async and sync subplans, we can hit an assertion failure on a busy connection. For example:

PLAN
====
 Nested Loop  (cost=100.00..174316.95 rows=975 width=8) (actual time=5.191..9.262 rows=9 loops=1)
   Join Filter: (frgn.a = l.a)
   Rows Removed by Join Filter: 8991
   ->  Append  (cost=0.00..257.20 rows=11890 width=4) (actual time=0.419..2.773 rows=1000 loops=1)
         Async subplans: 4
         ->  Async Foreign Scan on f_1 l_2  (cost=100.00..197.75 rows=2925 width=4) (actual time=0.381..0.585 rows=211 loops=1)
         ->  Async Foreign Scan on f_2 l_3  (cost=100.00..197.75 rows=2925 width=4) (actual time=0.005..0.206 rows=195 loops=1)
         ->  Async Foreign Scan on f_3 l_4  (cost=100.00..197.75 rows=2925 width=4) (actual time=0.003..0.282 rows=187 loops=1)
         ->  Async Foreign Scan on f_4 l_5  (cost=100.00..197.75 rows=2925 width=4) (actual time=0.003..0.316 rows=217 loops=1)
         ->  Seq Scan on l_0 l_1  (cost=0.00..2.90 rows=190 width=4) (actual time=0.017..0.057 rows=190 loops=1)
   ->  Materialize  (cost=100.00..170.94 rows=975 width=4) (actual time=0.001..0.002 rows=9 loops=1000)
         ->  Foreign Scan on frgn  (cost=100.00..166.06 rows=975 width=4) (actual time=0.766..0.768 rows=9 loops=1)

The reproduction script, 'test1.sql', is attached. It forces the problem to reproduce by setting enable_hashjoin and enable_mergejoin to off.

'asyncmix.patch' contains my solution to this problem.

--
regards,
Andrey Lepikhov
Postgres Professional

Attachment: test1.sql
Description: application/sql

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 14824368cc..613d406982 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -455,7 +455,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 									  void *arg);
 static void create_cursor(ForeignScanState *node);
 static void request_more_data(ForeignScanState *node);
-static void fetch_received_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node, bool vacateconn);
 static void vacate_connection(PgFdwState *fdwconn, bool clear_queue);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static PgFdwModifyState *create_foreign_modify(EState *estate,
@@ -1706,15 +1706,19 @@ postgresIterateForeignScan(ForeignScanState *node)
 		{
 			/*
 			 * finish the running query before sending the next command for
-			 * this node
+			 * this node.
+			 * When the plan contains both asynchronous subplans and non-async
+			 * subplans backend could request more data in async mode and want to
+			 * get data in sync mode by the same connection. Here it must wait
+			 * for async data before request another.
 			 */
-			if (!fsstate->s.commonstate->busy)
-				vacate_connection((PgFdwState *)fsstate, false);
+			if (fsstate->s.commonstate->busy)
+				vacate_connection(&fsstate->s, false);
 
 			request_more_data(node);
 
 			/* Fetch the result immediately. */
-			fetch_received_data(node);
+			fetch_received_data(node, false);
 		}
 		else if (!fsstate->s.commonstate->busy)
 		{
@@ -1749,7 +1753,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 			/* fetch the leader's data and enqueue it for the next request */
 			if (available)
 			{
-				fetch_received_data(leader);
+				fetch_received_data(leader, false);
 				add_async_waiter(leader);
 			}
 		}
@@ -3729,7 +3733,7 @@ request_more_data(ForeignScanState *node)
  * Fetches received data and automatically send requests of the next waiter.
  */
 static void
-fetch_received_data(ForeignScanState *node)
+fetch_received_data(ForeignScanState *node, bool vacateconn)
 {
 	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	PGresult   *volatile res = NULL;
@@ -3817,7 +3821,8 @@ fetch_received_data(ForeignScanState *node)
 	waiter = move_to_next_waiter(node);
 
 	/* send the next request if any */
-	if (waiter)
+	if (waiter && (!vacateconn ||
+		GetPgFdwScanState(node)->s.conn != GetPgFdwScanState(waiter)->s.conn))
 		request_more_data(waiter);
 
 	MemoryContextSwitchTo(oldcontext);
@@ -3843,7 +3848,7 @@ vacate_connection(PgFdwState *fdwstate, bool clear_queue)
 	 * query
 	 */
 	leader = commonstate->leader;
-	fetch_received_data(leader);
+	fetch_received_data(leader, true);
 
 	/* let the first waiter be the next leader of this connection */
 	move_to_next_waiter(leader);

Reply via email to