On 10/5/20 11:35 AM, Etsuro Fujita wrote:

Hi,

I found a small problem. If a plan mixes async and sync subplans, we can hit an assertion on a busy connection. For example:
PLAN
====
Nested Loop  (cost=100.00..174316.95 rows=975 width=8) (actual time=5.191..9.262 rows=9 loops=1)
  Join Filter: (frgn.a = l.a)
  Rows Removed by Join Filter: 8991
  ->  Append  (cost=0.00..257.20 rows=11890 width=4) (actual time=0.419..2.773 rows=1000 loops=1)
        Async subplans: 4
        ->  Async Foreign Scan on f_1 l_2  (cost=100.00..197.75 rows=2925 width=4) (actual time=0.381..0.585 rows=211 loops=1)
        ->  Async Foreign Scan on f_2 l_3  (cost=100.00..197.75 rows=2925 width=4) (actual time=0.005..0.206 rows=195 loops=1)
        ->  Async Foreign Scan on f_3 l_4  (cost=100.00..197.75 rows=2925 width=4) (actual time=0.003..0.282 rows=187 loops=1)
        ->  Async Foreign Scan on f_4 l_5  (cost=100.00..197.75 rows=2925 width=4) (actual time=0.003..0.316 rows=217 loops=1)
        ->  Seq Scan on l_0 l_1  (cost=0.00..2.90 rows=190 width=4) (actual time=0.017..0.057 rows=190 loops=1)
  ->  Materialize  (cost=100.00..170.94 rows=975 width=4) (actual time=0.001..0.002 rows=9 loops=1000)
        ->  Foreign Scan on frgn  (cost=100.00..166.06 rows=975 width=4) (actual time=0.766..0.768 rows=9 loops=1)

The reproduction script 'test1.sql' is attached. It forces the problem to reproduce by setting enable_hashjoin and enable_mergejoin to off.
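
For readers without the attachment, the two planner settings named above can be sketched as follows. This shows only those two settings; the tables, foreign servers and query in test1.sql are not reproduced here, and nothing else about the script is assumed.

```sql
-- Sketch only: session-level planner settings mentioned in this mail.
-- With hash and merge joins disabled, the planner is steered toward
-- a nested loop, which is the join type shown in the plan above.
SET enable_hashjoin = off;
SET enable_mergejoin = off;
```

Both settings apply to the current session only and can be undone with RESET.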
'asyncmix.patch' contains my solution to this problem.

--
regards,
Andrey Lepikhov
Postgres Professional

test1.sql
Description: application/sql
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 14824368cc..613d406982 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -455,7 +455,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 									  void *arg);
 static void create_cursor(ForeignScanState *node);
 static void request_more_data(ForeignScanState *node);
-static void fetch_received_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node, bool vacateconn);
 static void vacate_connection(PgFdwState *fdwconn, bool clear_queue);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static PgFdwModifyState *create_foreign_modify(EState *estate,
@@ -1706,15 +1706,19 @@ postgresIterateForeignScan(ForeignScanState *node)
 		{
 			/*
 			 * finish the running query before sending the next command for
-			 * this node
+			 * this node.
+			 * When the plan contains both asynchronous subplans and non-async
+			 * subplans backend could request more data in async mode and want to
+			 * get data in sync mode by the same connection. Here it must wait
+			 * for async data before request another.
 			 */
-			if (!fsstate->s.commonstate->busy)
-				vacate_connection((PgFdwState *)fsstate, false);
+			if (fsstate->s.commonstate->busy)
+				vacate_connection(&fsstate->s, false);
 
 			request_more_data(node);
 
 			/* Fetch the result immediately. */
-			fetch_received_data(node);
+			fetch_received_data(node, false);
 		}
 		else if (!fsstate->s.commonstate->busy)
 		{
@@ -1749,7 +1753,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 			/* fetch the leader's data and enqueue it for the next request */
 			if (available)
 			{
-				fetch_received_data(leader);
+				fetch_received_data(leader, false);
 				add_async_waiter(leader);
 			}
 		}
@@ -3729,7 +3733,7 @@ request_more_data(ForeignScanState *node)
  * Fetches received data and automatically send requests of the next waiter.
  */
 static void
-fetch_received_data(ForeignScanState *node)
+fetch_received_data(ForeignScanState *node, bool vacateconn)
 {
 	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	PGresult *volatile res = NULL;
@@ -3817,7 +3821,8 @@ fetch_received_data(ForeignScanState *node)
 	waiter = move_to_next_waiter(node);
 
 	/* send the next request if any */
-	if (waiter)
+	if (waiter && (!vacateconn ||
+		GetPgFdwScanState(node)->s.conn != GetPgFdwScanState(waiter)->s.conn))
 		request_more_data(waiter);
 
 	MemoryContextSwitchTo(oldcontext);
@@ -3843,7 +3848,7 @@ vacate_connection(PgFdwState *fdwstate, bool clear_queue)
 	 * query
 	 */
 	leader = commonstate->leader;
-	fetch_received_data(leader);
+	fetch_received_data(leader, true);
 
 	/* let the first waiter be the next leader of this connection */
 	move_to_next_waiter(leader);