On 2020/10/02 0:46, Bharath Rupireddy wrote:
On Thu, Oct 1, 2020 at 8:10 PM Fujii Masao <masao.fu...@oss.nttdata.com> wrote:

pg_stat_clear_snapshot() can be used to reset the entry.


Thanks. I didn't know about it.


+               EXIT WHEN proccnt = 0;
+    END LOOP;

Isn't it better to sleep here, to avoid the busy loop?


+1.


So what I thought was something like

CREATE OR REPLACE PROCEDURE wait_for_backend_termination()
LANGUAGE plpgsql
AS $$
BEGIN
      LOOP
          PERFORM * FROM pg_stat_activity WHERE application_name = 'fdw_retry_check';
          EXIT WHEN NOT FOUND;
          PERFORM pg_sleep(1), pg_stat_clear_snapshot();
      END LOOP;
END;
$$;


Changed.

Attaching v8 patch, please review it. Both make check and make
check-world pass on v8.

Thanks for updating the patch! It basically looks good to me.
I tweaked the patch as follows.

+               if (!entry->conn ||
+                       PQstatus(entry->conn) != CONNECTION_BAD ||

With the above change, if entry->conn is NULL, an error is thrown and no new
connection is established. But why? IMO it's more natural to establish a
new connection in that case. So I removed "!entry->conn" from the above
condition.
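
To make the resulting condition concrete, here is a minimal standalone
sketch of the retry decision. It is only a model, not the postgres_fdw
code: SketchEntry, sketch_status(), sketch_must_rethrow() and
sketch_get_connection() are hypothetical names, and reconnecting is assumed
to always succeed. The one libpq fact it relies on is that PQstatus()
returns CONNECTION_BAD for a NULL connection, which is why a missing
connection falls into the retry path once "!entry->conn" is removed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for libpq's ConnStatusType. */
typedef enum { SKETCH_CONN_OK, SKETCH_CONN_BAD } SketchConnStatus;

/* Hypothetical stand-in for the cached connection entry. */
typedef struct
{
    bool connected;   /* false models entry->conn == NULL */
    bool broken;      /* true models a terminated remote backend */
    int  xact_depth;  /* 0 means no remote transaction is open */
} SketchEntry;

/* Like PQstatus(), report a missing connection as bad. */
static SketchConnStatus
sketch_status(const SketchEntry *e)
{
    assert(e != NULL);
    if (!e->connected || e->broken)
        return SKETCH_CONN_BAD;
    return SKETCH_CONN_OK;
}

/*
 * Mirrors the condition in the PG_CATCH block: true means the error is
 * rethrown, false means one reconnection attempt is made.
 */
static bool
sketch_must_rethrow(const SketchEntry *e, bool already_retried)
{
    return sketch_status(e) != SKETCH_CONN_BAD ||
           e->xact_depth > 0 ||
           already_retried;
}

/*
 * Models the "retry:" loop. Returns true if a remote transaction could be
 * started, false where the real code would rethrow the error.
 */
static bool
sketch_get_connection(SketchEntry *e)
{
    bool retry_conn = false;

    for (;;)
    {
        if (retry_conn)
            e->connected = false;       /* disconnect the broken connection */

        if (!e->connected)
        {
            e->connected = true;        /* assumption: reconnect succeeds */
            e->broken = false;
        }

        if (!e->broken)
        {
            if (e->xact_depth == 0)
                e->xact_depth = 1;      /* remote transaction started */
            return true;
        }

        /* Starting the remote transaction failed. */
        if (sketch_must_rethrow(e, retry_conn))
            return false;
        retry_conn = true;
    }
}
```

In this model a broken connection at xact_depth 0 is replaced once and the
call succeeds, while a broken connection inside an open remote transaction
fails immediately, which matches the two cases in the regression test.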

+               ereport(DEBUG3,
+                               (errmsg("could not start remote transaction on connection %p",
+                                entry->conn)),

I replaced errmsg() with errmsg_internal() because the translation of
this debug message is not necessary.


+SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE
+backend_type = 'client backend' AND application_name = 'fdw_retry_check';
+CALL wait_for_backend_termination();

Since we always use pg_terminate_backend() and wait_for_backend_termination()
together, I merged them into one function.

I simplified the comments in the regression test.

Attached is the updated version of the patch. If this patch is ok,
I'd like to mark it as ready for committer.


I have another question not related to this patch: though we have the
wait_pid() function, we can't use it in other modules the way we use
pg_terminate_backend(). Wouldn't it be nice to make it generic under
the name pg_wait_pid() and usable across all pg modules?

I thought that, too. But I could not come up with a good idea for a *real*
use case of that function. At least it's useful for the regression test, though.
Anyway, IMO it's worth proposing that and hearing more opinions about it
from other hackers.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 08daf26fdf..901d3a4661 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -108,6 +108,7 @@ PGconn *
 GetConnection(UserMapping *user, bool will_prep_stmt)
 {
        bool            found;
+       volatile bool retry_conn = false;
        ConnCacheEntry *entry;
        ConnCacheKey key;
 
@@ -159,23 +160,25 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
        /* Reject further use of connections which failed abort cleanup. */
        pgfdw_reject_incomplete_xact_state_change(entry);
 
+retry:
        /*
         * If the connection needs to be remade due to invalidation, disconnect as
-        * soon as we're out of all transactions.
+        * soon as we're out of all transactions. Also, if previous attempt to
+        * start new remote transaction failed on the cached connection, disconnect
+        * it to retry a new connection.
         */
-       if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+       if ((entry->conn != NULL && entry->invalidated &&
+                entry->xact_depth == 0) || retry_conn)
        {
-               elog(DEBUG3, "closing connection %p for option changes to take effect",
-                        entry->conn);
+               if (retry_conn)
+                       elog(DEBUG3, "closing connection %p to reestablish a new one",
+                                entry->conn);
+               else
+                       elog(DEBUG3, "closing connection %p for option changes to take effect",
+                                entry->conn);
                disconnect_pg_server(entry);
        }
 
-       /*
-        * We don't check the health of cached connection here, because it would
-        * require some overhead.  Broken connection will be detected when the
-        * connection is actually used.
-        */
-
        /*
         * If cache entry doesn't have a connection, we have to establish a new
         * connection.  (If connect_pg_server throws an error, the cache entry
@@ -206,9 +209,35 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
        }
 
        /*
-        * Start a new transaction or subtransaction if needed.
+        * We check the health of the cached connection here when starting
+        * a new remote transaction. If broken connection is detected,
+        * we try to reestablish a new connection. If broken connection is
+        * detected again here, we give up getting a connection.
         */
-       begin_remote_xact(entry);
+       PG_TRY();
+       {
+               /* Start a new transaction or subtransaction if needed. */
+               begin_remote_xact(entry);
+               retry_conn = false;
+       }
+       PG_CATCH();
+       {
+               if (PQstatus(entry->conn) != CONNECTION_BAD ||
+                       entry->xact_depth > 0 ||
+                       retry_conn)
+                       PG_RE_THROW();
+               retry_conn = true;
+       }
+       PG_END_TRY();
+
+       if (retry_conn)
+       {
+               ereport(DEBUG3,
+                               (errmsg_internal("could not start remote transaction on connection %p",
+                                entry->conn)),
+                                errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
+               goto retry;
+       }
 
        /* Remember if caller will prepare statements */
        entry->have_prep_stmt |= will_prep_stmt;
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 10e23d02ed..2c5614073f 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8987,3 +8987,51 @@ PREPARE TRANSACTION 'fdw_tpc';
 ERROR:  cannot PREPARE a transaction that has operated on postgres_fdw foreign tables
 ROLLBACK;
 WARNING:  there is no transaction in progress
+-- ===================================================================
+-- reestablish new connection
+-- ===================================================================
+-- Terminate the backend having the specified application_name and wait for
+-- the termination to complete.
+CREATE OR REPLACE PROCEDURE terminate_backend_and_wait(appname text) AS $$
+BEGIN
+    PERFORM pg_terminate_backend(pid) FROM pg_stat_activity
+    WHERE application_name = appname;
+    LOOP
+        PERFORM * FROM pg_stat_activity WHERE application_name = appname;
+        EXIT WHEN NOT FOUND;
+        PERFORM pg_sleep(1), pg_stat_clear_snapshot();
+    END LOOP;
+END;
+$$ LANGUAGE plpgsql;
+-- Change application_name of remote connection to special one
+-- so that we can easily terminate the connection later.
+ALTER SERVER loopback OPTIONS (application_name 'fdw_retry_check');
+SELECT 1 FROM ft1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- Terminate the remote connection.
+CALL terminate_backend_and_wait('fdw_retry_check');
+-- This query should detect the broken connection when starting new remote
+-- transaction, reestablish new connection, and then succeed.
+BEGIN;
+SELECT 1 FROM ft1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- If the query detects the broken connection when starting new remote
+-- subtransaction, it doesn't reestablish new connection and should fail.
+CALL terminate_backend_and_wait('fdw_retry_check');
+SAVEPOINT s;
+SELECT 1 FROM ft1 LIMIT 1;    -- should fail
+ERROR:  server closed the connection unexpectedly
+       This probably means the server terminated abnormally
+       before or while processing the request.
+CONTEXT:  remote SQL command: SAVEPOINT s2
+COMMIT;
+-- Clean up
+DROP PROCEDURE terminate_backend_and_wait(text);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 78156d10b4..4da1f78956 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2653,3 +2653,44 @@ SELECT count(*) FROM ft1;
 -- error here
 PREPARE TRANSACTION 'fdw_tpc';
 ROLLBACK;
+
+-- ===================================================================
+-- reestablish new connection
+-- ===================================================================
+
+-- Terminate the backend having the specified application_name and wait for
+-- the termination to complete.
+CREATE OR REPLACE PROCEDURE terminate_backend_and_wait(appname text) AS $$
+BEGIN
+    PERFORM pg_terminate_backend(pid) FROM pg_stat_activity
+    WHERE application_name = appname;
+    LOOP
+        PERFORM * FROM pg_stat_activity WHERE application_name = appname;
+        EXIT WHEN NOT FOUND;
+        PERFORM pg_sleep(1), pg_stat_clear_snapshot();
+    END LOOP;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Change application_name of remote connection to special one
+-- so that we can easily terminate the connection later.
+ALTER SERVER loopback OPTIONS (application_name 'fdw_retry_check');
+SELECT 1 FROM ft1 LIMIT 1;
+
+-- Terminate the remote connection.
+CALL terminate_backend_and_wait('fdw_retry_check');
+
+-- This query should detect the broken connection when starting new remote
+-- transaction, reestablish new connection, and then succeed.
+BEGIN;
+SELECT 1 FROM ft1 LIMIT 1;
+
+-- If the query detects the broken connection when starting new remote
+-- subtransaction, it doesn't reestablish new connection and should fail.
+CALL terminate_backend_and_wait('fdw_retry_check');
+SAVEPOINT s;
+SELECT 1 FROM ft1 LIMIT 1;    -- should fail
+COMMIT;
+
+-- Clean up
+DROP PROCEDURE terminate_backend_and_wait(text);
